1#!/usr/bin/env python3
2#
3# Copyright 2017, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Command line utility for running Android tests through TradeFederation.
18
19atest helps automate the flow of building test modules across the Android
20code base and executing the tests via the TradeFederation test harness.
21
22atest is designed to support any test types that can be ran by TradeFederation.
23"""
24
25# pylint: disable=too-many-lines
26
27from __future__ import annotations
28from __future__ import print_function
29
30from abc import ABC, abstractmethod
31import argparse
32import collections
33from dataclasses import dataclass
34import itertools
35import logging
36import os
37import platform
38import sys
39import tempfile
40import time
41from typing import Any, Dict, List, Set, Tuple
42
43from atest import arg_parser
44from atest import atest_configs
45from atest import atest_execution_info
46from atest import atest_utils
47from atest import banner
48from atest import bazel_mode
49from atest import bug_detector
50from atest import cli_translator
51from atest import constants
52from atest import device_update
53from atest import module_info
54from atest import result_reporter
55from atest import test_runner_handler
56from atest.atest_enum import DetectType, ExitCode
57from atest.coverage import coverage
58from atest.metrics import metrics
59from atest.metrics import metrics_base
60from atest.metrics import metrics_utils
61from atest.test_finders import test_finder_utils
62from atest.test_finders import test_info
63from atest.test_finders.test_info import TestInfo
64from atest.test_runner_invocation import TestRunnerInvocation
65from atest.tools import indexing
66from atest.tools import start_avd as avd
67
68EXPECTED_VARS = frozenset([
69    constants.ANDROID_BUILD_TOP,
70    'ANDROID_TARGET_OUT_TESTCASES',
71    constants.ANDROID_OUT,
72])
73TEST_RUN_DIR_PREFIX = '%Y%m%d_%H%M%S'
74CUSTOM_ARG_FLAG = '--'
75OPTION_NOT_FOR_TEST_MAPPING = (
76    'Option "{}" does not work for running tests in TEST_MAPPING files'
77)
78
79DEVICE_TESTS = 'tests that require device'
80HOST_TESTS = 'tests that do NOT require device'
81RESULT_HEADER_FMT = '\nResults from %(test_type)s:'
82RUN_HEADER_FMT = '\nRunning %(test_count)d %(test_type)s.'
83TEST_COUNT = 'test_count'
84TEST_TYPE = 'test_type'
85END_OF_OPTION = '--'
86HAS_IGNORED_ARGS = False
87# Conditions that atest should exit without sending result to metrics.
88EXIT_CODES_BEFORE_TEST = [
89    ExitCode.ENV_NOT_SETUP,
90    ExitCode.TEST_NOT_FOUND,
91    ExitCode.OUTSIDE_ROOT,
92    ExitCode.AVD_CREATE_FAILURE,
93    ExitCode.AVD_INVALID_ARGS,
94]
95
96# Stdout print prefix for results directory. May be used in integration tests.
97_RESULTS_DIR_PRINT_PREFIX = 'Atest results and logs directory: '
98# Log prefix for dry-run run command. May be used in integration tests.
99_DRY_RUN_COMMAND_LOG_PREFIX = 'Internal run command from dry-run: '
100
101
102@dataclass
103class Steps:
104  """A Dataclass that stores steps and shows step assignments."""
105
106  _build: bool
107  _device_update: bool
108  _install: bool
109  _test: bool
110
111  def has_build(self):
112    """Return whether build is in steps."""
113    return self._build
114
115  def is_build_only(self):
116    """Return whether build is the only one in steps."""
117    return self._build and not any(
118        (self._test, self._install, self._device_update)
119    )
120
121  def has_device_update(self):
122    """Return whether device update is in steps."""
123    return self._device_update
124
125  def has_install(self):
126    """Return whether install is in steps."""
127    return self._install
128
129  def has_test(self):
130    """Return whether install is the only one in steps."""
131    return self._test
132
133  def is_test_only(self):
134    """Return whether build is not in steps but test."""
135    return self._test and not any(
136        (self._build, self._install, self._device_update)
137    )
138
139
140def parse_steps(args: arg_parser.AtestArgParser) -> Steps:
141  """Return Steps object.
142
143  Args:
144      args: an AtestArgParser object.
145
146  Returns:
147      Step object that stores the boolean of build, install and test.
148  """
149  # Implicitly running 'build', 'install' and 'test' when args.steps is None.
150  if not args.steps:
151    return Steps(True, args.update_device, True, True)
152  build = constants.BUILD_STEP in args.steps
153  test = constants.TEST_STEP in args.steps
154  install = constants.INSTALL_STEP in args.steps
155  if install and not test:
156    atest_utils.print_and_log_warning(
157        'Installing without test step is currently not '
158        'supported; Atest will proceed testing!'
159    )
160    test = True
161  return Steps(build, args.update_device, install, test)
162
163
164def _get_args_from_config():
165  """Get customized atest arguments in the config file.
166
167  If the config has not existed yet, atest will initialize an example
168  config file for it without any effective options.
169
170  Returns:
171      A list read from the config file.
172  """
173  _config = atest_utils.get_config_folder().joinpath('config')
174  if not _config.parent.is_dir():
175    _config.parent.mkdir(parents=True)
176  args = []
177  if not _config.is_file():
178    with open(_config, 'w+', encoding='utf8') as cache:
179      cache.write(constants.ATEST_EXAMPLE_ARGS)
180    return args
181  warning = 'Line {} contains {} and will be ignored.'
182  print(
183      '\n{} {}'.format(
184          atest_utils.mark_cyan('Reading config:'),
185          atest_utils.mark_yellow(_config),
186      )
187  )
188  # pylint: disable=global-statement:
189  global HAS_IGNORED_ARGS
190  with open(_config, 'r', encoding='utf8') as cache:
191    for entry in cache.readlines():
192      # Strip comments.
193      arg_in_line = entry.partition('#')[0].strip()
194      # Strip test name/path.
195      if arg_in_line.startswith('-'):
196        # Process argument that contains whitespaces.
197        # e.g. ["--serial foo"] -> ["--serial", "foo"]
198        if len(arg_in_line.split()) > 1:
199          # remove "--" to avoid messing up atest/tradefed commands.
200          if END_OF_OPTION in arg_in_line.split():
201            HAS_IGNORED_ARGS = True
202            print(
203                warning.format(
204                    atest_utils.mark_yellow(arg_in_line), END_OF_OPTION
205                )
206            )
207          args.extend(arg_in_line.split())
208        else:
209          if END_OF_OPTION == arg_in_line:
210            HAS_IGNORED_ARGS = True
211            print(
212                warning.format(
213                    atest_utils.mark_yellow(arg_in_line), END_OF_OPTION
214                )
215            )
216          args.append(arg_in_line)
217  return args
218
219
220def _parse_args(argv: List[Any]) -> Tuple[argparse.ArgumentParser, List[str]]:
221  """Parse command line arguments.
222
223  Args:
224      argv: A list of arguments.
225
226  Returns:
227      A tuple of an argparse.ArgumentParser class instance holding parsed args
228  """
229  # Store everything after '--' in custom_args.
230  pruned_argv = argv
231  custom_args_index = None
232  if CUSTOM_ARG_FLAG in argv:
233    custom_args_index = argv.index(CUSTOM_ARG_FLAG)
234    pruned_argv = argv[:custom_args_index]
235  args = arg_parser.create_atest_arg_parser().parse_args(pruned_argv)
236  args.custom_args = []
237  if custom_args_index is not None:
238    for arg in argv[custom_args_index + 1 :]:
239      logging.debug('Quoting regex argument %s', arg)
240      args.custom_args.append(atest_utils.quote(arg))
241
242  return args
243
244
245def _configure_logging(verbose: bool, results_dir: str):
246  """Configure the logger.
247
248  Args:
249      verbose: If true display DEBUG level logs on console.
250      results_dir: A directory which stores the ATest execution information.
251  """
252  log_fmat = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s'
253  date_fmt = '%Y-%m-%d %H:%M:%S'
254  log_path = os.path.join(results_dir, 'atest.log')
255
256  logger = logging.getLogger('')
257  # Clear the handlers to prevent logging.basicConfig from being called twice.
258  logger.handlers = []
259
260  logging.basicConfig(
261      filename=log_path, level=logging.DEBUG, format=log_fmat, datefmt=date_fmt
262  )
263
264  class _StreamToLogger:
265    """A file like class to that redirect writes to a printer and logger."""
266
267    def __init__(self, logger, log_level, printer):
268      self._logger = logger
269      self._log_level = log_level
270      self._printer = printer
271      self._buffers = []
272
273    def write(self, buf: str) -> None:
274      self._printer.write(buf)
275
276      if len(buf) == 1 and buf[0] == '\n' and self._buffers:
277        self._logger.log(self._log_level, ''.join(self._buffers))
278        self._buffers.clear()
279      else:
280        self._buffers.append(buf)
281
282    def flush(self) -> None:
283      self._printer.flush()
284
285  stdout_log_level = 25
286  stderr_log_level = 45
287  logging.addLevelName(stdout_log_level, 'STDOUT')
288  logging.addLevelName(stderr_log_level, 'STDERR')
289  sys.stdout = _StreamToLogger(logger, stdout_log_level, sys.stdout)
290  sys.stderr = _StreamToLogger(logger, stderr_log_level, sys.stderr)
291
292
293def _missing_environment_variables():
294  """Verify the local environment has been set up to run atest.
295
296  Returns:
297      List of strings of any missing environment variables.
298  """
299  missing = list(
300      filter(None, [x for x in EXPECTED_VARS if not os.environ.get(x)])
301  )
302  if missing:
303    atest_utils.print_and_log_error(
304        "Local environment doesn't appear to have been "
305        'initialized. Did you remember to run lunch? Expected '
306        'Environment Variables: %s.',
307        missing,
308    )
309  return missing
310
311
312def make_test_run_dir():
313  """Make the test run dir in ATEST_RESULT_ROOT.
314
315  Returns:
316      A string of the dir path.
317  """
318  if not os.path.exists(constants.ATEST_RESULT_ROOT):
319    os.makedirs(constants.ATEST_RESULT_ROOT)
320  ctime = time.strftime(TEST_RUN_DIR_PREFIX, time.localtime())
321  test_result_dir = tempfile.mkdtemp(
322      prefix='%s_' % ctime, dir=constants.ATEST_RESULT_ROOT
323  )
324  print(_RESULTS_DIR_PRINT_PREFIX + test_result_dir)
325  return test_result_dir
326
327
328def get_extra_args(args):
329  """Get extra args for test runners.
330
331  Args:
332      args: arg parsed object.
333
334  Returns:
335      Dict of extra args for test runners to utilize.
336  """
337  extra_args = {}
338  if args.wait_for_debugger:
339    extra_args[constants.WAIT_FOR_DEBUGGER] = None
340  if not parse_steps(args).has_install():
341    extra_args[constants.DISABLE_INSTALL] = None
342  # The key and its value of the dict can be called via:
343  # if args.aaaa:
344  #     extra_args[constants.AAAA] = args.aaaa
345  arg_maps = {
346      'all_abi': constants.ALL_ABI,
347      'annotation_filter': constants.ANNOTATION_FILTER,
348      'bazel_arg': constants.BAZEL_ARG,
349      'collect_tests_only': constants.COLLECT_TESTS_ONLY,
350      'experimental_coverage': constants.COVERAGE,
351      'custom_args': constants.CUSTOM_ARGS,
352      'device_only': constants.DEVICE_ONLY,
353      'disable_teardown': constants.DISABLE_TEARDOWN,
354      'disable_upload_result': constants.DISABLE_UPLOAD_RESULT,
355      'dry_run': constants.DRY_RUN,
356      'host': constants.HOST,
357      'instant': constants.INSTANT,
358      'iterations': constants.ITERATIONS,
359      'request_upload_result': constants.REQUEST_UPLOAD_RESULT,
360      'bazel_mode_features': constants.BAZEL_MODE_FEATURES,
361      'rerun_until_failure': constants.RERUN_UNTIL_FAILURE,
362      'retry_any_failure': constants.RETRY_ANY_FAILURE,
363      'serial': constants.SERIAL,
364      'sharding': constants.SHARDING,
365      'test_filter': constants.TEST_FILTER,
366      'test_timeout': constants.TEST_TIMEOUT,
367      'tf_debug': constants.TF_DEBUG,
368      'tf_template': constants.TF_TEMPLATE,
369      'user_type': constants.USER_TYPE,
370      'verbose': constants.VERBOSE,
371      'use_tf_min_base_template': constants.USE_TF_MIN_BASE_TEMPLATE,
372  }
373  not_match = [k for k in arg_maps if k not in vars(args)]
374  if not_match:
375    raise AttributeError(
376        '%s object has no attribute %s' % (type(args).__name__, not_match)
377    )
378  extra_args.update({
379      arg_maps.get(k): v for k, v in vars(args).items() if arg_maps.get(k) and v
380  })
381  return extra_args
382
383
384def _validate_exec_mode(args, test_infos: list[TestInfo], host_tests=None):
385  """Validate all test execution modes are not in conflict.
386
387  Exit the program with INVALID_EXEC_MODE code if the desired is a host-side
388  test but the given is a device-side test.
389
390  If the given is a host-side test and not specified `args.host`, forcibly
391  set `args.host` to True.
392
393  Args:
394      args: parsed args object.
395      test_infos: a list of TestInfo objects.
396      host_tests: True if all tests should be deviceless, False if all tests
397        should be device tests. Default is set to None, which means tests can be
398        either deviceless or device tests.
399  """
400  all_device_modes = {x.get_supported_exec_mode() for x in test_infos}
401  err_msg = None
402  # In the case of '$atest <device-only> --host', exit.
403  if (host_tests or args.host) and constants.DEVICE_TEST in all_device_modes:
404    device_only_tests = [
405        x.test_name
406        for x in test_infos
407        if x.get_supported_exec_mode() == constants.DEVICE_TEST
408    ]
409    err_msg = (
410        'Specified --host, but the following tests are device-only:\n  '
411        + '\n  '.join(sorted(device_only_tests))
412        + '\nPlease remove the  option when running device-only tests.'
413    )
414  # In the case of '$atest <host-only> <device-only> --host' or
415  # '$atest <host-only> <device-only>', exit.
416  if (
417      constants.DEVICELESS_TEST in all_device_modes
418      and constants.DEVICE_TEST in all_device_modes
419  ):
420    err_msg = 'There are host-only and device-only tests in command.'
421  if host_tests is False and constants.DEVICELESS_TEST in all_device_modes:
422    err_msg = 'There are host-only tests in command.'
423  if err_msg:
424    atest_utils.print_and_log_error(err_msg)
425    metrics_utils.send_exit_event(ExitCode.INVALID_EXEC_MODE, logs=err_msg)
426    sys.exit(ExitCode.INVALID_EXEC_MODE)
427  # The 'adb' may not be available for the first repo sync or a clean build;
428  # run `adb devices` in the build step again.
429  if atest_utils.has_command('adb'):
430    _validate_adb_devices(args, test_infos)
431  # In the case of '$atest <host-only>', we add --host to run on host-side.
432  # The option should only be overridden if `host_tests` is not set.
433  if not args.host and host_tests is None:
434    logging.debug('Appending "--host" for a deviceless test...')
435    args.host = bool(constants.DEVICELESS_TEST in all_device_modes)
436
437
438def _validate_adb_devices(args, test_infos):
439  """Validate the availability of connected devices via adb command.
440
441  Exit the program with error code if have device-only and host-only.
442
443  Args:
444      args: parsed args object.
445      test_infos: TestInfo object.
446  """
447  # No need to check device availability if the user does not acquire to test.
448  if not parse_steps(args).has_test():
449    return
450  if args.no_checking_device:
451    return
452  # No need to check local device availability if the device test is running
453  # remotely.
454  if args.bazel_mode_features and (
455      bazel_mode.Features.EXPERIMENTAL_REMOTE_AVD in args.bazel_mode_features
456  ):
457    return
458  all_device_modes = {x.get_supported_exec_mode() for x in test_infos}
459  device_tests = [
460      x.test_name
461      for x in test_infos
462      if x.get_supported_exec_mode() != constants.DEVICELESS_TEST
463  ]
464  # Only block testing if it is a device test.
465  if constants.DEVICE_TEST in all_device_modes:
466    if (
467        not any((args.host, args.start_avd, args.acloud_create))
468        and not atest_utils.get_adb_devices()
469    ):
470      err_msg = (
471          f'Stop running test(s): {", ".join(device_tests)} require a device.'
472      )
473      atest_utils.colorful_print(err_msg, constants.RED)
474      logging.debug(atest_utils.mark_red(constants.REQUIRE_DEVICES_MSG))
475      metrics_utils.send_exit_event(ExitCode.DEVICE_NOT_FOUND, logs=err_msg)
476      sys.exit(ExitCode.DEVICE_NOT_FOUND)
477
478
479def _validate_tm_tests_exec_mode(
480    args: argparse.Namespace,
481    device_test_infos: List[test_info.TestInfo],
482    host_test_infos: List[test_info.TestInfo],
483):
484  """Validate all test execution modes are not in conflict.
485
486  Validate the tests' platform variant setting. For device tests, exit the
487  program if any test is found for host-only. For host tests, exit the
488  program if any test is found for device-only.
489
490  Args:
491      args: parsed args object.
492      device_test_infos: TestInfo instances for device tests.
493      host_test_infos: TestInfo instances for host tests.
494  """
495
496  # No need to verify device tests if atest command is set to only run host
497  # tests.
498  if device_test_infos and not args.host:
499    _validate_exec_mode(args, device_test_infos, host_tests=False)
500  if host_test_infos:
501    _validate_exec_mode(args, host_test_infos, host_tests=True)
502
503
504def _has_valid_test_mapping_args(args):
505  """Validate test mapping args.
506
507  Not all args work when running tests in TEST_MAPPING files. Validate the
508  args before running the tests.
509
510  Args:
511      args: parsed args object.
512
513  Returns:
514      True if args are valid
515  """
516  is_test_mapping = atest_utils.is_test_mapping(args)
517  if is_test_mapping:
518    metrics.LocalDetectEvent(detect_type=DetectType.IS_TEST_MAPPING, result=1)
519  else:
520    metrics.LocalDetectEvent(detect_type=DetectType.IS_TEST_MAPPING, result=0)
521  if not is_test_mapping:
522    return True
523  options_to_validate = [
524      (args.annotation_filter, '--annotation-filter'),
525  ]
526  for arg_value, arg in options_to_validate:
527    if arg_value:
528      atest_utils.print_and_log_error(
529          atest_utils.mark_red(OPTION_NOT_FOR_TEST_MAPPING.format(arg))
530      )
531      return False
532  return True
533
534
535def _validate_args(args):
536  """Validate setups and args.
537
538  Exit the program with error code if any setup or arg is invalid.
539
540  Args:
541      args: parsed args object.
542  """
543  if _missing_environment_variables():
544    sys.exit(ExitCode.ENV_NOT_SETUP)
545  if not _has_valid_test_mapping_args(args):
546    sys.exit(ExitCode.INVALID_TM_ARGS)
547
548
549def _print_module_info_from_module_name(mod_info, module_name):
550  """print out the related module_info for a module_name.
551
552  Args:
553      mod_info: ModuleInfo object.
554      module_name: A string of module.
555
556  Returns:
557      True if the module_info is found.
558  """
559  title_mapping = collections.OrderedDict()
560  title_mapping[constants.MODULE_COMPATIBILITY_SUITES] = 'Compatibility suite'
561  title_mapping[constants.MODULE_PATH] = 'Source code path'
562  title_mapping[constants.MODULE_INSTALLED] = 'Installed path'
563  target_module_info = mod_info.get_module_info(module_name)
564  is_module_found = False
565  if target_module_info:
566    atest_utils.colorful_print(module_name, constants.GREEN)
567    for title_key in title_mapping:
568      atest_utils.colorful_print(
569          '\t%s' % title_mapping[title_key], constants.CYAN
570      )
571      for info_value in target_module_info[title_key]:
572        print('\t\t{}'.format(info_value))
573    is_module_found = True
574  return is_module_found
575
576
577def _print_deprecation_warning(arg_to_deprecate: str):
578  """For features that are up for deprecation in the near future, print a message
579
580  to alert the user about the upcoming deprecation.
581
582  Args:
583      arg_to_deprecate: the arg with which the to-be-deprecated feature is
584        called.
585  """
586  args_to_deprecation_info = {
587      # arg_to_deprecate : (deprecation timeframe, additional info for users)
588      '--info': ('is deprecated.', '\nUse CodeSearch or `gomod` instead.')
589  }
590
591  warning_message = (
592      f'\nWARNING: The `{arg_to_deprecate}` feature '
593      + ' '.join(args_to_deprecation_info[arg_to_deprecate])
594      + '\nPlease file a bug or feature request to the Atest team if you have'
595      ' any concerns.'
596  )
597  atest_utils.colorful_print(warning_message, constants.RED)
598
599
600def is_from_test_mapping(test_infos):
601  """Check that the test_infos came from TEST_MAPPING files.
602
603  Args:
604      test_infos: A set of TestInfos.
605
606  Returns:
607      True if the test infos are from TEST_MAPPING files.
608  """
609  return list(test_infos)[0].from_test_mapping
610
611
612def _split_test_mapping_tests(test_infos):
613  """Split Test Mapping tests into 2 groups: device tests and host tests.
614
615  Args:
616      test_infos: A set of TestInfos.
617
618  Returns:
619      A tuple of (device_test_infos, host_test_infos), where
620      device_test_infos: A set of TestInfos for tests that require device.
621      host_test_infos: A set of TestInfos for tests that do NOT require
622          device.
623  """
624  assert is_from_test_mapping(test_infos)
625  host_test_infos = {info for info in test_infos if info.host}
626  device_test_infos = {info for info in test_infos if not info.host}
627  return device_test_infos, host_test_infos
628
629
630# pylint: disable=too-many-locals
631def _run_test_mapping_tests(
632    test_type_to_invocations: Dict[str, List[TestRunnerInvocation]],
633    extra_args: Dict[str, Any],
634) -> ExitCode:
635  """Run all tests in TEST_MAPPING files.
636
637  Args:
638      test_type_to_invocations: A dict mapping test runner invocations to test
639        types.
640      extra_args: A dict of extra args for others to utilize.
641
642  Returns:
643      Exit code.
644  """
645
646  test_results = []
647  for test_type, invocations in test_type_to_invocations.items():
648    tests = list(
649        itertools.chain.from_iterable(i.test_infos for i in invocations)
650    )
651    if not tests:
652      continue
653    header = RUN_HEADER_FMT % {TEST_COUNT: len(tests), TEST_TYPE: test_type}
654    atest_utils.colorful_print(header, constants.MAGENTA)
655    logging.debug('\n'.join([str(info) for info in tests]))
656
657    reporter = result_reporter.ResultReporter(
658        collect_only=extra_args.get(constants.COLLECT_TESTS_ONLY),
659        wait_for_debugger=atest_configs.GLOBAL_ARGS.wait_for_debugger,
660    )
661    reporter.print_starting_text()
662
663    tests_exit_code = ExitCode.SUCCESS
664    for invocation in invocations:
665      tests_exit_code |= invocation.run_all_tests(reporter)
666
667    atest_execution_info.AtestExecutionInfo.result_reporters.append(reporter)
668    test_results.append((tests_exit_code, reporter, test_type))
669
670  all_tests_exit_code = ExitCode.SUCCESS
671  failed_tests = []
672  for tests_exit_code, reporter, test_type in test_results:
673    atest_utils.colorful_print(
674        RESULT_HEADER_FMT % {TEST_TYPE: test_type}, constants.MAGENTA
675    )
676    result = tests_exit_code | reporter.print_summary()
677    if result:
678      failed_tests.append(test_type)
679    all_tests_exit_code |= result
680
681  # List failed tests at the end as a reminder.
682  if failed_tests:
683    atest_utils.colorful_print(
684        atest_utils.delimiter('=', 30, prenl=1), constants.YELLOW
685    )
686    atest_utils.colorful_print('\nFollowing tests failed:', constants.MAGENTA)
687    for failure in failed_tests:
688      atest_utils.colorful_print(failure, constants.RED)
689
690  return all_tests_exit_code
691
692
693def _dry_run(results_dir, extra_args, test_infos, mod_info):
694  """Only print the commands of the target tests rather than running them in
695
696  actual.
697
698  Args:
699      results_dir: Path for saving atest logs.
700      extra_args: Dict of extra args for test runners to utilize.
701      test_infos: A list of TestInfos.
702      mod_info: ModuleInfo object.
703
704  Returns:
705      A successful exit code.
706  """
707  all_run_cmds = []
708  for test_runner, tests in test_runner_handler.group_tests_by_test_runners(
709      test_infos
710  ):
711    runner = test_runner(results_dir, mod_info=mod_info, extra_args=extra_args)
712    run_cmds = runner.generate_run_commands(tests, extra_args)
713    for run_cmd in run_cmds:
714      all_run_cmds.append(run_cmd)
715      logging.debug(_DRY_RUN_COMMAND_LOG_PREFIX + run_cmd)
716      print(
717          'Would run test via command: %s' % (atest_utils.mark_green(run_cmd))
718      )
719  return ExitCode.SUCCESS
720
721
722def _print_testable_modules(mod_info, suite):
723  """Print the testable modules for a given suite.
724
725  Args:
726      mod_info: ModuleInfo object.
727      suite: A string of suite name.
728  """
729  testable_modules = mod_info.get_testable_modules(suite)
730  print(
731      '\n%s'
732      % atest_utils.mark_cyan(
733          '%s Testable %s modules' % (len(testable_modules), suite)
734      )
735  )
736  print(atest_utils.delimiter('-'))
737  for module in sorted(testable_modules):
738    print('\t%s' % module)
739
740
741def _is_inside_android_root():
742  """Identify whether the cwd is inside of Android source tree.
743
744  Returns:
745      False if the cwd is outside of the source tree, True otherwise.
746  """
747  build_top = os.getenv(constants.ANDROID_BUILD_TOP, ' ')
748  return build_top in os.getcwd()
749
750
751def _non_action_validator(args: argparse.ArgumentParser):
752  """Method for non-action arguments such as --version, --history,
753
754  --latest_result, etc.
755
756  Args:
757      args: An argparse.ArgumentParser object.
758  """
759  if not _is_inside_android_root():
760    atest_utils.colorful_print(
761        '\nAtest must always work under ${}!'.format(
762            constants.ANDROID_BUILD_TOP
763        ),
764        constants.RED,
765    )
766    sys.exit(ExitCode.OUTSIDE_ROOT)
767  if args.version:
768    print(atest_utils.get_atest_version())
769    sys.exit(ExitCode.SUCCESS)
770  if args.history:
771    atest_execution_info.print_test_result(
772        constants.ATEST_RESULT_ROOT, args.history
773    )
774    sys.exit(ExitCode.SUCCESS)
775  if args.latest_result:
776    atest_execution_info.print_test_result_by_path(constants.LATEST_RESULT_FILE)
777    sys.exit(ExitCode.SUCCESS)
778
779
780def _exclude_modules_in_targets(build_targets):
781  """Method that excludes MODULES-IN-* targets.
782
783  Args:
784      build_targets: A set of build targets.
785
786  Returns:
787      A set of build targets that excludes MODULES-IN-*.
788  """
789  shrank_build_targets = build_targets.copy()
790  logging.debug(
791      'Will exclude all "%s*" from the build targets.', constants.MODULES_IN
792  )
793  for target in build_targets:
794    if target.startswith(constants.MODULES_IN):
795      logging.debug('Ignore %s.', target)
796      shrank_build_targets.remove(target)
797  return shrank_build_targets
798
799
800# pylint: disable=protected-access
801def need_rebuild_module_info(args: arg_parser.AtestArgParser) -> bool:
802  """Method that tells whether we need to rebuild module-info.json or not.
803
804  Args:
805      args: an AtestArgParser object.
806
807  Returns:
808      True for forcely/smartly rebuild, otherwise False without rebuilding.
809  """
810  # +-----------------+
811  # | Explicitly pass |  yes
812  # |    '--test'     +-------> False (won't rebuild)
813  # +--------+--------+
814  #          | no
815  #          V
816  # +-------------------------+
817  # | Explicitly pass         |  yes
818  # | '--rebuild-module-info' +-------> True (forcely rebuild)
819  # +--------+----------------+
820  #          | no
821  #          V
822  # +-------------------+
823  # |    Build files    |  no
824  # | integrity is good +-------> True (smartly rebuild)
825  # +--------+----------+
826  #          | yes
827  #          V
828  #        False (won't rebuild)
829  if not parse_steps(args).has_build():
830    logging.debug('"--test" mode detected, will not rebuild module-info.')
831    return False
832  if args.rebuild_module_info:
833    msg = (
834        f'`{constants.REBUILD_MODULE_INFO_FLAG}` is no longer needed '
835        f'since Atest can smartly rebuild {module_info._MODULE_INFO} '
836        r'only when needed.'
837    )
838    atest_utils.colorful_print(msg, constants.YELLOW)
839    return True
840  logging.debug('Examinating the consistency of build files...')
841  if not atest_utils.build_files_integrity_is_ok():
842    logging.debug('Found build files were changed.')
843    return True
844  return False
845
846
847def need_run_index_targets(args: argparse.ArgumentParser):
848  """Method that determines whether Atest need to run index_targets or not.
849
850  The decision flow is as follows: If no build is required, returns False.
851  Otherwise, if some index files are missing, returns True. Otherwise, if
852  some arguments that doesn't require indexing is present, returns False.
853  Otherwise, returns True.
854
855  Args:
856      args: An argparse.ArgumentParser object.
857
858  Returns:
859      True when none of the above conditions were found.
860  """
861  has_build_step = parse_steps(args).has_build()
862  if not has_build_step:
863    logging.debug("Skip indexing because there's no build required.")
864    return False
865
866  if not indexing.Indices().has_all_indices():
867    logging.debug(
868        'Indexing targets is required because some index files do not exist.'
869    )
870    return True
871
872  no_indexing_args = (
873      args.dry_run,
874      args.list_modules,
875  )
876  if any(no_indexing_args):
877    logging.debug('Skip indexing for no_indexing_args=%s.', no_indexing_args)
878    return False
879
880  return True
881
882
883def set_build_output_mode(mode: atest_utils.BuildOutputMode):
884  """Update environment variable dict accordingly to args.build_output."""
885  # Changing this variable does not retrigger builds.
886  atest_utils.update_build_env(
887      {'ANDROID_QUIET_BUILD': 'true', 'BUILD_OUTPUT_MODE': mode.value}
888  )
889
890
891def get_device_count_config(test_infos, mod_info):
892  """Get the amount of desired devices from the test config.
893
894  Args:
895      test_infos: A set of TestInfo instances.
896      mod_info: ModuleInfo object.
897
898  Returns: the count of devices in test config. If there are more than one
899           configs, return the maximum.
900  """
901  max_count = 0
902  for tinfo in test_infos:
903    test_config, _ = test_finder_utils.get_test_config_and_srcs(tinfo, mod_info)
904    if test_config:
905      devices = atest_utils.get_config_device(test_config)
906      if devices:
907        max_count = max(len(devices), max_count)
908  return max_count
909
910
911def _send_start_event(argv: List[Any], tests: List[str]):
912  """Send AtestStartEvent to metrics"""
913  os_pyver = (
914      f'{platform.platform()}:{platform.python_version()}/'
915      f'{atest_utils.get_manifest_branch(True)}:'
916      f'{atest_utils.get_atest_version()}'
917  )
918  metrics.AtestStartEvent(
919      command_line=' '.join(argv),
920      test_references=tests,
921      cwd=os.getcwd(),
922      os=os_pyver,
923  )
924
925
926def _get_acloud_proc_and_log(
927    args: argparse.ArgumentParser, results_dir: str
928) -> Tuple[Any, Any]:
929  """Return tuple of acloud process ID and report file."""
930  if any((args.acloud_create, args.start_avd)):
931    logging.debug('Creating acloud or avd.')
932    return avd.acloud_create_validator(results_dir, args)
933  return None, None
934
935
936def has_set_sufficient_devices(
937    required_amount: int, serial: List[str] = None
938) -> bool:
939  """Detect whether sufficient device serial is set for test."""
940  given_amount = len(serial) if serial else 0
941  # Only check when both given_amount and required_amount are non zero.
942  if all((given_amount, required_amount)):
943    # Base on TF rules, given_amount can be greater than or equal to
944    # required_amount.
945    if required_amount > given_amount:
946      atest_utils.colorful_print(
947          f'The test requires {required_amount} devices, '
948          f'but {given_amount} were given.',
949          constants.RED,
950      )
951      return False
952  return True
953
954
955def setup_metrics_tool_name(no_metrics: bool = False):
956  """Setup tool_name and sub_tool_name for MetricsBase."""
957  if (
958      not no_metrics
959      and metrics_base.MetricsBase.user_type == metrics_base.INTERNAL_USER
960  ):
961    metrics_utils.print_data_collection_notice()
962
963    USER_FROM_TOOL = os.getenv(constants.USER_FROM_TOOL)
964    metrics_base.MetricsBase.tool_name = (
965        USER_FROM_TOOL if USER_FROM_TOOL else constants.TOOL_NAME
966    )
967
968    USER_FROM_SUB_TOOL = os.getenv(constants.USER_FROM_SUB_TOOL)
969    metrics_base.MetricsBase.sub_tool_name = (
970        USER_FROM_SUB_TOOL if USER_FROM_SUB_TOOL else constants.SUB_TOOL_NAME
971    )
972
973
974# pylint: disable=too-many-statements
975# pylint: disable=too-many-branches
976# pylint: disable=too-many-return-statements
977def _main(
978    argv: List[Any],
979    results_dir: str,
980    args: argparse.Namespace,
981    banner_printer: banner.BannerPrinter,
982):
983  """Entry point of atest script.
984
985  Args:
986      argv: A list of arguments.
987      results_dir: A directory which stores the ATest execution information.
988      args: An argparse.Namespace class instance holding parsed args.
989      banner_printer: A BannerPrinter object used to collect banners and print
990        banners at the end of this invocation.
991
992  Returns:
993      Exit code.
994  """
995  _begin_time = time.time()
996  logging.debug(
997      'Running atest script with argv %s, results_dir %s, args %s.',
998      argv,
999      results_dir,
1000      args,
1001  )
1002
1003  # Sets coverage environment variables.
1004  if args.experimental_coverage:
1005    atest_utils.update_build_env(coverage.build_env_vars())
1006  set_build_output_mode(args.build_output)
1007
1008  _validate_args(args)
1009  metrics_utils.get_start_time()
1010  _send_start_event(argv, args.tests)
1011  _non_action_validator(args)
1012
1013  proc_acloud, report_file = _get_acloud_proc_and_log(args, results_dir)
1014  is_clean = not os.path.exists(
1015      os.environ.get(constants.ANDROID_PRODUCT_OUT, '')
1016  )
1017
1018  # Run Test Mapping or coverage by no-bazel-mode.
1019  if atest_utils.is_test_mapping(args) or args.experimental_coverage:
1020    logging.debug('Running test mapping or coverage, disabling bazel mode.')
1021    atest_utils.colorful_print(
1022        'Not running using bazel-mode.', constants.YELLOW
1023    )
1024    args.bazel_mode = False
1025
1026  proc_idx = atest_utils.start_threading(lambda: print)
1027  # Do not index targets while the users intend to dry-run tests.
1028  if need_run_index_targets(args):
1029    logging.debug('Starting to index targets in a background thread.')
1030    proc_idx = atest_utils.start_threading(
1031        indexing.index_targets,
1032        daemon=True,
1033    )
1034  smart_rebuild = need_rebuild_module_info(args)
1035  logging.debug('need_rebuild_module_info returned %s', smart_rebuild)
1036
1037  mod_info = module_info.load(
1038      force_build=smart_rebuild,
1039      sqlite_module_cache=args.sqlite_module_cache,
1040  )
1041  logging.debug('Obtained module info object: %s', mod_info)
1042
1043  translator = cli_translator.CLITranslator(
1044      mod_info=mod_info,
1045      print_cache_msg=not args.clear_cache,
1046      bazel_mode_enabled=args.bazel_mode,
1047      host=args.host,
1048      bazel_mode_features=args.bazel_mode_features,
1049  )
1050  if args.list_modules:
1051    _print_testable_modules(mod_info, args.list_modules)
1052    return ExitCode.SUCCESS
1053  test_infos = set()
1054  # (b/242567487) index_targets may finish after cli_translator; to
1055  # mitigate the overhead, the main waits until it finished when no index
1056  # files are available (e.g. fresh repo sync)
1057  if proc_idx.is_alive() and not indexing.Indices().has_all_indices():
1058    proc_idx.join()
1059  find_start = time.time()
1060  test_infos = translator.translate(args)
1061
1062  # Only check for sufficient devices if not dry run.
1063  args.device_count_config = get_device_count_config(test_infos, mod_info)
1064  if not args.dry_run and not has_set_sufficient_devices(
1065      args.device_count_config, args.serial
1066  ):
1067    return ExitCode.INSUFFICIENT_DEVICES
1068
1069  find_duration = time.time() - find_start
1070  if not test_infos:
1071    return ExitCode.TEST_NOT_FOUND
1072
1073  test_execution_plan = _create_test_execution_plan(
1074      test_infos=test_infos,
1075      results_dir=results_dir,
1076      mod_info=mod_info,
1077      args=args,
1078      dry_run=args.dry_run,
1079  )
1080
1081  extra_args = test_execution_plan.extra_args
1082
1083  build_targets = test_execution_plan.required_build_targets()
1084
1085  # Remove MODULE-IN-* from build targets by default.
1086  if not args.use_modules_in:
1087    build_targets = _exclude_modules_in_targets(build_targets)
1088
1089  if args.dry_run:
1090    return _dry_run(results_dir, extra_args, test_infos, mod_info)
1091
1092  steps = parse_steps(args)
1093  device_update_method = _configure_update_method(
1094      steps=steps,
1095      plan=test_execution_plan,
1096      update_modules=set(args.update_modules or []),
1097      banner_printer=banner_printer,
1098  )
1099
1100  if build_targets and steps.has_build():
1101    if args.experimental_coverage:
1102      build_targets.update(coverage.build_modules())
1103
1104    # Add module-info.json target to the list of build targets to keep the
1105    # file up to date.
1106    build_targets.add(module_info.get_module_info_target())
1107
1108    build_targets |= device_update_method.dependencies()
1109
1110    # Add the -jx as a build target if user specify it.
1111    if args.build_j:
1112      build_targets.add(f'-j{args.build_j}')
1113
1114    build_start = time.time()
1115    success = atest_utils.build(build_targets)
1116    build_duration = time.time() - build_start
1117    metrics.BuildFinishEvent(
1118        duration=metrics_utils.convert_duration(build_duration),
1119        success=success,
1120        targets=build_targets,
1121    )
1122    metrics.LocalDetectEvent(
1123        detect_type=DetectType.BUILD_TIME_PER_TARGET,
1124        result=int(build_duration / len(build_targets)),
1125    )
1126    rebuild_module_info = DetectType.NOT_REBUILD_MODULE_INFO
1127    if is_clean:
1128      rebuild_module_info = DetectType.CLEAN_BUILD
1129    elif args.rebuild_module_info:
1130      rebuild_module_info = DetectType.REBUILD_MODULE_INFO
1131    elif smart_rebuild:
1132      rebuild_module_info = DetectType.SMART_REBUILD_MODULE_INFO
1133    metrics.LocalDetectEvent(
1134        detect_type=rebuild_module_info, result=int(build_duration)
1135    )
1136    if not success:
1137      return ExitCode.BUILD_FAILURE
1138    if proc_acloud:
1139      proc_acloud.join()
1140      status = avd.probe_acloud_status(
1141          report_file, find_duration + build_duration
1142      )
1143      if status != 0:
1144        return status
1145    # After build step 'adb' command will be available, and stop forward to
1146    # Tradefed if the tests require a device.
1147    _validate_adb_devices(args, test_infos)
1148
1149  device_update_method.update(extra_args.get(constants.SERIAL, []))
1150
1151  tests_exit_code = ExitCode.SUCCESS
1152  test_start = time.time()
1153  if steps.has_test():
1154    # Only send duration to metrics when no --build.
1155    if not steps.has_build():
1156      _init_and_find = time.time() - _begin_time
1157      logging.debug('Initiation and finding tests took %ss', _init_and_find)
1158      metrics.LocalDetectEvent(
1159          detect_type=DetectType.INIT_AND_FIND_MS,
1160          result=int(_init_and_find * 1000),
1161      )
1162
1163    tests_exit_code = test_execution_plan.execute()
1164
1165    if args.experimental_coverage:
1166      coverage.generate_coverage_report(
1167          results_dir,
1168          test_infos,
1169          mod_info,
1170          extra_args.get(constants.HOST, False),
1171          args.code_under_test,
1172      )
1173
1174  metrics.RunTestsFinishEvent(
1175      duration=metrics_utils.convert_duration(time.time() - test_start)
1176  )
1177  preparation_time = atest_execution_info.preparation_time(test_start)
1178  if preparation_time:
1179    # Send the preparation time only if it's set.
1180    metrics.RunnerFinishEvent(
1181        duration=metrics_utils.convert_duration(preparation_time),
1182        success=True,
1183        runner_name=constants.TF_PREPARATION,
1184        test=[],
1185    )
1186  if tests_exit_code != ExitCode.SUCCESS:
1187    tests_exit_code = ExitCode.TEST_FAILURE
1188
1189  return tests_exit_code
1190
1191
1192def _configure_update_method(
1193    *,
1194    steps: Steps,
1195    plan: TestExecutionPlan,
1196    update_modules: set[str],
1197    banner_printer: banner.BannerPrinter,
1198) -> device_update.DeviceUpdateMethod:
1199
1200  requires_device_update = plan.requires_device_update()
1201
1202  if not steps.has_device_update():
1203    if requires_device_update:
1204      banner_printer.register(
1205          'Tips: If your test requires device update, consider '
1206          'http://go/atest-single-command to simplify your workflow!'
1207      )
1208    return device_update.NoopUpdateMethod()
1209
1210  if not requires_device_update:
1211    atest_utils.colorful_print(
1212        '\nWarning: Device update ignored because it is not required by '
1213        'tests in this invocation.',
1214        constants.YELLOW,
1215    )
1216    return device_update.NoopUpdateMethod()
1217
1218  return device_update.AdeviceUpdateMethod(targets=update_modules)
1219
1220
1221def _create_test_execution_plan(
1222    *,
1223    test_infos: List[test_info.TestInfo],
1224    results_dir: str,
1225    mod_info: module_info.ModuleInfo,
1226    args: argparse.Namespace,
1227    dry_run: bool,
1228) -> TestExecutionPlan:
1229  """Creates a plan to execute the tests.
1230
1231  Args:
1232      test_infos: A list of instances of TestInfo.
1233      results_dir: A directory which stores the ATest execution information.
1234      mod_info: An instance of ModuleInfo.
1235      args: An argparse.Namespace instance holding parsed args.
1236      dry_run: A boolean of whether this invocation is a dry run.
1237
1238  Returns:
1239      An instance of TestExecutionPlan.
1240  """
1241
1242  if is_from_test_mapping(test_infos):
1243    return TestMappingExecutionPlan.create(
1244        test_infos=test_infos,
1245        results_dir=results_dir,
1246        mod_info=mod_info,
1247        args=args,
1248    )
1249
1250  return TestModuleExecutionPlan.create(
1251      test_infos=test_infos,
1252      results_dir=results_dir,
1253      mod_info=mod_info,
1254      args=args,
1255      dry_run=dry_run,
1256  )
1257
1258
1259class TestExecutionPlan(ABC):
1260  """Represents how an Atest invocation's tests will execute."""
1261
1262  def __init__(
1263      self,
1264      *,
1265      extra_args: Dict[str, Any],
1266  ):
1267    self._extra_args = extra_args
1268
1269  @property
1270  def extra_args(self) -> Dict[str, Any]:
1271    return self._extra_args
1272
1273  @abstractmethod
1274  def execute(self) -> ExitCode:
1275    """Executes all test runner invocations in this plan."""
1276
1277  @abstractmethod
1278  def required_build_targets(self) -> Set[str]:
1279    """Returns the list of build targets required by this plan."""
1280
1281  @abstractmethod
1282  def requires_device_update(self) -> bool:
1283    """Checks whether this plan requires device update."""
1284
1285
1286class TestMappingExecutionPlan(TestExecutionPlan):
1287  """A plan to execute Test Mapping tests."""
1288
1289  def __init__(
1290      self,
1291      *,
1292      test_type_to_invocations: Dict[str, List[TestRunnerInvocation]],
1293      extra_args: Dict[str, Any],
1294  ):
1295    super().__init__(extra_args=extra_args)
1296    self._test_type_to_invocations = test_type_to_invocations
1297
1298  @staticmethod
1299  def create(
1300      *,
1301      test_infos: List[test_info.TestInfo],
1302      results_dir: str,
1303      mod_info: module_info.ModuleInfo,
1304      args: argparse.Namespace,
1305  ) -> TestMappingExecutionPlan:
1306    """Creates an instance of TestMappingExecutionPlan.
1307
1308    Args:
1309        test_infos: A list of instances of TestInfo.
1310        results_dir: A directory which stores the ATest execution information.
1311        mod_info: An instance of ModuleInfo.
1312        args: An argparse.Namespace instance holding parsed args.
1313
1314    Returns:
1315        An instance of TestMappingExecutionPlan.
1316    """
1317
1318    device_test_infos, host_test_infos = _split_test_mapping_tests(test_infos)
1319    _validate_tm_tests_exec_mode(args, device_test_infos, host_test_infos)
1320    extra_args = get_extra_args(args)
1321
1322    # TODO: change to another approach that put constants.CUSTOM_ARGS in the
1323    # end of command to make sure that customized args can override default
1324    # options.
1325    # For TEST_MAPPING, set timeout to 600000ms.
1326    custom_timeout = False
1327    for custom_args in args.custom_args:
1328      if '-timeout' in custom_args:
1329        custom_timeout = True
1330
1331    if args.test_timeout is None and not custom_timeout:
1332      extra_args.update({constants.TEST_TIMEOUT: 600000})
1333      logging.debug(
1334          'Set test timeout to %sms to align it in TEST_MAPPING.',
1335          extra_args.get(constants.TEST_TIMEOUT),
1336      )
1337
1338    def create_invocations(runner_extra_args, runner_test_infos):
1339      return test_runner_handler.create_test_runner_invocations(
1340          test_infos=runner_test_infos,
1341          results_dir=results_dir,
1342          mod_info=mod_info,
1343          extra_args=runner_extra_args,
1344          minimal_build=args.minimal_build,
1345      )
1346
1347    test_type_to_invocations = collections.OrderedDict()
1348    if extra_args.get(constants.DEVICE_ONLY):
1349      atest_utils.colorful_print(
1350          'Option `--device-only` specified. Skip running deviceless tests.',
1351          constants.MAGENTA,
1352      )
1353    else:
1354      # `host` option needs to be set to True to run host side tests.
1355      host_extra_args = extra_args.copy()
1356      host_extra_args[constants.HOST] = True
1357      test_type_to_invocations.setdefault(HOST_TESTS, []).extend(
1358          create_invocations(host_extra_args, host_test_infos)
1359      )
1360
1361    if extra_args.get(constants.HOST):
1362      atest_utils.colorful_print(
1363          'Option `--host` specified. Skip running device tests.',
1364          constants.MAGENTA,
1365      )
1366    else:
1367      test_type_to_invocations.setdefault(DEVICE_TESTS, []).extend(
1368          create_invocations(extra_args, device_test_infos)
1369      )
1370
1371    return TestMappingExecutionPlan(
1372        test_type_to_invocations=test_type_to_invocations,
1373        extra_args=extra_args,
1374    )
1375
1376  def requires_device_update(self) -> bool:
1377    return _requires_device_update(
1378        [i for invs in self._test_type_to_invocations.values() for i in invs]
1379    )
1380
1381  def required_build_targets(self) -> Set[str]:
1382    build_targets = set()
1383    for invocation in itertools.chain.from_iterable(
1384        self._test_type_to_invocations.values()
1385    ):
1386      build_targets |= invocation.get_test_runner_reqs()
1387
1388    return build_targets
1389
1390  def execute(self) -> ExitCode:
1391    return _run_test_mapping_tests(
1392        self._test_type_to_invocations, self.extra_args
1393    )
1394
1395
1396class TestModuleExecutionPlan(TestExecutionPlan):
1397  """A plan to execute the test modules explicitly passed on the command-line."""
1398
1399  def __init__(
1400      self,
1401      *,
1402      test_runner_invocations: List[TestRunnerInvocation],
1403      extra_args: Dict[str, Any],
1404  ):
1405    super().__init__(extra_args=extra_args)
1406    self._test_runner_invocations = test_runner_invocations
1407
1408  @staticmethod
1409  def create(
1410      *,
1411      test_infos: List[test_info.TestInfo],
1412      results_dir: str,
1413      mod_info: module_info.ModuleInfo,
1414      args: argparse.Namespace,
1415      dry_run: bool,
1416  ) -> TestModuleExecutionPlan:
1417    """Creates an instance of TestModuleExecutionPlan.
1418
1419    Args:
1420        test_infos: A list of instances of TestInfo.
1421        results_dir: A directory which stores the ATest execution information.
1422        mod_info: An instance of ModuleInfo.
1423        args: An argparse.Namespace instance holding parsed args.
1424        dry_run: A boolean of whether this invocation is a dry run.
1425
1426    Returns:
1427        An instance of TestModuleExecutionPlan.
1428    """
1429
1430    if not dry_run:
1431      _validate_exec_mode(args, test_infos)
1432
1433    # _validate_exec_mode appends --host automatically when pure
1434    # host-side tests, so re-parsing extra_args is a must.
1435    extra_args = get_extra_args(args)
1436
1437    invocations = test_runner_handler.create_test_runner_invocations(
1438        test_infos=test_infos,
1439        results_dir=results_dir,
1440        mod_info=mod_info,
1441        extra_args=extra_args,
1442        minimal_build=args.minimal_build,
1443    )
1444
1445    return TestModuleExecutionPlan(
1446        test_runner_invocations=invocations,
1447        extra_args=extra_args,
1448    )
1449
1450  def requires_device_update(self) -> bool:
1451    return _requires_device_update(self._test_runner_invocations)
1452
1453  def required_build_targets(self) -> Set[str]:
1454    build_targets = set()
1455    for test_runner_invocation in self._test_runner_invocations:
1456      build_targets |= test_runner_invocation.get_test_runner_reqs()
1457
1458    return build_targets
1459
1460  def execute(self) -> ExitCode:
1461
1462    reporter = result_reporter.ResultReporter(
1463        collect_only=self.extra_args.get(constants.COLLECT_TESTS_ONLY),
1464        wait_for_debugger=atest_configs.GLOBAL_ARGS.wait_for_debugger,
1465    )
1466    reporter.print_starting_text()
1467
1468    exit_code = ExitCode.SUCCESS
1469    for invocation in self._test_runner_invocations:
1470      exit_code |= invocation.run_all_tests(reporter)
1471
1472    atest_execution_info.AtestExecutionInfo.result_reporters.append(reporter)
1473    return reporter.print_summary() | exit_code
1474
1475
1476def _requires_device_update(invocations: List[TestRunnerInvocation]) -> bool:
1477  """Checks if any invocation requires device update."""
1478  return any(i.requires_device_update() for i in invocations)
1479
1480
1481if __name__ == '__main__':
1482  results_dir = make_test_run_dir()
1483  if END_OF_OPTION in sys.argv:
1484    end_position = sys.argv.index(END_OF_OPTION)
1485    final_args = [
1486        *sys.argv[1:end_position],
1487        *_get_args_from_config(),
1488        *sys.argv[end_position:],
1489    ]
1490  else:
1491    final_args = [*sys.argv[1:], *_get_args_from_config()]
1492  if final_args != sys.argv[1:]:
1493    print(
1494        'The actual cmd will be: \n\t{}\n'.format(
1495            atest_utils.mark_cyan('atest ' + ' '.join(final_args))
1496        )
1497    )
1498    metrics.LocalDetectEvent(detect_type=DetectType.ATEST_CONFIG, result=1)
1499    if HAS_IGNORED_ARGS:
1500      atest_utils.colorful_print(
1501          'Please correct the config and try again.', constants.YELLOW
1502      )
1503      sys.exit(ExitCode.EXIT_BEFORE_MAIN)
1504  else:
1505    metrics.LocalDetectEvent(detect_type=DetectType.ATEST_CONFIG, result=0)
1506
1507  args = _parse_args(final_args)
1508  atest_configs.GLOBAL_ARGS = args
1509  _configure_logging(args.verbose, results_dir)
1510
1511  logging.debug(
1512      'Start of atest run. sys.argv: %s, final_args: %s', sys.argv, final_args
1513  )
1514
1515  banner_printer = banner.BannerPrinter.create()
1516
1517  with atest_execution_info.AtestExecutionInfo(
1518      final_args, results_dir, atest_configs.GLOBAL_ARGS
1519  ) as result_file:
1520    setup_metrics_tool_name(atest_configs.GLOBAL_ARGS.no_metrics)
1521
1522    exit_code = _main(
1523        final_args,
1524        results_dir,
1525        atest_configs.GLOBAL_ARGS,
1526        banner_printer,
1527    )
1528    detector = bug_detector.BugDetector(final_args, exit_code)
1529    if exit_code not in EXIT_CODES_BEFORE_TEST:
1530      metrics.LocalDetectEvent(
1531          detect_type=DetectType.BUG_DETECTED, result=detector.caught_result
1532      )
1533      if result_file:
1534        print("Run 'atest --history' to review test result history.")
1535
1536  banner_printer.print()
1537
1538  sys.exit(exit_code)
1539