# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the `run_tool_with_logging` wrapper script.

Each test copies the wrapper out of the `testdata` resource package into a
temporary directory, runs it through bash with fake tool/logger scripts, and
inspects what those fake scripts recorded.
"""

import dataclasses
import glob
from importlib import resources
import logging
import os
from pathlib import Path
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import textwrap
import time
import unittest

# Return code every script run through `_run_script_and_wait` must exit with.
EXIT_RETURN_CODE = 0
# Backward-compatible alias for the original, misspelled constant name.
EXII_RETURN_CODE = EXIT_RETURN_CODE
# Return code expected from the wrapper after its process group gets SIGINT.
INTERRUPTED_RETURN_CODE = 130


class RunToolWithLoggingTest(unittest.TestCase):
  """Exercises `run_tool_with_logging` with fake tool and logger scripts."""

  @classmethod
  def setUpClass(cls):
    super().setUpClass()
    # Configure to print logging to stdout.
    logging.basicConfig(filename=None, level=logging.DEBUG)
    console = logging.StreamHandler(sys.stdout)
    logging.getLogger("").addHandler(console)

  def setUp(self):
    super().setUp()
    # Remember where we started so tearDown can leave the process in a
    # directory that still exists once the temp dir is removed.
    self._original_cwd = os.getcwd()
    self.working_dir = tempfile.TemporaryDirectory()
    # Run all the tests from working_dir which is our temp Android build top.
    os.chdir(self.working_dir.name)

    self.logging_script_path = self._import_executable("run_tool_with_logging")

  def tearDown(self):
    # Leave the temp dir before deleting it.
    os.chdir(self._original_cwd)
    self.working_dir.cleanup()
    super().tearDown()

  def test_does_not_log_when_logger_var_empty(self):
    """The tool still runs when ANDROID_TOOL_LOGGER is set to ""."""
    test_tool = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER=""
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")

  def test_does_not_log_with_logger_unset(self):
    """The tool still runs when ANDROID_TOOL_LOGGER is unset."""
    test_tool = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      unset ANDROID_TOOL_LOGGER
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")

  def test_log_success_with_logger_enabled(self):
    """The logger receives tag, timestamps, tool args and exit code 0."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")
    # Raw strings: this is a regex, and "\d" is not a valid str escape.
    expected_logger_args = (
        r"--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
        r"\d+\.\d+ --tool_args=arg1 arg2 --exit_code=0"
    )
    test_logger.assert_called_once_with_args(expected_logger_args)

  def test_run_tool_output_is_same_with_and_without_logging(self):
    """stdout/stderr match between a wrapped run and a direct tool run."""
    test_tool = TestScript.create(self.working_dir, "echo 'tool called'")
    test_logger = TestScript.create(self.working_dir)

    run_tool_with_logging_stdout, run_tool_with_logging_stderr = (
        self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)
    )

    run_tool_without_logging_stdout, run_tool_without_logging_stderr = (
        self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {test_tool.executable} arg1 arg2
    """)
    )

    self.assertEqual(
        run_tool_with_logging_stdout, run_tool_without_logging_stdout
    )
    self.assertEqual(
        run_tool_with_logging_stderr, run_tool_without_logging_stderr
    )

  def test_logger_output_is_suppressed(self):
    """Text the logger prints does not show up in the wrapper's stdout."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir, "echo 'logger called'")

    run_tool_with_logging_output, _ = self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self.assertNotIn("logger called", run_tool_with_logging_output)

  def test_logger_error_is_suppressed(self):
    """A failing logger neither fails the wrapper nor leaks its stderr."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(
        self.working_dir, "echo 'logger failed' > /dev/stderr; exit 1"
    )

    _, err = self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self.assertNotIn("logger failed", err)

  def test_log_success_when_tool_interrupted(self):
    """The logger is still called, with exit code 130, after SIGINT."""
    test_tool = TestScript.create(self.working_dir, script_body="sleep 100")
    test_logger = TestScript.create(self.working_dir)

    process = self._run_script(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    pgid = os.getpgid(process.pid)
    # Give some time for the subprocess to start.
    time.sleep(1)
    # Kill the subprocess and any processes created in the same group.
    os.killpg(pgid, signal.SIGINT)

    returncode, _, _ = self._wait_for_process(process)
    self.assertEqual(returncode, INTERRUPTED_RETURN_CODE)

    # Raw strings: this is a regex, and "\d" is not a valid str escape.
    expected_logger_args = (
        r"--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
        r"\d+\.\d+ --tool_args=arg1 arg2 --exit_code=130"
    )
    test_logger.assert_called_once_with_args(expected_logger_args)

  def test_logger_can_be_toggled_on(self):
    """Setting the logger after an empty value results in one logger call."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER=""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_logger.assert_called_with_times(1)

  def test_logger_can_be_toggled_off(self):
    """Clearing the logger after setting it results in no logger call."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      export ANDROID_TOOL_LOGGER=""
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_logger.assert_not_called()

  def test_integration_tool_event_logger_dry_run(self):
    """Runs the `tool_event_logger` from testdata with --dry_run."""
    test_tool = TestScript.create(self.working_dir)
    logger_path = self._import_executable("tool_event_logger")

    self._run_script_and_wait(f"""
      TMPDIR="{self.working_dir.name}"
      export ANDROID_TOOL_LOGGER="{logger_path}"
      export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self._assert_logger_dry_run()

  def test_tool_args_do_not_fail_logger(self):
    """A dash-prefixed tool argument does not break the dry-run logger."""
    test_tool = TestScript.create(self.working_dir)
    logger_path = self._import_executable("tool_event_logger")

    self._run_script_and_wait(f"""
      TMPDIR="{self.working_dir.name}"
      export ANDROID_TOOL_LOGGER="{logger_path}"
      export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} --tool-arg1
    """)

    self._assert_logger_dry_run()

  def _import_executable(self, executable_name: str) -> Path:
    """Copies `executable_name` from the `testdata` package into working_dir.

    Args:
      executable_name: Name of the resource inside the `testdata` package.

    Returns:
      Path of the copy, which has been given mode 0o755.
    """
    executable_path = Path(self.working_dir.name).joinpath(executable_name)
    with resources.as_file(
        resources.files("testdata").joinpath(executable_name)
    ) as p:
      shutil.copy(p, executable_path)
    executable_path.chmod(0o755)
    return executable_path

  def _assert_logger_dry_run(self):
    """Asserts exactly one one-line log file mentioning "dry run" exists."""
    log_files = glob.glob(self.working_dir.name + "/tool_event_logger_*/*.log")
    self.assertEqual(len(log_files), 1)

    with open(log_files[0], "r") as f:
      lines = f.readlines()
      self.assertEqual(len(lines), 1)
      self.assertIn("dry run", lines[0])

  def _run_script_and_wait(self, test_script: str) -> tuple[str, str]:
    """Runs `test_script` in bash and asserts that it exits with 0.

    Args:
      test_script: Shell source to execute.

    Returns:
      The script's (stdout, stderr) as text.
    """
    process = self._run_script(test_script)
    returncode, out, err = self._wait_for_process(process)
    logging.debug("script stdout: %s", out)
    logging.debug("script stderr: %s", err)
    self.assertEqual(returncode, EXIT_RETURN_CODE)
    return out, err

  def _run_script(self, test_script: str) -> subprocess.Popen:
    """Starts `test_script` under /bin/bash in a new session, without waiting.

    stdout and stderr are captured through pipes in text mode.
    """
    return subprocess.Popen(
        test_script,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        start_new_session=True,
        executable="/bin/bash",
    )

  def _wait_for_process(
      self, process: subprocess.Popen
  ) -> tuple[int, str, str]:
    """Waits for `process` and for every other process in its group.

    Returns:
      A (returncode, stdout, stderr) tuple for `process`.

    Raises:
      TimeoutError: If the process group outlives the default timeout of
        `_wait_for_process_group`.
    """
    # Look the group up before communicate() reaps the process.
    pgid = os.getpgid(process.pid)
    out, err = process.communicate()
    # Wait for all process in the same group to complete since the logger runs
    # as a separate detached process.
    self._wait_for_process_group(pgid)
    return (process.returncode, out, err)

  def _wait_for_process_group(self, pgid: int, timeout: int = 5):
    """Waits for all subprocesses within the process group to complete.

    Args:
      pgid: Process group id to watch.
      timeout: Maximum number of seconds to wait.

    Raises:
      TimeoutError: If a process of the group is still present after `timeout`
        seconds.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while self._is_process_group_alive(pgid):
      if time.monotonic() > deadline:
        raise TimeoutError(
            f"Process group did not complete after {timeout} seconds"
        )
      time.sleep(0.1)

  def _is_process_group_alive(self, pgid: int) -> bool:
    """Returns True if any pid listed under /proc belongs to group `pgid`."""
    for entry in os.listdir("/proc"):
      if not entry.isdigit():
        continue
      try:
        if os.getpgid(int(entry)) == pgid:
          return True
      except (FileNotFoundError, PermissionError, ProcessLookupError):
        # The process vanished between listing and lookup, or is not ours to
        # inspect; either way it does not count.
        continue
    return False


@dataclasses.dataclass
class TestScript:
  """A generated bash script that records the arguments of every call."""

  # Path of the generated script.
  executable: Path
  # File to which the script appends one line of arguments per invocation.
  output_file: Path

  @staticmethod
  def create(
      temp_dir: tempfile.TemporaryDirectory, script_body: str = ""
  ) -> "TestScript":
    """Writes a new recording script into `temp_dir`.

    Args:
      temp_dir: Temporary directory object; files are created in its `.name`.
      script_body: Extra shell code run after the arguments are recorded.

    Returns:
      A TestScript pointing at the new executable and its output file.
    """
    with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
      output_file = Path(f.name)

    # The trailing backslash keeps "#!/bin/bash" on the very first line of the
    # file; a shebang anywhere else is only a comment.
    executable_contents = textwrap.dedent(f"""\
        #!/bin/bash

        echo "${{@}}" >> {output_file}
        {script_body}
        """)
    with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
      executable = Path(f.name)
      f.write(executable_contents.encode("utf-8"))

    executable.chmod(executable.stat().st_mode | stat.S_IEXEC)

    return TestScript(executable, output_file)

  def assert_called_with_times(self, expected_call_times: int):
    """Asserts the script recorded exactly `expected_call_times` calls."""
    lines = self._read_contents_from_output_file()
    assert len(lines) == expected_call_times, (
        f"Expect to call {expected_call_times} times, but actually called"
        f" {len(lines)} times."
    )

  def assert_called_with_args(self, expected_args: str):
    """Asserts the first recorded call matches the regex `expected_args`."""
    lines = self._read_contents_from_output_file()
    assert lines, "Expect to be called at least once, but was never called."
    assert re.search(expected_args, lines[0]), (
        f"Expect to call with args {expected_args}, but actually called with"
        f" args {lines[0]}."
    )

  def assert_not_called(self):
    """Asserts the script was never invoked."""
    self.assert_called_with_times(0)

  def assert_called_once_with_args(self, expected_args: str):
    """Asserts exactly one call, matching the regex `expected_args`."""
    self.assert_called_with_times(1)
    self.assert_called_with_args(expected_args)

  def _read_contents_from_output_file(self) -> list[str]:
    """Returns the recorded calls, one line per invocation."""
    with open(self.output_file, "r") as f:
      return f.readlines()


if __name__ == "__main__":
  unittest.main()