1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Module Finder class."""
16
17import logging
18import os
19import time
20from typing import List
21
22from atest import atest_configs
23from atest import atest_error
24from atest import atest_utils
25from atest import constants
26from atest.atest_enum import DetectType
27from atest.metrics import metrics
28from atest.test_finders import test_filter_utils
29from atest.test_finders import test_finder_base
30from atest.test_finders import test_finder_utils
31from atest.test_finders import test_info
32from atest.test_runners import atest_tf_test_runner
33from atest.test_runners import mobly_test_runner
34from atest.test_runners import robolectric_test_runner
35from atest.test_runners import vts_tf_test_runner
36
# These are suites in LOCAL_COMPATIBILITY_SUITE that aren't really suites so
# we can ignore them. ModuleFinder._is_vts_module() filters them out before
# checking whether 'vts10' is the only remaining suite of a module.
_SUITES_TO_IGNORE = frozenset({'general-tests', 'device-tests', 'tests'})
40
41
class ModuleFinder(test_finder_base.TestFinderBase):
  """Module finder class."""

  NAME = 'MODULE'
  # Names of the test runners this finder writes into TestInfo.test_runner.
  # _TEST_RUNNER is used when a TestInfo is first created; the other three are
  # swapped in for Mobly, legacy robolectric and vts10 modules respectively.
  _TEST_RUNNER = atest_tf_test_runner.AtestTradefedTestRunner.NAME
  _MOBLY_RUNNER = mobly_test_runner.MoblyTestRunner.NAME
  _ROBOLECTRIC_RUNNER = robolectric_test_runner.RobolectricTestRunner.NAME
  _VTS_TEST_RUNNER = vts_tf_test_runner.VtsTradefedTestRunner.NAME
50
51  def __init__(self, module_info=None):
52    super().__init__()
53    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
54    self.module_info = module_info
55
56  def _determine_modules_to_test(
57      self, path: str, file_path: str = None
58  ) -> List:
59    """Determine which module the user is trying to test.
60
61    Returns the modules to test. If there are multiple possibilities, will
62    ask the user. Otherwise will return the only module found.
63
64    Args:
65        path: String path of module to look for.
66        file_path: String path of input file.
67
68    Returns:
69        A list of the module names.
70    """
71    modules_to_test = set()
72
73    if file_path:
74      modules_to_test = self.module_info.get_modules_by_path_in_srcs(
75          path=file_path,
76          testable_modules_only=True,
77      )
78
79    modules_to_test |= self.module_info.get_modules_by_path(
80        path=path,
81        testable_modules_only=True,
82    )
83
84    return test_finder_utils.extract_selected_tests(modules_to_test)
85
86  def _is_vts_module(self, module_name):
87    """Returns True if the module is a vts10 module, else False."""
88    mod_info = self.module_info.get_module_info(module_name)
89    suites = []
90    if mod_info:
91      suites = mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, [])
92    # Pull out all *ts (cts, tvts, etc) suites.
93    suites = [suite for suite in suites if suite not in _SUITES_TO_IGNORE]
94    return len(suites) == 1 and 'vts10' in suites
95
96  def _update_to_vts_test_info(self, test):
97    """Fill in the fields with vts10 specific info.
98
99    We need to update the runner to use the vts10 runner and also find the
100    test specific dependencies.
101
102    Args:
103        test: TestInfo to update with vts10 specific details.
104
105    Return:
106        TestInfo that is ready for the vts10 test runner.
107    """
108    test.test_runner = self._VTS_TEST_RUNNER
109    config_file = os.path.join(
110        self.root_dir, test.data[constants.TI_REL_CONFIG]
111    )
112    # Need to get out dir (special logic is to account for custom out dirs).
113    # The out dir is used to construct the build targets for the test deps.
114    out_dir = os.environ.get(constants.ANDROID_HOST_OUT)
115    custom_out_dir = os.environ.get(constants.ANDROID_OUT_DIR)
116    # If we're not an absolute custom out dir, get no-absolute out dir path.
117    if custom_out_dir is None or not os.path.isabs(custom_out_dir):
118      out_dir = os.path.relpath(out_dir, self.root_dir)
119    vts_out_dir = os.path.join(out_dir, 'vts10', 'android-vts10', 'testcases')
120    # Parse dependency of default staging plans.
121    xml_paths = test_finder_utils.search_integration_dirs(
122        constants.VTS_STAGING_PLAN,
123        self.module_info.get_paths(constants.VTS_TF_MODULE),
124    )
125    vts_xmls = set()
126    vts_xmls.add(config_file)
127    for xml_path in xml_paths:
128      vts_xmls |= test_finder_utils.get_plans_from_vts_xml(xml_path)
129    for config_file in vts_xmls:
130      # Add in vts10 test build targets.
131      for target in test_finder_utils.get_targets_from_vts_xml(
132          config_file, vts_out_dir, self.module_info
133      ):
134        test.add_build_target(target)
135    test.add_build_target('vts-test-core')
136    test.add_build_target(test.test_name)
137    return test
138
139  def _update_to_mobly_test_info(self, test):
140    """Update the fields for a Mobly test.
141
142    The runner will be updated to the Mobly runner.
143
144    The module's build output paths will be stored in the test_info data.
145
146    Args:
147        test: TestInfo to be updated with Mobly fields.
148
149    Returns:
150        TestInfo with updated Mobly fields.
151    """
152    # Set test runner to MoblyTestRunner
153    test.test_runner = self._MOBLY_RUNNER
154    # Add test module as build target
155    module_name = test.test_name
156    test.add_build_target(module_name)
157    # Add module's installed paths to data, so the runner may access the
158    # module's build outputs.
159    installed_paths = self.module_info.get_installed_paths(module_name)
160    test.data[constants.MODULE_INSTALLED] = installed_paths
161    return test
162
163  def _update_legacy_robolectric_test_info(self, test):
164    """Update the fields for a legacy robolectric test.
165
166    This method is updating test_name when the given is a legacy robolectric
167    test, and assigning Robolectric Runner for it.
168
169    e.g. WallPaperPicker2RoboTests is a legacy robotest, and the test_name
170    will become RunWallPaperPicker2RoboTests and run it with Robolectric
171    Runner.
172
173    Args:
174        test: TestInfo to be updated with robolectric fields.
175
176    Returns:
177        TestInfo with updated robolectric fields.
178    """
179    test.test_runner = self._ROBOLECTRIC_RUNNER
180    test.test_name = self.module_info.get_robolectric_test_name(
181        self.module_info.get_module_info(test.test_name)
182    )
183    return test
184
185  # pylint: disable=too-many-branches
186  def _process_test_info(self, test):
187    """Process the test info and return some fields updated/changed.
188
189    We need to check if the test found is a special module (like vts10) and
190    update the test_info fields (like test_runner) appropriately.
191
192    Args:
193        test: TestInfo that has been filled out by a find method.
194
195    Return:
196        TestInfo that has been modified as needed and return None if
197        this module can't be found in the module_info.
198    """
199    module_name = test.test_name
200    mod_info = self.module_info.get_module_info(module_name)
201    if not mod_info:
202      return None
203    test.module_class = mod_info['class']
204    test.install_locations = test_finder_utils.get_install_locations(
205        mod_info.get(constants.MODULE_INSTALLED, [])
206    )
207    # Check if this is only a vts10 module.
208    if self._is_vts_module(test.test_name):
209      return self._update_to_vts_test_info(test)
210    # Check if this is a Mobly test module.
211    if self.module_info.is_mobly_module(mod_info):
212      return self._update_to_mobly_test_info(test)
213    test.robo_type = self.module_info.get_robolectric_type(test.test_name)
214    if test.robo_type:
215      test.install_locations = {constants.DEVICELESS_TEST}
216      if test.robo_type == constants.ROBOTYPE_MODERN:
217        test.add_build_target(test.test_name)
218        return test
219      if test.robo_type == constants.ROBOTYPE_LEGACY:
220        return self._update_legacy_robolectric_test_info(test)
221    rel_config = test.data[constants.TI_REL_CONFIG]
222    for target in self._get_build_targets(module_name, rel_config):
223      test.add_build_target(target)
224    # (b/177626045) Probe target APK for running instrumentation tests to
225    # prevent RUNNER ERROR by adding target application(module) to the
226    # build_targets, and install these target apks before testing.
227    artifact_map = self.module_info.get_instrumentation_target_apps(module_name)
228    if artifact_map:
229      logging.debug('Found %s an instrumentation test.', module_name)
230      for art in artifact_map.keys():
231        test.add_build_target(art)
232      logging.debug(
233          'Add %s to build targets...', ', '.join(artifact_map.keys())
234      )
235      test.artifacts = [apk for p in artifact_map.values() for apk in p]
236      logging.debug('Will install target APK: %s\n', test.artifacts)
237      metrics.LocalDetectEvent(
238          detect_type=DetectType.FOUND_TARGET_ARTIFACTS,
239          result=len(test.artifacts),
240      )
241    # For device side java test, it will use
242    # com.android.compatibility.testtype.DalvikTest as test runner in
243    # cts-dalvik-device-test-runner.jar
244    if self.module_info.is_auto_gen_test_config(module_name):
245      if constants.MODULE_CLASS_JAVA_LIBRARIES in test.module_class:
246        for dalvik_dep in test_finder_utils.DALVIK_TEST_DEPS:
247          if self.module_info.is_module(dalvik_dep):
248            test.add_build_target(dalvik_dep)
249    # Update test name if the test belong to extra config which means it's
250    # test config name is not the same as module name. For extra config, it
251    # index will be greater or equal to 1.
252    try:
253      if mod_info.get(constants.MODULE_TEST_CONFIG, []).index(rel_config) > 0:
254        config_test_name = os.path.splitext(os.path.basename(rel_config))[0]
255        logging.debug(
256            'Replace test_info.name(%s) to %s', test.test_name, config_test_name
257        )
258        test.test_name = config_test_name
259    except ValueError:
260      pass
261    return test
262
263  def _get_build_targets(self, module_name, rel_config):
264    """Get the test deps.
265
266    Args:
267        module_name: name of the test.
268        rel_config: XML for the given test.
269
270    Returns:
271        Set of build targets.
272    """
273    targets = set()
274    if not self.module_info.is_auto_gen_test_config(module_name):
275      config_file = os.path.join(self.root_dir, rel_config)
276      targets = test_finder_utils.get_targets_from_xml(
277          config_file, self.module_info
278      )
279    if constants.VTS_CORE_SUITE in self.module_info.get_module_info(
280        module_name
281    ).get(constants.MODULE_COMPATIBILITY_SUITES, []):
282      targets.add(constants.VTS_CORE_TF_MODULE)
283    for suite in self.module_info.get_module_info(module_name).get(
284        constants.MODULE_COMPATIBILITY_SUITES, []
285    ):
286      targets.update(constants.SUITE_DEPS.get(suite, []))
287    for module_path in self.module_info.get_paths(module_name):
288      mod_dir = module_path.replace('/', '-')
289      targets.add(constants.MODULES_IN + mod_dir)
290    # (b/156457698) Force add vts_kernel_ltp_tests as build target if our
291    # test belongs to REQUIRED_LTP_TEST_MODULES due to required_module
292    # option not working for sh_test in soong.
293    if module_name in constants.REQUIRED_LTP_TEST_MODULES:
294      targets.add('vts_kernel_ltp_tests')
295    # (b/184567849) Force adding module_name as a build_target. This will
296    # allow excluding MODULES-IN-* and prevent from missing build targets.
297    if module_name and self.module_info.is_module(module_name):
298      targets.add(module_name)
299    # If it's a MTS test, add cts-tradefed as test dependency.
300    if constants.MTS_SUITE in self.module_info.get_module_info(module_name).get(
301        constants.MODULE_COMPATIBILITY_SUITES, []
302    ):
303      if self.module_info.is_module(constants.CTS_JAR):
304        targets.add(constants.CTS_JAR)
305    return targets
306
307  def _get_module_test_config(self, module_name, rel_config=None) -> list[str]:
308    """Get the value of test_config in module_info.
309
310    Get the value of 'test_config' in module_info if its
311    auto_test_config is not true.
312    In this case, the test_config is specified by user.
313    If not, return rel_config.
314
315    Args:
316        module_name: A string of the test's module name.
317        rel_config: XML for the given test.
318
319    Returns:
320        A list of string of test_config path if found, else return rel_config.
321    """
322    default_all_config = not (
323        atest_configs.GLOBAL_ARGS
324        and atest_configs.GLOBAL_ARGS.test_config_select
325    )
326    mod_info = self.module_info.get_module_info(module_name)
327    if mod_info:
328      test_configs = []
329      test_config_list = mod_info.get(constants.MODULE_TEST_CONFIG, [])
330      if test_config_list:
331        # multiple test configs
332        if len(test_config_list) > 1:
333          test_configs = test_finder_utils.extract_selected_tests(
334              test_config_list, default_all=default_all_config
335          )
336        else:
337          test_configs = test_config_list
338      if test_configs:
339        return test_configs
340      # Double check if below section is needed.
341      if (
342          not self.module_info.is_auto_gen_test_config(module_name)
343          and len(test_configs) > 0
344      ):
345        return test_configs
346    return [rel_config] if rel_config else []
347
348  # pylint: disable=too-many-branches
349  # pylint: disable=too-many-locals
350  def _get_test_info_filter(self, path, methods, **kwargs):
351    """Get test info filter.
352
353    Args:
354        path: A string of the test's path.
355        methods: A set of method name strings.
356        rel_module_dir: Optional. A string of the module dir no-absolute to
357          root.
358        class_name: Optional. A string of the class name.
359        is_native_test: Optional. A boolean variable of whether to search for a
360          native test or not.
361
362    Returns:
363        A set of test info filter.
364    """
365    _, file_name = test_finder_utils.get_dir_path_and_filename(path)
366    ti_filter = frozenset()
367    if os.path.isfile(path) and kwargs.get('is_native_test', None):
368      class_info = test_finder_utils.get_cc_class_info(path)
369      ti_filter = frozenset([
370          test_info.TestFilter(
371              test_filter_utils.get_cc_filter(
372                  class_info, kwargs.get('class_name', '*'), methods
373              ),
374              frozenset(),
375          )
376      ])
377    # Path to java file.
378    elif file_name and constants.JAVA_EXT_RE.match(file_name):
379      full_class_name = test_filter_utils.get_fully_qualified_class_name(path)
380      methods = frozenset(
381          test_filter_utils.get_java_method_filters(path, methods)
382      )
383      ti_filter = frozenset([test_info.TestFilter(full_class_name, methods)])
384    # Path to cc file.
385    elif file_name and constants.CC_EXT_RE.match(file_name):
386      # TODO: b/173019813 - Should setup correct filter for an input file.
387      if not test_finder_utils.has_cc_class(path):
388        raise atest_error.MissingCCTestCaseError(
389            "Can't find CC class in %s" % path
390        )
391      class_info = test_finder_utils.get_cc_class_info(path)
392      cc_filters = []
393      for classname, _ in class_info.items():
394        cc_filters.append(
395            test_info.TestFilter(
396                test_filter_utils.get_cc_filter(class_info, classname, methods),
397                frozenset(),
398            )
399        )
400      ti_filter = frozenset(cc_filters)
401    # If input path is a folder and have class_name information.
402    elif not file_name and kwargs.get('class_name', None):
403      ti_filter = frozenset(
404          [test_info.TestFilter(kwargs.get('class_name', None), methods)]
405      )
406    # Path to non-module dir, treat as package.
407    elif not file_name and kwargs.get(
408        'rel_module_dir', None
409    ) != os.path.relpath(path, self.root_dir):
410      dir_items = [os.path.join(path, f) for f in os.listdir(path)]
411      for dir_item in dir_items:
412        if constants.JAVA_EXT_RE.match(dir_item):
413          package_name = test_filter_utils.get_package_name(dir_item)
414          if package_name:
415            # methods should be empty frozenset for package.
416            if methods:
417              raise atest_error.MethodWithoutClassError(
418                  '%s: Method filtering requires class' % str(methods)
419              )
420            ti_filter = frozenset([test_info.TestFilter(package_name, methods)])
421            break
422    logging.debug('_get_test_info_filter() ti_filter: %s', ti_filter)
423    return ti_filter
424
425  def _get_rel_config(self, test_path):
426    """Get config file's no-absolute path.
427
428    Args:
429        test_path: A string of the test absolute path.
430
431    Returns:
432        A string of config's no-absolute path, else None.
433    """
434    test_dir = os.path.dirname(test_path)
435    rel_module_dir = test_finder_utils.find_parent_module_dir(
436        self.root_dir, test_dir, self.module_info
437    )
438    if rel_module_dir:
439      return os.path.join(rel_module_dir, constants.MODULE_CONFIG)
440    return None
441
442  def _get_test_infos(self, test_path, rel_config, module_name, test_filter):
443    """Get test_info for test_path.
444
445    Args:
446        test_path: A string of the test path.
447        rel_config: A string of rel path of config.
448        module_name: A string of the module name to use.
449        test_filter: A test info filter.
450
451    Returns:
452        A list of TestInfo namedtuple if found, else None.
453    """
454    if not rel_config:
455      rel_config = self._get_rel_config(test_path)
456      if not rel_config:
457        return None
458    if module_name:
459      module_names = [module_name]
460    else:
461      module_names = self._determine_modules_to_test(
462          os.path.dirname(rel_config),
463          test_path if self._is_comparted_src(test_path) else None,
464      )
465    test_infos = []
466    if module_names:
467      for mname in module_names:
468        # The real test config might be record in module-info.
469        mod_info = self.module_info.get_module_info(mname)
470        if not mod_info:
471          continue
472        rel_configs = self._get_module_test_config(mname, rel_config=rel_config)
473        for rel_cfg in rel_configs:
474          tinfo = self._process_test_info(
475              test_info.TestInfo(
476                  test_name=mname,
477                  test_runner=self._TEST_RUNNER,
478                  build_targets=set(),
479                  data={
480                      constants.TI_FILTER: test_filter,
481                      constants.TI_REL_CONFIG: rel_cfg,
482                  },
483                  compatibility_suites=mod_info.get(
484                      constants.MODULE_COMPATIBILITY_SUITES, []
485                  ),
486              )
487          )
488          if tinfo:
489            test_infos.append(tinfo)
490    return test_infos
491
492  def find_test_by_module_name(self, module_name):
493    """Find test for the given module name.
494
495    Args:
496        module_name: A string of the test's module name.
497
498    Returns:
499        A list that includes only 1 populated TestInfo namedtuple
500        if found, otherwise None.
501    """
502    tinfos = []
503    mod_info = self.module_info.get_module_info(module_name)
504    if self.module_info.is_testable_module(mod_info):
505      # path is a list with only 1 element.
506      rel_config = os.path.join(
507          mod_info[constants.MODULE_PATH][0], constants.MODULE_CONFIG
508      )
509      rel_configs = self._get_module_test_config(
510          module_name, rel_config=rel_config
511      )
512      for rel_config in rel_configs:
513        tinfo = self._process_test_info(
514            test_info.TestInfo(
515                test_name=module_name,
516                test_runner=self._TEST_RUNNER,
517                build_targets=set(),
518                data={
519                    constants.TI_REL_CONFIG: rel_config,
520                    constants.TI_FILTER: frozenset(),
521                },
522                compatibility_suites=mod_info.get(
523                    constants.MODULE_COMPATIBILITY_SUITES, []
524                ),
525            )
526        )
527        if tinfo:
528          tinfos.append(tinfo)
529      if tinfos:
530        return tinfos
531    return None
532
533  def find_test_by_kernel_class_name(self, module_name, class_name):
534    """Find kernel test for the given class name.
535
536    Args:
537        module_name: A string of the module name to use.
538        class_name: A string of the test's class name.
539
540    Returns:
541        A list of populated TestInfo namedtuple if test found, else None.
542    """
543
544    class_name, methods = test_filter_utils.split_methods(class_name)
545    test_configs = self._get_module_test_config(module_name)
546    if not test_configs:
547      return None
548    tinfos = []
549    for test_config in test_configs:
550      test_config_path = os.path.join(self.root_dir, test_config)
551      mod_info = self.module_info.get_module_info(module_name)
552      ti_filter = frozenset([test_info.TestFilter(class_name, methods)])
553      if test_finder_utils.is_test_from_kernel_xml(
554          test_config_path, class_name
555      ):
556        tinfo = self._process_test_info(
557            test_info.TestInfo(
558                test_name=module_name,
559                test_runner=self._TEST_RUNNER,
560                build_targets=set(),
561                data={
562                    constants.TI_REL_CONFIG: test_config,
563                    constants.TI_FILTER: ti_filter,
564                },
565                compatibility_suites=mod_info.get(
566                    constants.MODULE_COMPATIBILITY_SUITES, []
567                ),
568            )
569        )
570        if tinfo:
571          tinfos.append(tinfo)
572    if tinfos:
573      return tinfos
574    return None
575
576  def find_test_by_class_name(
577      self,
578      class_name: str,
579      module_name: str = None,
580      rel_config: str = None,
581      is_native_test: bool = False,
582  ) -> list[test_info.TestInfo]:
583    """Find test files given a class name.
584
585    If module_name and rel_config not given it will calculate it determine
586    it by looking up the tree from the class file.
587
588    Args:
589        class_name: A string of the test's class name.
590        module_name: Optional. A string of the module name to use.
591        rel_config: Optional. A string of module dir no-absolute to repo root.
592        is_native_test: A boolean variable of whether to search for a native
593          test or not.
594
595    Returns:
596        A list of populated TestInfo namedtuple if test found, else None.
597    """
598    class_name, methods = test_filter_utils.split_methods(class_name)
599    search_class_name = class_name
600    # For parameterized gtest, test class will be automerged to
601    # $(class_prefix)/$(base_class) name. Using $(base_class) for searching
602    # matched TEST_P to make sure test class is matched.
603    if '/' in search_class_name:
604      search_class_name = str(search_class_name).split('/')[-1]
605    if rel_config:
606      search_dir = os.path.join(self.root_dir, os.path.dirname(rel_config))
607    else:
608      search_dir = self.root_dir
609    test_paths = test_finder_utils.find_class_file(
610        search_dir, search_class_name, is_native_test, methods
611    )
612    if not test_paths and rel_config:
613      atest_utils.print_and_log_info(
614          'Did not find class (%s) under module path (%s), '
615          'researching from repo root.',
616          class_name,
617          rel_config,
618      )
619      test_paths = test_finder_utils.find_class_file(
620          self.root_dir, search_class_name, is_native_test, methods
621      )
622    test_paths = test_paths if test_paths is not None else []
623    # If we already have module name, use path in module-info as test_path.
624    if not test_paths:
625      if not module_name:
626        return None
627      # Use the module path as test_path.
628      module_paths = self.module_info.get_paths(module_name)
629      test_paths = []
630      for rel_module_path in module_paths:
631        test_paths.append(os.path.join(self.root_dir, rel_module_path))
632    tinfos = []
633    for test_path in test_paths:
634      test_filter = self._get_test_info_filter(
635          test_path,
636          methods,
637          class_name=class_name,
638          is_native_test=is_native_test,
639      )
640      test_infos = self._get_test_infos(
641          test_path, rel_config, module_name, test_filter
642      )
643      # If input include methods, check if tinfo match.
644      if test_infos and len(test_infos) > 1 and methods:
645        test_infos = self._get_matched_test_infos(test_infos, methods)
646      if test_infos:
647        tinfos.extend(test_infos)
648    return tinfos if tinfos else None
649
650  def _get_matched_test_infos(self, test_infos, methods):
651    """Get the test_infos matched the given methods.
652
653    Args:
654        test_infos: A list of TestInfo obj.
655        methods: A set of method name strings.
656
657    Returns:
658        A list of matched TestInfo namedtuple, else None.
659    """
660    matched_test_infos = set()
661    for tinfo in test_infos:
662      test_config, test_srcs = test_finder_utils.get_test_config_and_srcs(
663          tinfo, self.module_info
664      )
665      if test_config:
666        filter_dict = atest_utils.get_android_junit_config_filters(test_config)
667        # Always treat the test_info is matched if no filters found.
668        if not filter_dict.keys():
669          matched_test_infos.add(tinfo)
670          continue
671        for method in methods:
672          if self._is_srcs_match_method_annotation(
673              method, test_srcs, filter_dict
674          ):
675            logging.debug(
676                'For method:%s Test:%s matched filter_dict: %s',
677                method,
678                tinfo.test_name,
679                filter_dict,
680            )
681            matched_test_infos.add(tinfo)
682    return list(matched_test_infos)
683
684  def _is_srcs_match_method_annotation(self, method, srcs, annotation_dict):
685    """Check if input srcs matched annotation.
686
687    Args:
688        method: A string of test method name.
689        srcs: A list of source file of test.
690        annotation_dict: A dictionary record the include and exclude
691          annotations.
692
693    Returns:
694        True if input method matched the annotation of input srcs, else
695        None.
696    """
697    include_annotations = annotation_dict.get(constants.INCLUDE_ANNOTATION, [])
698    exclude_annotations = annotation_dict.get(constants.EXCLUDE_ANNOTATION, [])
699    for src in srcs:
700      include_methods = set()
701      src_path = os.path.join(self.root_dir, src)
702      # Add methods matched include_annotations.
703      for annotation in include_annotations:
704        include_methods.update(
705            test_finder_utils.get_annotated_methods(annotation, src_path)
706        )
707      if exclude_annotations:
708        # For exclude annotation, get all the method in the input srcs,
709        # and filter out the matched annotation.
710        exclude_methods = set()
711        all_methods = test_finder_utils.get_java_methods(src_path)
712        for annotation in exclude_annotations:
713          exclude_methods.update(
714              test_finder_utils.get_annotated_methods(annotation, src_path)
715          )
716        include_methods = all_methods - exclude_methods
717      if method in include_methods:
718        return True
719    return False
720
721  def find_test_by_module_and_class(
722      self, module_class: str
723  ) -> list[test_info.TestInfo]:
724    """Find the test info given a MODULE:CLASS string.
725
726    Args:
727        module_class: A string of form MODULE:CLASS or MODULE:CLASS#METHOD.
728
729    Returns:
730        A list of populated TestInfo if found, else None.
731    """
732    parse_result = test_finder_utils.parse_test_reference(module_class)
733    if not parse_result:
734      return None
735    module_name = parse_result['module_name']
736    class_name = parse_result['pkg_class_name']
737    method_name = parse_result.get('method_name', '')
738    if method_name:
739      class_name = class_name + '#' + method_name
740
741    # module_infos is a list of TestInfo with at most 1 element.
742    module_infos = self.find_test_by_module_name(module_name)
743    module_info = module_infos[0] if module_infos else None
744    if not module_info:
745      return None
746    find_result = None
747    # If the target module is JAVA or Python test, search class name.
748    find_result = self.find_test_by_class_name(
749        class_name,
750        module_name,
751        module_info.data.get(constants.TI_REL_CONFIG),
752        self.module_info.is_native_test(module_name),
753    )
754    # kernel target test is also define as NATIVE_TEST in build system.
755    # TODO: b/157210083 - Update find_test_by_kernel_class_name method to
756    # support gen_rule use case.
757    if not find_result:
758      find_result = self.find_test_by_kernel_class_name(module_name, class_name)
759    # Find by cc class.
760    if not find_result:
761      find_result = self.find_test_by_cc_class_name(
762          class_name,
763          module_info.test_name,
764          module_info.data.get(constants.TI_REL_CONFIG),
765      )
766    return find_result
767
768  def find_test_by_package_name(
769      self, package, module_name=None, rel_config=None
770  ):
771    """Find the test info given a PACKAGE string.
772
773    Args:
774        package: A string of the package name.
775        module_name: Optional. A string of the module name.
776        ref_config: Optional. A string of rel path of config.
777
778    Returns:
779        A list of populated TestInfo namedtuple if found, else None.
780    """
781    _, methods = test_filter_utils.split_methods(package)
782    if methods:
783      raise atest_error.MethodWithoutClassError(
784          '%s: Method filtering requires class' % (methods)
785      )
786    # Confirm that packages exists and get user input for multiples.
787    if rel_config:
788      search_dir = os.path.join(self.root_dir, os.path.dirname(rel_config))
789    else:
790      search_dir = self.root_dir
791    package_paths = test_finder_utils.run_find_cmd(
792        test_finder_utils.TestReferenceType.PACKAGE, search_dir, package
793    )
794    package_paths = package_paths if package_paths is not None else []
795    # Package path will be the full path to the dir represented by package.
796    if not package_paths:
797      if not module_name:
798        return None
799      module_paths = self.module_info.get_paths(module_name)
800      for rel_module_path in module_paths:
801        package_paths.append(os.path.join(self.root_dir, rel_module_path))
802    test_filter = frozenset([test_info.TestFilter(package, frozenset())])
803    test_infos = []
804    for package_path in package_paths:
805      tinfo = self._get_test_infos(
806          package_path, rel_config, module_name, test_filter
807      )
808      if tinfo:
809        test_infos.extend(tinfo)
810    return test_infos if test_infos else None
811
812  def find_test_by_module_and_package(self, module_package):
813    """Find the test info given a MODULE:PACKAGE string.
814
815    Args:
816        module_package: A string of form MODULE:PACKAGE
817
818    Returns:
819        A list of populated TestInfo namedtuple if found, else None.
820    """
821    parse_result = test_finder_utils.parse_test_reference(module_package)
822    if not parse_result:
823      return None
824    module_name = parse_result['module_name']
825    package = parse_result['pkg_class_name']
826    method = parse_result.get('method_name', '')
827    if method:
828      package = package + '#' + method
829
830    # module_infos is a list with at most 1 element.
831    module_infos = self.find_test_by_module_name(module_name)
832    module_info = module_infos[0] if module_infos else None
833    if not module_info:
834      return None
835    return self.find_test_by_package_name(
836        package,
837        module_info.test_name,
838        module_info.data.get(constants.TI_REL_CONFIG),
839    )
840
841  def find_test_by_path(self, rel_path: str) -> List[test_info.TestInfo]:
842    """Find the first test info matching the given path.
843
844    Strategy:
845        path_to_java_file --> Resolve to CLASS
846        path_to_cc_file --> Resolve to CC CLASS
847        path_to_module_file -> Resolve to MODULE
848        path_to_module_dir -> Resolve to MODULE
849        path_to_dir_with_class_files--> Resolve to PACKAGE
850        path_to_any_other_dir --> Resolve as MODULE
851
852    Args:
853        rel_path: A string of the relative path to $BUILD_TOP.
854
855    Returns:
856        A list of populated TestInfo namedtuple if test found, else None
857    """
858    logging.debug('Finding test by path: %s', rel_path)
859    path, methods = test_filter_utils.split_methods(rel_path)
860    # TODO: See if this can be generalized and shared with methods above
861    # create absolute path from cwd and remove symbolic links
862    path = os.path.realpath(path)
863    if not os.path.exists(path):
864      return None
865    if methods and not test_finder_utils.has_method_in_file(path, methods):
866      return None
867    dir_path, _ = test_finder_utils.get_dir_path_and_filename(path)
868    # Module/Class
869    rel_module_dir = test_finder_utils.find_parent_module_dir(
870        self.root_dir, dir_path, self.module_info
871    )
872
873    # If the input file path does not belong to a module(by searching
874    # upwards to the build_top), check whether it belongs to the dependency
875    # of modules.
876    if not rel_module_dir:
877      testable_modules = self.module_info.get_modules_by_include_deps(
878          self.module_info.get_modules_by_path_in_srcs(rel_path),
879          testable_module_only=True,
880      )
881      if testable_modules:
882        test_filter = self._get_test_info_filter(
883            path, methods, rel_module_dir=rel_module_dir
884        )
885        tinfos = []
886        for testable_module in testable_modules:
887          rel_config = os.path.join(
888              self.module_info.get_paths(testable_module)[0],
889              constants.MODULE_CONFIG,
890          )
891          tinfos.extend(
892              self._get_test_infos(
893                  path, rel_config, testable_module, test_filter
894              )
895          )
896        metrics.LocalDetectEvent(
897            detect_type=DetectType.FIND_TEST_IN_DEPS, result=1
898        )
899        return tinfos
900
901    if not rel_module_dir:
902      # Try to find unit-test for input path.
903      path = os.path.relpath(
904          os.path.realpath(rel_path),
905          os.environ.get(constants.ANDROID_BUILD_TOP, ''),
906      )
907      unit_tests = test_finder_utils.find_host_unit_tests(
908          self.module_info, path
909      )
910      if unit_tests:
911        tinfos = []
912        for unit_test in unit_tests:
913          tinfo = self._get_test_infos(
914              path, constants.MODULE_CONFIG, unit_test, frozenset()
915          )
916          if tinfo:
917            tinfos.extend(tinfo)
918        return tinfos
919      return None
920    rel_config = os.path.join(rel_module_dir, constants.MODULE_CONFIG)
921    test_filter = self._get_test_info_filter(
922        path, methods, rel_module_dir=rel_module_dir
923    )
924    return self._get_test_infos(path, rel_config, None, test_filter)
925
926  def find_test_by_cc_class_name(
927      self, class_name, module_name=None, rel_config=None
928  ):
929    """Find test files given a cc class name.
930
931    If module_name and rel_config not given, test will be determined
932    by looking up the tree for files which has input class.
933
934    Args:
935        class_name: A string of the test's class name.
936        module_name: Optional. A string of the module name to use.
937        rel_config: Optional. A string of module dir no-absolute to repo root.
938
939    Returns:
940        A list of populated TestInfo namedtuple if test found, else None.
941    """
942    # Check if class_name is prepended with file name. If so, trim the
943    # prefix and keep only the class_name.
944    if '.' in class_name:
945      # (b/202764540) Strip prefixes of a cc class.
946      # Assume the class name has a format of file_name.class_name
947      class_name = class_name[class_name.rindex('.') + 1 :]
948      atest_utils.print_and_log_info(
949          'Search with updated class name: %s', class_name
950      )
951    return self.find_test_by_class_name(
952        class_name, module_name, rel_config, is_native_test=True
953    )
954
955  def get_testable_modules_with_ld(self, user_input, ld_range=0):
956    """Calculate the edit distances of the input and testable modules.
957
958    The user input will be calculated across all testable modules and
959    results in integers generated by Levenshtein Distance algorithm.
960    To increase the speed of the calculation, a bound can be applied to
961    this method to prevent from calculating every testable modules.
962
963    Guessing from typos, e.g. atest atest_unitests, implies a tangible range
964    of length that Atest only needs to search within it, and the default of
965    the bound is 2.
966
967    Guessing from keywords however, e.g. atest --search Camera, means that
968    the uncertainty of the module name is way higher, and Atest should walk
969    through all testable modules and return the highest possibilities.
970
971    Args:
972        user_input: A string of the user input.
973        ld_range: An integer that range the searching scope. If the length of
974          user_input is 10, then Atest will calculate modules of which length is
975          between 8 and 12. 0 is equivalent to unlimited.
976
977    Returns:
978        A List of LDs and possible module names. If the user_input is "fax",
979        the output will be like:
980        [[2, "fog"], [2, "Fix"], [4, "duck"], [7, "Duckies"]]
981
982        Which means the most lilely names of "fax" are fog and Fix(LD=2),
983        while Dickies is the most unlikely one(LD=7).
984    """
985    atest_utils.colorful_print(
986        '\nSearching for similar module names using fuzzy search...',
987        constants.CYAN,
988    )
989    search_start = time.time()
990    testable_modules = sorted(self.module_info.get_testable_modules(), key=len)
991    lower_bound = len(user_input) - ld_range
992    upper_bound = len(user_input) + ld_range
993    testable_modules_with_ld = []
994    for module_name in testable_modules:
995      # Dispose those too short or too lengthy.
996      if ld_range != 0:
997        if len(module_name) < lower_bound:
998          continue
999        if len(module_name) > upper_bound:
1000          break
1001      testable_modules_with_ld.append([
1002          test_finder_utils.get_levenshtein_distance(user_input, module_name),
1003          module_name,
1004      ])
1005    search_duration = time.time() - search_start
1006    logging.debug('Fuzzy search took %ss', search_duration)
1007    metrics.LocalDetectEvent(
1008        detect_type=DetectType.FUZZY_SEARCH_TIME, result=round(search_duration)
1009    )
1010    return testable_modules_with_ld
1011
1012  def get_fuzzy_searching_results(self, user_input):
1013    """Give results which have no more than allowance of edit distances.
1014
1015    Args:
1016        user_input: the target module name for fuzzy searching.
1017
1018    Return:
1019        A list of guessed modules.
1020    """
1021    modules_with_ld = self.get_testable_modules_with_ld(
1022        user_input, ld_range=constants.LD_RANGE
1023    )
1024    guessed_modules = []
1025    for _distance, _module in modules_with_ld:
1026      if _distance <= abs(constants.LD_RANGE):
1027        guessed_modules.append(_module)
1028    return guessed_modules
1029
1030  def find_test_by_config_name(self, config_name):
1031    """Find test for the given config name.
1032
1033    Args:
1034        config_name: A string of the test's config name.
1035
1036    Returns:
1037        A list that includes only 1 populated TestInfo namedtuple
1038        if found, otherwise None.
1039    """
1040    for module_name, mod_info in self.module_info.name_to_module_info.items():
1041      test_configs = mod_info.get(constants.MODULE_TEST_CONFIG, [])
1042      for test_config in test_configs:
1043        test_config_name = os.path.splitext(os.path.basename(test_config))[0]
1044        if test_config_name == config_name:
1045          tinfo = test_info.TestInfo(
1046              test_name=test_config_name,
1047              test_runner=self._TEST_RUNNER,
1048              build_targets=self._get_build_targets(module_name, test_config),
1049              data={
1050                  constants.TI_REL_CONFIG: test_config,
1051                  constants.TI_FILTER: frozenset(),
1052              },
1053              compatibility_suites=mod_info.get(
1054                  constants.MODULE_COMPATIBILITY_SUITES, []
1055              ),
1056          )
1057          test_config_path = os.path.join(self.root_dir, test_config)
1058          if test_finder_utils.need_aggregate_metrics_result(test_config_path):
1059            tinfo.aggregate_metrics_result = True
1060          if tinfo:
1061            # There should have only one test_config with the same
1062            # name in source tree.
1063            return [tinfo]
1064    return None
1065
1066  @staticmethod
1067  def _is_comparted_src(path):
1068    """Check if the input path need to match srcs information in module.
1069
1070    If path is a folder or android build file, we don't need to compart
1071    with module's srcs.
1072
1073    Args:
1074        path: A string of the test's path.
1075
1076    Returns:
1077        True if input path need to match with module's src info, else False.
1078    """
1079    if os.path.isdir(path):
1080      return False
1081    if atest_utils.is_build_file(path):
1082      return False
1083    return True
1084
1085
class MainlineModuleFinder(ModuleFinder):
  """Mainline Module finder class.

  Behaves like ModuleFinder but registers under its own finder NAME.
  """

  NAME = 'MAINLINE_MODULE'

  def __init__(self, module_info=None):
    """Initialize the finder.

    Args:
        module_info: Optional. The module info object handed to ModuleFinder.
    """
    # Forward module_info; calling the parent without it would silently drop
    # the caller's argument and leave the finder on the parent's default.
    super().__init__(module_info=module_info)
1093