#!/usr/bin/env python3
#
# Copyright 2023, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base test module for Atest integration tests."""
import concurrent.futures
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import time
from typing import Any

import split_build_test_script

# Exporting for test modules' typing reference
SplitBuildTestScript = split_build_test_script.SplitBuildTestScript
StepInput = split_build_test_script.StepInput
StepOutput = split_build_test_script.StepOutput
run_in_parallel = split_build_test_script.ParallelTestRunner.run_in_parallel
setup_parallel_in_build_env = (
    split_build_test_script.ParallelTestRunner.setup_parallel_in_build_env
)

# Note: The following constants should ideally be imported from their
#       corresponding prod source code, but this makes local execution of the
#       integration test harder due to some special dependencies in the prod
#       code. Therefore we copy the definition here for now in favor of easier
#       local integration test execution. If value changes in the source code
#       breaking the integration test becomes a problem in the future, we can
#       reconsider importing these constants.
# Printed before the html log line. Defined in atest/atest_utils.py.
_HTML_LOG_PRINT_PREFIX = 'To access logs, press "ctrl" and click on'
# Stdout print prefix for results directory. Defined in atest/atest_main.py
_RESULTS_DIR_PRINT_PREFIX = 'Atest results and logs directory: '


class LogEntry:
  """Represents a single log entry."""

  def __init__(
      self,
      timestamp_str: str,
      src_file_name: str,
      src_file_line_number: int,
      log_level: str,
      content_lines: list[str],
  ):
    """Initializes a LogEntry object from a logging line.

    Args:
      timestamp_str: The timestamp header string in each log entry.
      src_file_name: The source file name in the log entry.
      src_file_line_number: The source file line number in the log entry.
      log_level: The log level string in the log entry.
      content_lines: A list of log entry content lines. The list is stored by
        reference, so lines appended to it later are visible in get_content().
    """
    self._timestamp_string = timestamp_str
    self._source_file_name = src_file_name
    self._source_file_line_number = src_file_line_number
    self._log_level = log_level
    self._content_lines = content_lines

  def get_timestamp(self) -> float:
    """Returns the timestamp of the log entry as an epoch time."""
    return time.mktime(
        time.strptime(self._timestamp_string, '%Y-%m-%d %H:%M:%S')
    )

  def get_timestamp_string(self) -> str:
    """Returns the timestamp of the log entry as a string."""
    return self._timestamp_string

  def get_source_file_name(self) -> str:
    """Returns the source file name of the log entry."""
    return self._source_file_name

  def get_source_file_line_number(self) -> int:
    """Returns the source file line number of the log entry."""
    return self._source_file_line_number

  def get_log_level(self) -> str:
    """Returns the log level of the log entry."""
    return self._log_level

  def get_content(self) -> str:
    """Returns the content of the log entry."""
    return '\n'.join(self._content_lines)


class AtestRunResult:
  """A class to store Atest run result and get detailed run information."""

  def __init__(
      self,
      completed_process: subprocess.CompletedProcess[str],
      env: dict[str, str],
      repo_root: str,
      config: split_build_test_script.IntegrationTestConfiguration,
  ):
    """Initializes the result wrapper of a finished atest command.

    Args:
      completed_process: The completed process of the atest command run.
      env: The environment variables the command was run with.
      repo_root: The directory the command was run in.
      config: The integration test configuration of the current run.
    """
    self._completed_process = completed_process
    self._env = env
    self._repo_root = repo_root
    self._config = config

  def get_returncode(self) -> int:
    """Returns the return code of the completed process."""
    return self._completed_process.returncode

  def get_stdout(self) -> str:
    """Returns the standard output of the completed process."""
    return self._completed_process.stdout

  def get_stderr(self) -> str:
    """Returns the standard error of the completed process."""
    return self._completed_process.stderr

  def get_cmd_list(self) -> list[str]:
    """Returns the command list used in the process run."""
    return self._completed_process.args

  def get_results_dir_path(self, snapshot_ready=False) -> pathlib.Path:
    """Returns the atest results directory path.

    Args:
      snapshot_ready: Whether to make the result root directory snapshot
        ready. When set to True and called in build environment, this method
        will copy the path into <repo_root>/out with dereferencing so that the
        directory can be safely added to snapshot.

    Raises:
      RuntimeError: Failed to parse the result dir path.

    Returns:
      The Atest result directory path.
    """
    results_dir = None
    # When the prefix is printed more than once, the last occurrence wins.
    for line in self.get_stdout().splitlines(keepends=False):
      if line.startswith(_RESULTS_DIR_PRINT_PREFIX):
        results_dir = pathlib.Path(line[len(_RESULTS_DIR_PRINT_PREFIX) :])
    # A Path object is always truthy, so test for the sentinel explicitly.
    if results_dir is None:
      raise RuntimeError('Failed to parse the result directory from stdout.')

    if self._config.is_test_env or not snapshot_ready:
      return results_dir

    result_dir_copy_path = pathlib.Path(self._env['OUT_DIR']).joinpath(
        'atest_integration_tests', results_dir.name
    )
    # Copy only once; symlinks=False makes copytree dereference symlinks.
    if not result_dir_copy_path.exists():
      shutil.copytree(results_dir, result_dir_copy_path, symlinks=False)

    return result_dir_copy_path

  def get_test_result_dict(self) -> dict[str, Any]:
    """Gets the atest results loaded from the test_result json.

    Returns:
      Atest result information loaded from the test_result json file. The test
      result usually contains information about test runners and test
      pass/fail results.
    """
    json_path = self.get_results_dir_path() / 'test_result'
    with open(json_path, 'r', encoding='utf-8') as f:
      return json.load(f)

  def get_passed_count(self) -> int:
    """Gets the total number of passed tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['PASSED']

  def get_failed_count(self) -> int:
    """Gets the total number of failed tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['FAILED']

  def get_ignored_count(self) -> int:
    """Gets the total number of ignored tests from atest summary."""
    return self.get_test_result_dict()['total_summary']['IGNORED']

  def get_atest_log(self) -> str:
    """Gets the log content read from the atest log file."""
    log_path = self.get_results_dir_path() / 'atest.log'
    return log_path.read_text(encoding='utf-8')

  def get_atest_log_entries(self) -> list[LogEntry]:
    """Gets the parsed atest log entries list from the atest log file.

    This method parses the atest log file and constructs a new entry when a
    line starts with a time string, source file name, line number, and log
    level. Lines that do not start a new entry are appended to the content of
    the preceding entry; such lines before the first entry are dropped.

    Returns:
      A list of parsed log entries.
    """
    # Compile once instead of rebuilding the pattern for every log line.
    entry_header = re.compile(
        r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+?):(\d+):(\w+): (.*)'
    )
    entries = []
    last_content_lines = []
    for line in self.get_atest_log().splitlines():
      match = entry_header.match(line)
      if match:
        # The list is shared with the LogEntry so that continuation lines
        # appended below become part of that entry's content.
        last_content_lines = [match.group(5)]
        entries.append(
            LogEntry(
                match.group(1),
                match.group(2),
                int(match.group(3)),
                match.group(4),
                last_content_lines,
            )
        )
      elif last_content_lines:
        last_content_lines.append(line)

    return entries

  def get_atest_log_values_from_prefix(self, prefix: str) -> list[str]:
    """Gets log values from lines starting with the given log prefix."""
    res = []
    for entry in self.get_atest_log_entries():
      content = entry.get_content()
      if content.startswith(prefix):
        res.append(content[len(prefix) :])
    return res

  def check_returncode(self) -> None:
    """Checks the return code and raises an exception if non-zero.

    Raises:
      RuntimeError: The atest command finished with a non-zero exit code. The
        message carries the command, the exit code, stdout and, when it is
        not blank, stderr.
    """
    # Nothing to report on success, so skip formatting the output.
    if self.get_returncode() == 0:
      return

    def add_line_prefix(msg: str):
      if not msg:
        return msg
      return ''.join('> %s' % line for line in msg.splitlines(keepends=True))

    stderr = (
        f'stderr:\n{add_line_prefix(self.get_stderr())}\n'
        if self.get_stderr() and self.get_stderr().strip()
        else ''
    )

    raise RuntimeError(
        f'Atest command {self.get_cmd_list()} finished with exit code'
        f' {self.get_returncode()}.\n'
        f'stdout:\n{add_line_prefix(self.get_stdout())}\n{stderr}'
    )

  def get_local_reproduce_debug_cmd(self) -> str:
    """Returns a full reproduce command for local debugging purpose.

    Returns:
      A command that can be executed directly in command line to
      reproduce the atest command.
    """
    # NOTE: environment values and command arguments are joined as-is, without
    # shell quoting.
    return '(cd {dir} && {env} {cmd})'.format(
        dir=self._repo_root,
        env=' '.join((k + '=' + v for k, v in self._env.items())),
        cmd=' '.join(self.get_cmd_list()),
    )


class AtestTestCase(split_build_test_script.SplitBuildTestTestCase):
  """Base test case for build-test environment split integration tests."""

  def setUp(self):
    """Sets up the default snapshot paths and environment variable keys."""
    super().setUp()
    # Default include list of repo paths for snapshot
    self._default_snapshot_include_paths = [
        '$OUT_DIR/host/linux-x86',
        '$OUT_DIR/target/product/*/module-info*',
        '$OUT_DIR/target/product/*/testcases',
        '$OUT_DIR/target/product/*/data',
        '$OUT_DIR/target/product/*/all_modules.txt',
        '$OUT_DIR/soong/module_bp*',
        'tools/asuite/atest/test_runners/roboleaf_launched.txt',
        '.repo/manifest.xml',
        'build/soong/soong_ui.bash',
        'prebuilts/build-tools/path/linux-x86/python3',
        'prebuilts/build-tools/linux-x86/bin/py3-cmd',
        'prebuilts/build-tools',
        'prebuilts/asuite/atest/linux-x86',
    ]

    # Default exclude list of repo paths for snapshot
    self._default_snapshot_exclude_paths = [
        '$OUT_DIR/host/linux-x86/bin/go',
        '$OUT_DIR/host/linux-x86/bin/soong_build',
        '$OUT_DIR/host/linux-x86/obj',
    ]

    # Default list of environment variables to take and restore in snapshots
    self._default_snapshot_env_keys = [
        split_build_test_script.ANDROID_BUILD_TOP_KEY,
        'ANDROID_HOST_OUT',
        'ANDROID_PRODUCT_OUT',
        'ANDROID_HOST_OUT_TESTCASES',
        'ANDROID_TARGET_OUT_TESTCASES',
        'OUT',
        'OUT_DIR',
        'PATH',
        'HOST_OUT_TESTCASES',
        'ANDROID_JAVA_HOME',
        'JAVA_HOME',
    ]

  def create_atest_script(self, name: str = None) -> SplitBuildTestScript:
    """Create an instance of atest integration test utility."""
    return self.create_split_build_test_script(name)

  def create_step_output(self) -> StepOutput:
    """Create a step output object with default values."""
    out = StepOutput()
    out.add_snapshot_include_paths(self._default_snapshot_include_paths)
    out.add_snapshot_exclude_paths(self._default_snapshot_exclude_paths)
    out.add_snapshot_env_keys(self._default_snapshot_env_keys)
    out.add_snapshot_include_paths(self._get_jdk_path_list())
    return out

  @classmethod
  def run_atest_command(
      cls,
      cmd: str,
      step_in: split_build_test_script.StepInput,
      include_device_serial: bool,
      print_output: bool = True,
      use_prebuilt_atest_binary=None,
  ) -> AtestRunResult:
    """Run either `atest-dev` or `atest` command through subprocess.

    Args:
      cmd: command string for Atest. Do not add 'atest-dev' or 'atest' in the
        beginning of the command.
      step_in: The step input object from build or test step.
      include_device_serial: Whether a device is required for the atest
        command. This argument is only used to determine whether to include
        device serial in the command. It does not add device/deviceless
        arguments such as '--host'.
      print_output: Whether to print the stdout and stderr while the command
        is running.
      use_prebuilt_atest_binary: Whether to run the command using the prebuilt
        atest binary instead of the atest-dev binary. When None, the value
        from the step input's configuration is used.

    Returns:
      An AtestRunResult object containing the run information.
    """
    if use_prebuilt_atest_binary is None:
      use_prebuilt_atest_binary = step_in.get_config().use_prebuilt_atest_binary
    atest_binary = 'atest' if use_prebuilt_atest_binary else 'atest-dev'

    # TODO: b/336839543 - Throw error here when serial is required but not set
    # instead of from step_in.get_device_serial_args_or_empty()
    serial_arg = (
        step_in.get_device_serial_args_or_empty()
        if include_device_serial
        else ''
    )
    complete_cmd = f'{atest_binary}{serial_arg} {cmd}'

    indentation = ' '
    logging.debug('Executing atest command: %s', complete_cmd)
    logging.debug(
        '%sCommand environment variables: %s', indentation, step_in.get_env()
    )
    # The command string is split on whitespace; no shell is involved.
    result = AtestRunResult(
        cls._run_shell_command(
            complete_cmd.split(),
            env=step_in.get_env(),
            cwd=step_in.get_repo_root(),
            print_output=print_output,
        ),
        step_in.get_env(),
        step_in.get_repo_root(),
        step_in.get_config(),
    )

    def wrap_output_lines(output_str: str) -> str:
      # Prefix every line so the captured output stands out in the debug log.
      line_prefix = f'{indentation * 2}> '
      return ''.join(
          line_prefix + line for line in output_str.splitlines(True)
      )

    logging.debug(
        '%sCommand stdout:\n%s',
        indentation,
        wrap_output_lines(result.get_stdout()),
    )
    logging.debug(
        '%sAtest log:\n%s',
        indentation,
        wrap_output_lines(result.get_atest_log()),
    )

    return result

  @staticmethod
  def _run_shell_command(
      cmd: list[str],
      env: dict[str, str],
      cwd: str,
      print_output: bool = True,
  ) -> subprocess.CompletedProcess[str]:
    """Execute shell command with real time output printing and capture.

    Args:
      cmd: The command and its arguments.
      env: The environment variables for the child process.
      cwd: The working directory for the child process.
      print_output: Whether to echo stdout and stderr while the command runs.

    Returns:
      A CompletedProcess holding the command, its return code and the captured
      stdout and stderr text.
    """

    def read_output(read_src, print_dst, capture_dst):
      # Keep reading until the stream yields nothing more and the process has
      # exited.
      while (output := read_src.readline()) or process.poll() is None:
        if output:
          if print_output:
            print(output, end='', file=print_dst)
          capture_dst.append(output)

    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        env=env,
        cwd=cwd,
    ) as process:
      stdout = []
      stderr = []
      # Drain both pipes concurrently, one reader thread per stream.
      with concurrent.futures.ThreadPoolExecutor() as executor:
        stdout_future = executor.submit(
            read_output, process.stdout, sys.stdout, stdout
        )
        stderr_future = executor.submit(
            read_output, process.stderr, sys.stderr, stderr
        )
        stdout_future.result()
        stderr_future.result()

    return subprocess.CompletedProcess(
        cmd, process.poll(), ''.join(stdout), ''.join(stderr)
    )

  @staticmethod
  def _get_jdk_path_list() -> list[str]:
    """Get the relative jdk directory in build environment.

    Returns:
      An empty list when the build top variable is not set in the environment,
      otherwise a single-item list with the jdk directory path relative to the
      repo root.

    Raises:
      ValueError: No path component of ANDROID_JAVA_HOME starts with 'jdk', or
        the jdk directory is not located under the repo root.
    """
    if split_build_test_script.ANDROID_BUILD_TOP_KEY not in os.environ:
      return []
    java_home = os.environ['ANDROID_JAVA_HOME']
    absolute_path = pathlib.Path(java_home)
    # Walk up to the closest 'jdk*' component. The filesystem root (and an
    # exhausted relative path) has an empty name and is its own parent, so
    # stop there instead of looping forever.
    while absolute_path.name and not absolute_path.name.startswith('jdk'):
      absolute_path = absolute_path.parent
    if not absolute_path.name.startswith('jdk'):
      raise ValueError('Unrecognized jdk directory ' + java_home)
    repo_root = pathlib.Path(
        os.environ[split_build_test_script.ANDROID_BUILD_TOP_KEY]
    )
    return [absolute_path.relative_to(repo_root).as_posix()]


def main():
  """Main method to run the integration tests.

  Registers the --use-prebuilt-atest-binary flag and copies its value into the
  test configuration before delegating to split_build_test_script.main().
  """

  def argparser_update_func(parser):
    parser.add_argument(
        '--use-prebuilt-atest-binary',
        action='store_true',
        default=False,
        help=(
            'Set the default atest binary to the prebuilt `atest` instead'
            ' of `atest-dev`.'
        ),
    )

  def config_update_function(config, args):
    config.use_prebuilt_atest_binary = args.use_prebuilt_atest_binary

  split_build_test_script.main(
      argv=sys.argv,
      make_before_build=['atest'],
      argparser_update_func=argparser_update_func,
      config_update_function=config_update_function,
  )