1#!/usr/bin/env python3
3# Copyright 2023, The Android Open Source Project
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9#     http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17"""Base test module for Atest integration tests."""
18import concurrent.futures
19import json
20import logging
21import os
22import pathlib
23import re
24import shutil
25import subprocess
26import sys
27import time
28from typing import Any
30import split_build_test_script
32# Exporting for test modules' typing reference
33SplitBuildTestScript = split_build_test_script.SplitBuildTestScript
34StepInput = split_build_test_script.StepInput
35StepOutput = split_build_test_script.StepOutput
36run_in_parallel = split_build_test_script.ParallelTestRunner.run_in_parallel
37setup_parallel_in_build_env = (
38    split_build_test_script.ParallelTestRunner.setup_parallel_in_build_env
41# Note: The following constants should ideally be imported from their
42#       corresponding prod source code, but this makes local execution of the
43#       integration test harder due to some special dependencies in the prod
44#       code. Therefore we copy the definition here for now in favor of easier
45#       local integration test execution. If value changes in the source code
46#       breaking the integration test becomes a problem in the future, we can
47#       reconsider importing these constants.
48# Printed before the html log line. Defined in atest/atest_utils.py.
49_HTML_LOG_PRINT_PREFIX = 'To access logs, press "ctrl" and click on'
50# Stdout print prefix for results directory. Defined in atest/atest_main.py
51_RESULTS_DIR_PRINT_PREFIX = 'Atest results and logs directory: '
54class LogEntry:
55  """Represents a single log entry."""
57  def __init__(
58      self,
59      timestamp_str,
60      src_file_name,
61      src_file_line_number,
62      log_level,
63      content_lines,
64  ):
65    """Initializes a LogEntry object from a logging line.
67    Args:
68        timestamp_str: The timestamp header string in each log entry.
69        src_file_name: The source file name in the log entry.
70        src_file_line_number: The source file line number in the log entry.
71        log_level: The log level string in the log entry.
72        content_lines: A list of log entry content lines.
73    """
74    self._timestamp_string = timestamp_str
75    self._source_file_name = src_file_name
76    self._source_file_line_number = src_file_line_number
77    self._log_level = log_level
78    self._content_lines = content_lines
80  def get_timestamp(self) -> float:
81    """Returns the timestamp of the log entry as an epoch time."""
82    return time.mktime(
83        time.strptime(self._timestamp_string, '%Y-%m-%d %H:%M:%S')
84    )
86  def get_timestamp_string(self) -> str:
87    """Returns the timestamp of the log entry as a string."""
88    return self._timestamp_string
90  def get_source_file_name(self) -> str:
91    """Returns the source file name of the log entry."""
92    return self._source_file_name
94  def get_source_file_line_number(self) -> int:
95    """Returns the source file line number of the log entry."""
96    return self._source_file_line_number
98  def get_log_level(self) -> str:
99    """Returns the log level of the log entry."""
100    return self._log_level
102  def get_content(self) -> str:
103    """Returns the content of the log entry."""
104    return '\n'.join(self._content_lines)
107class AtestRunResult:
108  """A class to store Atest run result and get detailed run information."""
110  def __init__(
111      self,
112      completed_process: subprocess.CompletedProcess[str],
113      env: dict[str, str],
114      repo_root: str,
115      config: split_build_test_script.IntegrationTestConfiguration,
116  ):
117    self._completed_process = completed_process
118    self._env = env
119    self._repo_root = repo_root
120    self._config = config
122  def get_returncode(self) -> int:
123    """Returns the return code of the completed process."""
124    return self._completed_process.returncode
126  def get_stdout(self) -> str:
127    """Returns the standard output of the completed process."""
128    return self._completed_process.stdout
130  def get_stderr(self) -> str:
131    """Returns the standard error of the completed process."""
132    return self._completed_process.stderr
134  def get_cmd_list(self) -> list[str]:
135    """Returns the command list used in the process run."""
136    return self._completed_process.args
138  def get_results_dir_path(self, snapshot_ready=False) -> pathlib.Path:
139    """Returns the atest results directory path.
141    Args:
142        snapshot_ready: Whether to make the result root directory snapshot
143          ready. When set to True and called in build environment, this method
144          will copy the path into <repo_root>/out with dereferencing so that the
145          directory can be safely added to snapshot.
147    Raises:
148        RuntimeError: Failed to parse the result dir path.
150    Returns:
151        The Atest result directory path.
152    """
153    results_dir = None
154    for line in self.get_stdout().splitlines(keepends=False):
155      if line.startswith(_RESULTS_DIR_PRINT_PREFIX):
156        results_dir = pathlib.Path(line[len(_RESULTS_DIR_PRINT_PREFIX) :])
157    if not results_dir:
158      raise RuntimeError('Failed to parse the result directory from stdout.')
160    if self._config.is_test_env or not snapshot_ready:
161      return results_dir
163    result_dir_copy_path = pathlib.Path(self._env['OUT_DIR']).joinpath(
164        'atest_integration_tests', results_dir.name
165    )
166    if not result_dir_copy_path.exists():
167      shutil.copytree(results_dir, result_dir_copy_path, symlinks=False)
169    return result_dir_copy_path
171  def get_test_result_dict(self) -> dict[str, Any]:
172    """Gets the atest results loaded from the test_result json.
174    Returns:
175        Atest result information loaded from the test_result json file. The test
176        result usually contains information about test runners and test
177        pass/fail results.
178    """
179    json_path = self.get_results_dir_path() / 'test_result'
180    with open(json_path, 'r', encoding='utf-8') as f:
181      return json.load(f)
183  def get_passed_count(self) -> int:
184    """Gets the total number of passed tests from atest summary."""
185    return self.get_test_result_dict()['total_summary']['PASSED']
187  def get_failed_count(self) -> int:
188    """Gets the total number of failed tests from atest summary."""
189    return self.get_test_result_dict()['total_summary']['FAILED']
191  def get_ignored_count(self) -> int:
192    """Gets the total number of ignored tests from atest summary."""
193    return self.get_test_result_dict()['total_summary']['IGNORED']
195  def get_atest_log(self) -> str:
196    """Gets the log content read from the atest log file."""
197    log_path = self.get_results_dir_path() / 'atest.log'
198    return log_path.read_text(encoding='utf-8')
200  def get_atest_log_entries(self) -> list[LogEntry]:
201    """Gets the parsed atest log entries list from the atest log file.
203    This method parse the atest log file and construct a new entry when a line
204    starts with a time string, source file name, line number, and log level.
206    Returns:
207      A list of parsed log entries.
208    """
209    entries = []
210    last_content_lines = []
211    for line in self.get_atest_log().splitlines():
212      regex = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+?):(\d+):(\w+): (.*)'
213      match = re.match(regex, line)
214      if match:
215        last_content_lines = [match.group(5)]
216        entries.append(
217            LogEntry(
218                match.group(1),
219                match.group(2),
220                int(match.group(3)),
221                match.group(4),
222                last_content_lines,
223            )
224        )
225      else:
226        if last_content_lines:
227          last_content_lines.append(line)
229    return entries
231  def get_atest_log_values_from_prefix(self, prefix: str) -> list[str]:
232    """Gets log values from lines starting with the given log prefix."""
233    res = []
234    for entry in self.get_atest_log_entries():
235      content = entry.get_content()
236      if content.startswith(prefix):
237        res.append(content[len(prefix) :])
238    return res
240  def check_returncode(self) -> None:
241    """Checks the return code and raises an exception if non-zero."""
243    def add_line_prefix(msg: str):
244      return (
245          ''.join(('> %s' % line for line in msg.splitlines(keepends=True)))
246          if msg
247          else msg
248      )
250    stderr = (
251        f'stderr:\n{add_line_prefix(self.get_stderr())}\n'
252        if self.get_stderr() and self.get_stderr().strip()
253        else ''
254    )
256    if self.get_returncode() != 0:
257      raise RuntimeError(
258          f'Atest command {self.get_cmd_list()} finished with exit code'
259          f' {self.get_returncode()}.\n'
260          f'stdout:\n{add_line_prefix(self.get_stdout())}\n{stderr}'
261      )
263  def get_local_reproduce_debug_cmd(self) -> str:
264    """Returns a full reproduce command for local debugging purpose.
266    Returns:
267        A command that can be executed directly in command line to
268        reproduce the atest command.
269    """
270    return '(cd {dir} && {env} {cmd})'.format(
271        dir=self._repo_root,
272        env=' '.join((k + '=' + v for k, v in self._env.items())),
273        cmd=' '.join(self.get_cmd_list()),
274    )
277class AtestTestCase(split_build_test_script.SplitBuildTestTestCase):
278  """Base test case for build-test environment split integration tests."""
280  def setUp(self):
281    super().setUp()
282    # Default include list of repo paths for snapshot
283    self._default_snapshot_include_paths = [
284        '$OUT_DIR/host/linux-x86',
285        '$OUT_DIR/target/product/*/module-info*',
286        '$OUT_DIR/target/product/*/testcases',
287        '$OUT_DIR/target/product/*/data',
288        '$OUT_DIR/target/product/*/all_modules.txt',
289        '$OUT_DIR/soong/module_bp*',
290        'tools/asuite/atest/test_runners/roboleaf_launched.txt',
291        '.repo/manifest.xml',
292        'build/soong/soong_ui.bash',
293        'prebuilts/build-tools/path/linux-x86/python3',
294        'prebuilts/build-tools/linux-x86/bin/py3-cmd',
295        'prebuilts/build-tools',
296        'prebuilts/asuite/atest/linux-x86',
297    ]
299    # Default exclude list of repo paths for snapshot
300    self._default_snapshot_exclude_paths = [
301        '$OUT_DIR/host/linux-x86/bin/go',
302        '$OUT_DIR/host/linux-x86/bin/soong_build',
303        '$OUT_DIR/host/linux-x86/obj',
304    ]
306    # Default list of environment variables to take and restore in snapshots
307    self._default_snapshot_env_keys = [
308        split_build_test_script.ANDROID_BUILD_TOP_KEY,
309        'ANDROID_HOST_OUT',
313        'OUT',
314        'OUT_DIR',
315        'PATH',
316        'HOST_OUT_TESTCASES',
317        'ANDROID_JAVA_HOME',
318        'JAVA_HOME',
319    ]
321  def create_atest_script(self, name: str = None) -> SplitBuildTestScript:
322    """Create an instance of atest integration test utility."""
323    return self.create_split_build_test_script(name)
325  def create_step_output(self) -> StepOutput:
326    """Create a step output object with default values."""
327    out = StepOutput()
328    out.add_snapshot_include_paths(self._default_snapshot_include_paths)
329    out.add_snapshot_exclude_paths(self._default_snapshot_exclude_paths)
330    out.add_snapshot_env_keys(self._default_snapshot_env_keys)
331    out.add_snapshot_include_paths(self._get_jdk_path_list())
332    return out
334  @classmethod
335  def run_atest_command(
336      cls,
337      cmd: str,
338      step_in: split_build_test_script.StepInput,
339      include_device_serial: bool,
340      print_output: bool = True,
341      use_prebuilt_atest_binary=None,
342  ) -> AtestRunResult:
343    """Run either `atest-dev` or `atest` command through subprocess.
345    Args:
346        cmd: command string for Atest. Do not add 'atest-dev' or 'atest' in the
347          beginning of the command.
348        step_in: The step input object from build or test step.
349        include_device_serial: Whether a device is required for the atest
350          command. This argument is only used to determine whether to include
351          device serial in the command. It does not add device/deviceless
352          arguments such as '--host'.
353        print_output: Whether to print the stdout and stderr while the command
354          is running.
355        use_prebuilt_atest_binary: Whether to run the command using the prebuilt
356          atest binary instead of the atest-dev binary.
358    Returns:
359        An AtestRunResult object containing the run information.
360    """
361    if use_prebuilt_atest_binary is None:
362      use_prebuilt_atest_binary = step_in.get_config().use_prebuilt_atest_binary
363    atest_binary = 'atest' if use_prebuilt_atest_binary else 'atest-dev'
365    # TODO: b/336839543 - Throw error here when serial is required but not set
366    # instead of from step_in.get_device_serial_args_or_empty()
367    serial_arg = (
368        step_in.get_device_serial_args_or_empty()
369        if include_device_serial
370        else ''
371    )
372    complete_cmd = f'{atest_binary}{serial_arg} {cmd}'
374    indentation = '  '
375    logging.debug('Executing atest command: %s', complete_cmd)
376    logging.debug(
377        '%sCommand environment variables: %s', indentation, step_in.get_env()
378    )
379    result = AtestRunResult(
380        cls._run_shell_command(
381            complete_cmd.split(),
382            env=step_in.get_env(),
383            cwd=step_in.get_repo_root(),
384            print_output=print_output,
385        ),
386        step_in.get_env(),
387        step_in.get_repo_root(),
388        step_in.get_config(),
389    )
391    wrap_output_lines = lambda output_str: ''.join((
392        f'{indentation * 2}> %s' % line for line in output_str.splitlines(True)
393    ))
394    logging.debug(
395        '%sCommand stdout:\n%s',
396        indentation,
397        wrap_output_lines(result.get_stdout()),
398    )
399    logging.debug(
400        '%sAtest log:\n%s',
401        indentation,
402        wrap_output_lines(result.get_atest_log()),
403    )
405    return result
407  @staticmethod
408  def _run_shell_command(
409      cmd: list[str],
410      env: dict[str, str],
411      cwd: str,
412      print_output: bool = True,
413  ) -> subprocess.CompletedProcess[str]:
414    """Execute shell command with real time output printing and capture."""
416    def read_output(read_src, print_dst, capture_dst):
417      while (output := read_src.readline()) or process.poll() is None:
418        if output:
419          if print_output:
420            print(output, end='', file=print_dst)
421          capture_dst.append(output)
423    with subprocess.Popen(
424        cmd,
425        stdout=subprocess.PIPE,
426        stderr=subprocess.PIPE,
427        text=True,
428        env=env,
429        cwd=cwd,
430    ) as process:
431      stdout = []
432      stderr = []
433      with concurrent.futures.ThreadPoolExecutor() as executor:
434        stdout_future = executor.submit(
435            read_output, process.stdout, sys.stdout, stdout
436        )
437        stderr_future = executor.submit(
438            read_output, process.stderr, sys.stderr, stderr
439        )
440      stdout_future.result()
441      stderr_future.result()
443      return subprocess.CompletedProcess(
444          cmd, process.poll(), ''.join(stdout), ''.join(stderr)
445      )
447  @staticmethod
448  def _get_jdk_path_list() -> str:
449    """Get the relative jdk directory in build environment."""
450    if split_build_test_script.ANDROID_BUILD_TOP_KEY not in os.environ:
451      return []
452    absolute_path = pathlib.Path(os.environ['ANDROID_JAVA_HOME'])
453    while not absolute_path.name.startswith('jdk'):
454      absolute_path = absolute_path.parent
455    if not absolute_path.name.startswith('jdk'):
456      raise ValueError(
457          'Unrecognized jdk directory ' + os.environ['ANDROID_JAVA_HOME']
458      )
459    repo_root = pathlib.Path(
460        os.environ[split_build_test_script.ANDROID_BUILD_TOP_KEY]
461    )
462    return [absolute_path.relative_to(repo_root).as_posix()]
465def main():
466  """Main method to run the integration tests."""
468  def argparser_update_func(parser):
469    parser.add_argument(
470        '--use-prebuilt-atest-binary',
471        action='store_true',
472        default=False,
473        help=(
474            'Set the default atest binary to the prebuilt `atest` instead'
475            ' of `atest-dev`.'
476        ),
477    )
479  def config_update_function(config, args):
480    config.use_prebuilt_atest_binary = args.use_prebuilt_atest_binary
482  split_build_test_script.main(
483      argv=sys.argv,
484      make_before_build=['atest'],
485      argparser_update_func=argparser_update_func,
486      config_update_function=config_update_function,
487  )