# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
import dataclasses
import glob
from importlib import resources
import logging
import os
from pathlib import Path
import re
import shlex
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import textwrap
import time
import unittest
31
# Exit status expected from a wrapper invocation that completes normally.
# NOTE(review): "EXII" looks like a typo for "EXIT"; the name is kept because
# the test class in this file references it.
EXII_RETURN_CODE = 0
# Exit status expected when the wrapped tool is interrupted with SIGINT
# (128 + signal number 2).
INTERRUPTED_RETURN_CODE = 130
34
35
class RunToolWithLoggingTest(unittest.TestCase):
  """End-to-end tests for the `run_tool_with_logging` wrapper script.

  Each test writes a small bash snippet that invokes the wrapper with fake
  tool/logger executables (`TestScript`) and then inspects what those
  executables recorded and what the wrapper printed.
  """

  @classmethod
  def setUpClass(cls):
    super().setUpClass()
    # Configure to print logging to stdout.
    logging.basicConfig(filename=None, level=logging.DEBUG)
    console = logging.StreamHandler(sys.stdout)
    logging.getLogger("").addHandler(console)

  def setUp(self):
    super().setUp()
    original_cwd = os.getcwd()
    self.working_dir = tempfile.TemporaryDirectory()
    # Cleanups run in LIFO order and also run when setUp fails part-way:
    # first leave the temp dir, then delete it.
    self.addCleanup(self.working_dir.cleanup)
    self.addCleanup(os.chdir, original_cwd)
    # Run all the tests from working_dir which is our temp Android build top.
    os.chdir(self.working_dir.name)

    self.logging_script_path = self._import_executable("run_tool_with_logging")

  def test_does_not_log_when_logger_var_empty(self):
    """The tool still runs when ANDROID_TOOL_LOGGER is set to ""."""
    test_tool = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER=""
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")

  def test_does_not_log_with_logger_unset(self):
    """The tool still runs when ANDROID_TOOL_LOGGER is unset."""
    test_tool = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      unset ANDROID_TOOL_LOGGER
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")

  def test_log_success_with_logger_enabled(self):
    """Both the tool and the logger are invoked once with expected args."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_tool.assert_called_once_with_args("arg1 arg2")
    # Raw strings: the pattern contains regex escapes (\d, \.).
    expected_logger_args = (
        r"--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
        r"\d+\.\d+ --tool_args=arg1 arg2 --exit_code=0"
    )
    test_logger.assert_called_once_with_args(expected_logger_args)

  def test_run_tool_output_is_same_with_and_without_logging(self):
    """Wrapping the tool does not alter its stdout or stderr."""
    test_tool = TestScript.create(self.working_dir, "echo 'tool called'")
    test_logger = TestScript.create(self.working_dir)

    run_tool_with_logging_stdout, run_tool_with_logging_stderr = (
        self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)
    )

    run_tool_without_logging_stdout, run_tool_without_logging_stderr = (
        self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {test_tool.executable} arg1 arg2
    """)
    )

    self.assertEqual(
        run_tool_with_logging_stdout, run_tool_without_logging_stdout
    )
    self.assertEqual(
        run_tool_with_logging_stderr, run_tool_without_logging_stderr
    )

  def test_logger_output_is_suppressed(self):
    """Anything the logger prints to stdout is not shown to the user."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir, "echo 'logger called'")

    run_tool_with_logging_output, _ = self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self.assertNotIn("logger called", run_tool_with_logging_output)

  def test_logger_error_is_suppressed(self):
    """A failing logger neither leaks stderr nor fails the wrapper."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(
        self.working_dir, "echo 'logger failed' > /dev/stderr; exit 1"
    )

    _, err = self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self.assertNotIn("logger failed", err)

  def test_log_success_when_tool_interrupted(self):
    """The logger is still invoked, with exit code 130, after SIGINT."""
    test_tool = TestScript.create(self.working_dir, script_body="sleep 100")
    test_logger = TestScript.create(self.working_dir)

    process = self._run_script(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    pgid = os.getpgid(process.pid)
    # Give some time for the subprocess to start.
    time.sleep(1)
    # Kill the subprocess and any processes created in the same group.
    os.killpg(pgid, signal.SIGINT)

    returncode, _, _ = self._wait_for_process(process)
    self.assertEqual(returncode, INTERRUPTED_RETURN_CODE)

    # Raw strings: the pattern contains regex escapes (\d, \.).
    expected_logger_args = (
        r"--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
        r"\d+\.\d+ --tool_args=arg1 arg2 --exit_code=130"
    )
    test_logger.assert_called_once_with_args(expected_logger_args)

  def test_logger_can_be_toggled_on(self):
    """Setting the logger variable after clearing it enables logging."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER=""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_logger.assert_called_with_times(1)

  def test_logger_can_be_toggled_off(self):
    """Clearing the logger variable after setting it disables logging."""
    test_tool = TestScript.create(self.working_dir)
    test_logger = TestScript.create(self.working_dir)

    self._run_script_and_wait(f"""
      export ANDROID_TOOL_LOGGER="{test_logger.executable}"
      export ANDROID_TOOL_LOGGER=""
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    test_logger.assert_not_called()

  def test_integration_tool_event_logger_dry_run(self):
    """The real `tool_event_logger` produces a single dry-run log line."""
    test_tool = TestScript.create(self.working_dir)
    logger_path = self._import_executable("tool_event_logger")

    # TMPDIR is exported so child processes inherit it even when the variable
    # is not already present in the environment. Presumably the logger places
    # its log under the temp dir; the assertion below looks for it there.
    self._run_script_and_wait(f"""
      export TMPDIR="{self.working_dir.name}"
      export ANDROID_TOOL_LOGGER="{logger_path}"
      export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
    """)

    self._assert_logger_dry_run()

  def test_tool_args_do_not_fail_logger(self):
    """Dash-prefixed tool arguments do not break the logger invocation."""
    test_tool = TestScript.create(self.working_dir)
    logger_path = self._import_executable("tool_event_logger")

    # See test_integration_tool_event_logger_dry_run for why TMPDIR is
    # exported.
    self._run_script_and_wait(f"""
      export TMPDIR="{self.working_dir.name}"
      export ANDROID_TOOL_LOGGER="{logger_path}"
      export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
      {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} --tool-arg1
    """)

    self._assert_logger_dry_run()

  def _import_executable(self, executable_name: str) -> Path:
    """Copies a packaged `testdata` resource into working_dir as an executable.

    Args:
      executable_name: Name of the resource inside the `testdata` package.

    Returns:
      Path of the copied file, with mode 0o755.
    """
    executable_path = Path(self.working_dir.name).joinpath(executable_name)
    with resources.as_file(
        resources.files("testdata").joinpath(executable_name)
    ) as p:
      shutil.copy(p, executable_path)
    executable_path.chmod(0o755)
    return executable_path

  def _assert_logger_dry_run(self):
    """Asserts exactly one log file exists with one line mentioning dry run."""
    log_files = glob.glob(
        os.path.join(self.working_dir.name, "tool_event_logger_*", "*.log")
    )
    self.assertEqual(len(log_files), 1)

    with open(log_files[0], "r") as f:
      lines = f.readlines()
    self.assertEqual(len(lines), 1)
    self.assertIn("dry run", lines[0])

  def _run_script_and_wait(self, test_script: str) -> tuple[str, str]:
    """Runs a bash snippet to completion and asserts it exited successfully.

    Args:
      test_script: Bash source to execute.

    Returns:
      A (stdout, stderr) tuple of the captured text output.
    """
    process = self._run_script(test_script)
    returncode, out, err = self._wait_for_process(process)
    logging.debug("script stdout: %s", out)
    logging.debug("script stderr: %s", err)
    self.assertEqual(returncode, EXII_RETURN_CODE)
    return out, err

  def _run_script(self, test_script: str) -> subprocess.Popen:
    """Starts a bash snippet in its own session without waiting for it."""
    return subprocess.Popen(
        test_script,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # A new session gives the script its own process group, so the whole
        # group can be signalled and waited on.
        start_new_session=True,
        executable="/bin/bash",
    )

  def _wait_for_process(
      self, process: subprocess.Popen
  ) -> tuple[int, str, str]:
    """Waits for the process and everything left in its process group.

    Returns:
      A (returncode, stdout, stderr) tuple for `process`.
    """
    # Look up the group before communicate() reaps the process.
    pgid = os.getpgid(process.pid)
    out, err = process.communicate()
    # Wait for all process in the same group to complete since the logger runs
    # as a separate detached process.
    self._wait_for_process_group(pgid)
    return (process.returncode, out, err)

  def _wait_for_process_group(self, pgid: int, timeout: float = 5):
    """Waits for all subprocesses within the process group to complete.

    Args:
      pgid: Process group id to wait for.
      timeout: Maximum number of seconds to wait.

    Raises:
      TimeoutError: If members of the group are still alive after `timeout`.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while self._process_group_is_alive(pgid):
      if time.monotonic() > deadline:
        raise TimeoutError(
            f"Process group did not complete after {timeout} seconds"
        )
      time.sleep(0.1)

  @staticmethod
  def _process_group_is_alive(pgid: int) -> bool:
    """Returns True if any pid listed under /proc belongs to group `pgid`."""
    for entry in os.listdir("/proc"):
      if not entry.isdigit():
        continue
      try:
        if os.getpgid(int(entry)) == pgid:
          return True
      except (FileNotFoundError, PermissionError, ProcessLookupError):
        # The process vanished between listing and lookup, or cannot be
        # queried; either way it is not counted.
        continue
    return False
284
285
@dataclasses.dataclass
class TestScript:
  """A fake executable that records every invocation to an output file.

  The generated bash script appends its arguments as one line to
  `output_file` on each call, then runs an optional extra body. The assert
  helpers read that file to check how the script was called.
  """

  # Path of the generated bash script.
  executable: Path
  # Path of the file that receives one line of arguments per invocation.
  output_file: Path

  @classmethod
  def create(cls, temp_dir, script_body: str = ""):
    """Creates an executable recording script inside `temp_dir`.

    Args:
      temp_dir: A `tempfile.TemporaryDirectory` (its `.name` is used) or a
        path-like directory in which the files are created.
      script_body: Extra bash code appended after the recording line.

    Returns:
      A `TestScript` pointing at the new executable and its output file.
    """
    if isinstance(temp_dir, tempfile.TemporaryDirectory):
      directory = temp_dir.name
    else:
      directory = os.fspath(temp_dir)

    # Created empty so "never called" reads back as zero lines.
    with tempfile.NamedTemporaryFile(dir=directory, delete=False) as f:
      output_file = Path(f.name)

    # The shebang must be the very first line for direct execution to work,
    # and script_body is joined verbatim so multi-line bodies are preserved.
    executable_contents = "\n".join([
        "#!/bin/bash",
        "",
        f'echo "${{@}}" >> {shlex.quote(str(output_file))}',
        script_body,
        "",
    ])
    with tempfile.NamedTemporaryFile(dir=directory, delete=False) as f:
      executable = Path(f.name)
      f.write(executable_contents.encode("utf-8"))

    # Add the owner-execute bit once the file is closed.
    executable.chmod(executable.stat().st_mode | stat.S_IEXEC)

    return cls(executable, output_file)

  def assert_called_with_times(self, expected_call_times: int):
    """Raises AssertionError unless exactly this many calls were recorded."""
    lines = self._read_contents_from_output_file()
    if len(lines) != expected_call_times:
      raise AssertionError(
          f"Expect to call {expected_call_times} times, but actually called"
          f" {len(lines)} times."
      )

  def assert_called_with_args(self, expected_args: str):
    """Raises AssertionError unless the first call matches the regex.

    Args:
      expected_args: Regular expression searched for in the first recorded
        argument line.
    """
    lines = self._read_contents_from_output_file()
    if not lines:
      raise AssertionError(
          f"Expect to call with args {expected_args}, but was never called."
      )
    if not re.search(expected_args, lines[0]):
      raise AssertionError(
          f"Expect to call with args {expected_args}, but actually called with"
          f" args {lines[0]}."
      )

  def assert_not_called(self):
    """Raises AssertionError if any call was recorded."""
    self.assert_called_with_times(0)

  def assert_called_once_with_args(self, expected_args: str):
    """Raises AssertionError unless called exactly once with matching args."""
    self.assert_called_with_times(1)
    self.assert_called_with_args(expected_args)

  def _read_contents_from_output_file(self) -> list[str]:
    """Returns the recorded invocations, one line per call."""
    with open(self.output_file, "r") as f:
      return f.readlines()
334
335
# Allow running this test module directly as a script.
if __name__ == "__main__":
  unittest.main()
338