1#!/usr/bin/env python3
2#
3# Copyright 2023, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Base test module for Atest integration tests."""
import concurrent.futures
import json
import logging
import os
import pathlib
import re
import shlex
import shutil
import subprocess
import sys
import time
from typing import Any

import split_build_test_script
31
# Re-exported from split_build_test_script so that test modules can reference
# these names (e.g. in type annotations) via this module.
SplitBuildTestScript = split_build_test_script.SplitBuildTestScript
StepInput = split_build_test_script.StepInput
StepOutput = split_build_test_script.StepOutput
run_in_parallel = split_build_test_script.ParallelTestRunner.run_in_parallel
setup_parallel_in_build_env = (
    split_build_test_script.ParallelTestRunner.setup_parallel_in_build_env
)
40
# Note: The following constants should ideally be imported from their
#       corresponding prod source code, but doing so makes local execution of
#       the integration tests harder because of some special dependencies in
#       the prod code. Therefore the definitions are copied here for now in
#       favor of easier local integration test execution. If a value change in
#       the prod source code breaks the integration tests and that becomes a
#       problem in the future, we can reconsider importing these constants.
# Text printed before the html log line. Defined in atest/atest_utils.py.
_HTML_LOG_PRINT_PREFIX = 'To access logs, press "ctrl" and click on'
# Stdout prefix of the line that prints the results directory. Defined in
# atest/atest_main.py.
_RESULTS_DIR_PRINT_PREFIX = 'Atest results and logs directory: '
52
53
class LogEntry:
  """One parsed entry of a log: header fields plus its content lines."""

  def __init__(
      self,
      timestamp_str,
      src_file_name,
      src_file_line_number,
      log_level,
      content_lines,
  ):
    """Stores the header fields and the content lines of a log entry.

    Args:
        timestamp_str: The timestamp header string of the log entry.
        src_file_name: The source file name recorded in the log entry.
        src_file_line_number: The source file line number recorded in the log
          entry.
        log_level: The log level string of the log entry.
        content_lines: A list of the content lines of the log entry. The list
          is kept by reference (not copied), so lines appended to it later are
          reflected by get_content().
    """
    (
        self._timestamp_string,
        self._source_file_name,
        self._source_file_line_number,
        self._log_level,
        self._content_lines,
    ) = (
        timestamp_str,
        src_file_name,
        src_file_line_number,
        log_level,
        content_lines,
    )

  def get_timestamp(self) -> float:
    """Returns the entry timestamp converted to epoch seconds (local time)."""
    parsed_time = time.strptime(self._timestamp_string, '%Y-%m-%d %H:%M:%S')
    return time.mktime(parsed_time)

  def get_timestamp_string(self) -> str:
    """Returns the entry timestamp exactly as it was given."""
    return self._timestamp_string

  def get_source_file_name(self) -> str:
    """Returns the name of the source file that emitted the entry."""
    return self._source_file_name

  def get_source_file_line_number(self) -> int:
    """Returns the source file line number that emitted the entry."""
    return self._source_file_line_number

  def get_log_level(self) -> str:
    """Returns the log level string of the entry."""
    return self._log_level

  def get_content(self) -> str:
    """Returns all content lines of the entry joined with newlines."""
    line_separator = '\n'
    return line_separator.join(self._content_lines)
105
106
class AtestRunResult:
  """Stores the result of an Atest run and exposes detailed run information.

  The object wraps the completed process of an atest invocation together with
  the environment, repo root and configuration it was run with. Information
  such as the results directory, the test result json and the atest log is
  derived lazily from the captured stdout and from files in the results
  directory.
  """

  # Header of a log entry: timestamp, source file name, source line number,
  # log level and the first content line. Compiled once instead of once per
  # log line.
  _LOG_ENTRY_HEADER_PATTERN = re.compile(
      r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+?):(\d+):(\w+): (.*)'
  )

  def __init__(
      self,
      completed_process: subprocess.CompletedProcess[str],
      env: dict[str, str],
      repo_root: str,
      config: 'split_build_test_script.IntegrationTestConfiguration',
  ):
    """Initializes the run result.

    Args:
        completed_process: The completed process of the atest command.
        env: The environment variables the command was run with.
        repo_root: The directory the command was run in.
        config: The integration test configuration of the current run.
    """
    self._completed_process = completed_process
    self._env = env
    self._repo_root = repo_root
    self._config = config

  def get_returncode(self) -> int:
    """Returns the return code of the completed process."""
    return self._completed_process.returncode

  def get_stdout(self) -> str:
    """Returns the standard output of the completed process."""
    return self._completed_process.stdout

  def get_stderr(self) -> str:
    """Returns the standard error of the completed process."""
    return self._completed_process.stderr

  def get_cmd_list(self) -> list[str]:
    """Returns the command list used in the process run."""
    return self._completed_process.args

  def get_results_dir_path(self, snapshot_ready: bool = False) -> pathlib.Path:
    """Returns the atest results directory path.

    The path is parsed from the stdout line that starts with the results
    directory print prefix. If several such lines exist, the last one wins.

    Args:
        snapshot_ready: Whether to make the result root directory snapshot
          ready. When set to True and called outside of the test environment,
          the results directory is copied (dereferencing symlinks) to
          $OUT_DIR/atest_integration_tests/<results dir name>, unless that
          copy already exists, and the path of the copy is returned.

    Raises:
        RuntimeError: Failed to parse the result dir path.

    Returns:
        The Atest result directory path.
    """
    results_dir = None
    for line in self.get_stdout().splitlines(keepends=False):
      if line.startswith(_RESULTS_DIR_PRINT_PREFIX):
        results_dir = pathlib.Path(line[len(_RESULTS_DIR_PRINT_PREFIX) :])
    if not results_dir:
      raise RuntimeError('Failed to parse the result directory from stdout.')

    if self._config.is_test_env or not snapshot_ready:
      return results_dir

    result_dir_copy_path = pathlib.Path(self._env['OUT_DIR']).joinpath(
        'atest_integration_tests', results_dir.name
    )
    if not result_dir_copy_path.exists():
      # symlinks=False copies the link targets' content instead of the links.
      shutil.copytree(results_dir, result_dir_copy_path, symlinks=False)

    return result_dir_copy_path

  def get_test_result_dict(self) -> dict[str, Any]:
    """Gets the atest results loaded from the test_result json.

    Returns:
        Atest result information loaded from the test_result json file. The test
        result usually contains information about test runners and test
        pass/fail results.
    """
    json_path = self.get_results_dir_path() / 'test_result'
    with open(json_path, 'r', encoding='utf-8') as f:
      return json.load(f)

  def get_passed_count(self) -> int:
    """Gets the total number of passed tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['PASSED']

  def get_failed_count(self) -> int:
    """Gets the total number of failed tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['FAILED']

  def get_ignored_count(self) -> int:
    """Gets the total number of ignored tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['IGNORED']

  def get_atest_log(self) -> str:
    """Gets the log content read from the atest log file."""
    log_path = self.get_results_dir_path() / 'atest.log'
    return log_path.read_text(encoding='utf-8')

  def get_atest_log_entries(self) -> 'list[LogEntry]':
    """Gets the parsed atest log entries list from the atest log file.

    This method parses the atest log file and constructs a new entry whenever
    a line starts with a time string, source file name, line number, and log
    level. Lines without such a header are appended to the content of the most
    recent entry; lines before the first header are dropped.

    Returns:
      A list of parsed log entries.
    """
    entries = []
    last_content_lines = []
    for line in self.get_atest_log().splitlines():
      match = self._LOG_ENTRY_HEADER_PATTERN.match(line)
      if match:
        # The list is shared with the entry created below, so continuation
        # lines appended in later iterations become part of that entry.
        last_content_lines = [match.group(5)]
        entries.append(
            LogEntry(
                match.group(1),
                match.group(2),
                int(match.group(3)),
                match.group(4),
                last_content_lines,
            )
        )
      elif last_content_lines:
        last_content_lines.append(line)

    return entries

  def get_atest_log_values_from_prefix(self, prefix: str) -> list[str]:
    """Gets log values from entries whose content starts with the prefix.

    Args:
        prefix: The content prefix to look for.

    Returns:
        The content of every matching log entry with the prefix removed.
    """
    return [
        content[len(prefix) :]
        for content in (
            entry.get_content() for entry in self.get_atest_log_entries()
        )
        if content.startswith(prefix)
    ]

  def check_returncode(self) -> None:
    """Checks the return code and raises an exception if non-zero.

    Raises:
        RuntimeError: The atest command finished with a non-zero exit code.
          The message contains the command, the exit code, the stdout and,
          when it is not blank, the stderr.
    """
    if self.get_returncode() == 0:
      return

    def add_line_prefix(msg: str):
      # Mark every output line so it stands out from the surrounding message.
      return (
          ''.join(f'> {line}' for line in msg.splitlines(keepends=True))
          if msg
          else msg
      )

    raw_stderr = self.get_stderr()
    stderr = (
        f'stderr:\n{add_line_prefix(raw_stderr)}\n'
        if raw_stderr and raw_stderr.strip()
        else ''
    )

    raise RuntimeError(
        f'Atest command {self.get_cmd_list()} finished with exit code'
        f' {self.get_returncode()}.\n'
        f'stdout:\n{add_line_prefix(self.get_stdout())}\n{stderr}'
    )

  def get_local_reproduce_debug_cmd(self) -> str:
    """Returns a full reproduce command for local debugging purpose.

    The directory, the environment variable values and the command arguments
    are shell-quoted, so values containing spaces or shell metacharacters do
    not break the command. Values that need no quoting are emitted verbatim.

    Returns:
        A command that can be executed directly in command line to
        reproduce the atest command.
    """
    env_assignments = ' '.join(
        f'{key}={shlex.quote(value)}' for key, value in self._env.items()
    )
    return '(cd {dir} && {env} {cmd})'.format(
        dir=shlex.quote(str(self._repo_root)),
        env=env_assignments,
        cmd=shlex.join(self.get_cmd_list()),
    )
275
276
class AtestTestCase(split_build_test_script.SplitBuildTestTestCase):
  """Base test case for build-test environment split integration tests."""

  def setUp(self):
    """Sets up the default snapshot paths and environment variable keys."""
    super().setUp()
    # Default include list of repo paths for snapshot
    self._default_snapshot_include_paths = [
        '$OUT_DIR/host/linux-x86',
        '$OUT_DIR/target/product/*/module-info*',
        '$OUT_DIR/target/product/*/testcases',
        '$OUT_DIR/target/product/*/data',
        '$OUT_DIR/target/product/*/all_modules.txt',
        '$OUT_DIR/soong/module_bp*',
        'tools/asuite/atest/test_runners/roboleaf_launched.txt',
        '.repo/manifest.xml',
        'build/soong/soong_ui.bash',
        'prebuilts/build-tools/path/linux-x86/python3',
        'prebuilts/build-tools/linux-x86/bin/py3-cmd',
        'prebuilts/build-tools',
        'prebuilts/asuite/atest/linux-x86',
    ]

    # Default exclude list of repo paths for snapshot
    self._default_snapshot_exclude_paths = [
        '$OUT_DIR/host/linux-x86/bin/go',
        '$OUT_DIR/host/linux-x86/bin/soong_build',
        '$OUT_DIR/host/linux-x86/obj',
    ]

    # Default list of environment variables to take and restore in snapshots
    self._default_snapshot_env_keys = [
        split_build_test_script.ANDROID_BUILD_TOP_KEY,
        'ANDROID_HOST_OUT',
        'ANDROID_PRODUCT_OUT',
        'ANDROID_HOST_OUT_TESTCASES',
        'ANDROID_TARGET_OUT_TESTCASES',
        'OUT',
        'OUT_DIR',
        'PATH',
        'HOST_OUT_TESTCASES',
        'ANDROID_JAVA_HOME',
        'JAVA_HOME',
    ]

  def create_atest_script(self, name: str = None) -> SplitBuildTestScript:
    """Create an instance of atest integration test utility."""
    return self.create_split_build_test_script(name)

  def create_step_output(self) -> StepOutput:
    """Create a step output object with default values.

    Returns:
        A StepOutput populated with the default snapshot include paths,
        exclude paths and environment variable keys, plus the jdk directory
        when it can be determined from the environment.
    """
    out = StepOutput()
    out.add_snapshot_include_paths(self._default_snapshot_include_paths)
    out.add_snapshot_exclude_paths(self._default_snapshot_exclude_paths)
    out.add_snapshot_env_keys(self._default_snapshot_env_keys)
    out.add_snapshot_include_paths(self._get_jdk_path_list())
    return out

  @classmethod
  def run_atest_command(
      cls,
      cmd: str,
      step_in: split_build_test_script.StepInput,
      include_device_serial: bool,
      print_output: bool = True,
      use_prebuilt_atest_binary=None,
  ) -> AtestRunResult:
    """Run either `atest-dev` or `atest` command through subprocess.

    Args:
        cmd: command string for Atest. Do not add 'atest-dev' or 'atest' in the
          beginning of the command. The string is split on whitespace to form
          the argument list, so arguments must not contain whitespace.
        step_in: The step input object from build or test step.
        include_device_serial: Whether a device is required for the atest
          command. This argument is only used to determine whether to include
          device serial in the command. It does not add device/deviceless
          arguments such as '--host'.
        print_output: Whether to print the stdout and stderr while the command
          is running.
        use_prebuilt_atest_binary: Whether to run the command using the prebuilt
          atest binary instead of the atest-dev binary. When None, the value
          from the step input configuration is used.

    Returns:
        An AtestRunResult object containing the run information.
    """
    if use_prebuilt_atest_binary is None:
      use_prebuilt_atest_binary = step_in.get_config().use_prebuilt_atest_binary
    atest_binary = 'atest' if use_prebuilt_atest_binary else 'atest-dev'

    # TODO: b/336839543 - Throw error here when serial is required but not set
    # instead of from step_in.get_device_serial_args_or_empty()
    # NOTE(review): the value is appended directly to the binary name, so it
    # presumably carries its own leading space - confirm against StepInput.
    serial_arg = (
        step_in.get_device_serial_args_or_empty()
        if include_device_serial
        else ''
    )
    complete_cmd = f'{atest_binary}{serial_arg} {cmd}'

    indentation = '  '
    logging.debug('Executing atest command: %s', complete_cmd)
    logging.debug(
        '%sCommand environment variables: %s', indentation, step_in.get_env()
    )
    result = AtestRunResult(
        cls._run_shell_command(
            complete_cmd.split(),
            env=step_in.get_env(),
            cwd=step_in.get_repo_root(),
            print_output=print_output,
        ),
        step_in.get_env(),
        step_in.get_repo_root(),
        step_in.get_config(),
    )

    def wrap_output_lines(output_str: str) -> str:
      # Indent and mark every output line for readability in the debug log.
      line_prefix = f'{indentation * 2}> '
      return ''.join(
          line_prefix + line for line in output_str.splitlines(keepends=True)
      )

    logging.debug(
        '%sCommand stdout:\n%s',
        indentation,
        wrap_output_lines(result.get_stdout()),
    )
    # The atest log is fetched only for diagnostics here. It cannot be located
    # when the command did not print its results directory, and the file may
    # be missing, so a failure to read it must not abort the run: callers still
    # get the result and hit the same error if they query the log themselves.
    try:
      atest_log = result.get_atest_log()
    except (RuntimeError, OSError) as e:
      logging.debug('%sAtest log is not available: %s', indentation, e)
    else:
      logging.debug(
          '%sAtest log:\n%s', indentation, wrap_output_lines(atest_log)
      )

    return result

  @staticmethod
  def _run_shell_command(
      cmd: list[str],
      env: dict[str, str],
      cwd: str,
      print_output: bool = True,
  ) -> subprocess.CompletedProcess[str]:
    """Execute shell command with real time output printing and capture.

    Args:
        cmd: The command and its arguments.
        env: The environment variables to run the command with.
        cwd: The working directory to run the command in.
        print_output: Whether to echo the command's stdout and stderr to this
          process's stdout and stderr while the command is running.

    Returns:
        A CompletedProcess holding the command, its exit code and the captured
        stdout and stderr text.
    """

    def read_output(read_src, print_dst, capture_dst):
      # readline() returns '' only at end of stream, so this blocks until the
      # next line or EOF instead of spinning when the stream is closed while
      # the process is still running.
      for output in iter(read_src.readline, ''):
        if print_output:
          print(output, end='', file=print_dst)
        capture_dst.append(output)

    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        env=env,
        cwd=cwd,
    ) as process:
      stdout = []
      stderr = []
      # Each stream gets its own reader thread so that neither pipe fills up
      # and blocks the child while the other one is being read.
      with concurrent.futures.ThreadPoolExecutor() as executor:
        stdout_future = executor.submit(
            read_output, process.stdout, sys.stdout, stdout
        )
        stderr_future = executor.submit(
            read_output, process.stderr, sys.stderr, stderr
        )
      # Re-raise any exception that occurred in the reader threads.
      stdout_future.result()
      stderr_future.result()

      # Both streams reached EOF; wait for the process so that the exit code
      # is available even if it closed its streams before exiting.
      return subprocess.CompletedProcess(
          cmd, process.wait(), ''.join(stdout), ''.join(stderr)
      )

  @staticmethod
  def _get_jdk_path_list() -> list[str]:
    """Get the relative jdk directory in build environment.

    Returns:
        An empty list when the Android build top environment variable is not
        set. Otherwise a single-element list with the path, relative to the
        repo root, of the closest ancestor of ANDROID_JAVA_HOME (or
        ANDROID_JAVA_HOME itself) whose name starts with 'jdk'.

    Raises:
        ValueError: No component of ANDROID_JAVA_HOME starts with 'jdk', or
          the jdk directory is not located under the repo root.
    """
    if split_build_test_script.ANDROID_BUILD_TOP_KEY not in os.environ:
      return []
    java_home = os.environ['ANDROID_JAVA_HOME']
    jdk_dir = pathlib.Path(java_home)
    # Walk up until a 'jdk*' directory is found. The top-most path is its own
    # parent, so stop there instead of looping forever.
    while not jdk_dir.name.startswith('jdk') and jdk_dir != jdk_dir.parent:
      jdk_dir = jdk_dir.parent
    if not jdk_dir.name.startswith('jdk'):
      raise ValueError('Unrecognized jdk directory ' + java_home)
    repo_root = pathlib.Path(
        os.environ[split_build_test_script.ANDROID_BUILD_TOP_KEY]
    )
    return [jdk_dir.relative_to(repo_root).as_posix()]
463
464
def main():
  """Launches the integration tests through split_build_test_script.main.

  Registers the `--use-prebuilt-atest-binary` command line flag and copies its
  parsed value into the integration test configuration.
  """
  prebuilt_flag_help = (
      'Set the default atest binary to the prebuilt `atest` instead'
      ' of `atest-dev`.'
  )

  def _register_prebuilt_flag(parser):
    # Boolean switch; absent on the command line means False.
    parser.add_argument(
        '--use-prebuilt-atest-binary',
        action='store_true',
        default=False,
        help=prebuilt_flag_help,
    )

  def _store_prebuilt_flag(config, args):
    # Make the parsed flag available to the test steps via the config.
    config.use_prebuilt_atest_binary = args.use_prebuilt_atest_binary

  split_build_test_script.main(
      argv=sys.argv,
      make_before_build=['atest'],
      argparser_update_func=_register_prebuilt_flag,
      config_update_function=_store_prebuilt_flag,
  )
488