1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Module Info class used to hold cached module-info.json."""
16
17# pylint: disable=too-many-lines
18from __future__ import annotations
19
20import collections
21import json
22import logging
23import os
24from pathlib import Path
25import pickle
26import re
27import shutil
28import sqlite3
29import sys
30import tempfile
31import time
32from typing import Any, Callable, Dict, List, Set, Tuple
33
34from atest import atest_utils
35from atest import constants
36from atest.atest_enum import DetectType, ExitCode
37from atest.metrics import metrics
38
39
40# JSON file generated by build system that lists all buildable targets.
41_MODULE_INFO = 'module-info.json'
42# JSON file generated by build system that lists dependencies for java.
43_JAVA_DEP_INFO = 'module_bp_java_deps.json'
44# JSON file generated by build system that lists dependencies for cc.
45_CC_DEP_INFO = 'module_bp_cc_deps.json'
46# JSON file generated by atest merged the content from module-info,
47# module_bp_java_deps.json, and module_bp_cc_deps.
48_MERGED_INFO = 'atest_merged_dep.json'
49_DB_VERSION = 2
50_DB_NAME = f'module-info.{_DB_VERSION}.db'
51_NAME_MODULE_TABLE = 'modules'
52_PATH_MODULE_TABLE = 'path_modules'
53
54
55Module = Dict[str, Any]
56
57
58def load_from_file(
59    module_file: Path = None,
60    force_build: bool = False,
61) -> ModuleInfo:
62  """Factory method that initializes ModuleInfo from the build-generated
63
64  JSON file
65  """
66  loader = Loader(
67      module_file=module_file,
68      force_build=force_build,
69      need_merge_fn=lambda: False,
70  )
71
72  mi = loader.load()
73
74  return mi
75
76
77def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo:
78  """Factory method that initializes ModuleInfo from a dictionary."""
79  path_to_module_info = get_path_to_module_info(name_to_module_info)
80  return ModuleInfo(
81      name_to_module_info=name_to_module_info,
82      path_to_module_info=path_to_module_info,
83      get_testable_modules=lambda s: _get_testable_modules(
84          name_to_module_info, path_to_module_info, s
85      ),
86  )
87
88
89def create_empty() -> ModuleInfo:
90  """Factory method that initializes an empty ModuleInfo."""
91  return ModuleInfo()
92
93
94def load(
95    force_build: bool = False, sqlite_module_cache: bool = False
96) -> ModuleInfo:
97  """Factory method that initializes ModuleInfo from the build-generated
98
99  JSON or Sqlite file.
100  """
101  mod_start = time.time()
102  loader = Loader(
103      force_build=force_build, sqlite_module_cache=sqlite_module_cache
104  )
105  mod_stop = time.time() - mod_start
106  metrics.LocalDetectEvent(
107      detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000)
108  )
109
110  return loader.load(save_timestamps=True)
111
112
113def metrics_timer(func):
114  """Decorator method for sending data to metrics."""
115
116  def wrapper(*args, **kwargs):
117    start = time.time()
118    result = func(*args, **kwargs)
119    elapsed_time = int(time.time() - start)
120    metrics.LocalDetectEvent(
121        detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time
122    )
123    return result
124
125  return wrapper
126
127
128class Loader:
129  """Class that handles load and merge processes."""
130
131  def __init__(
132      self,
133      module_file: Path = None,
134      force_build: bool = False,
135      sqlite_module_cache: bool = False,
136      need_merge_fn: Callable = None,
137  ):
138    logging.debug(
139        'Creating module info loader object with module_file: %s, force_build:'
140        ' %s, sqlite_module_cache: %s, need_merge_fn: %s',
141        module_file,
142        force_build,
143        sqlite_module_cache,
144        need_merge_fn,
145    )
146    self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO)
147    self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO)
148    self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO)
149    logging.debug(
150        'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s',
151        self.java_dep_path,
152        self.cc_dep_path,
153        self.merged_dep_path,
154    )
155
156    self.sqlite_module_cache = sqlite_module_cache
157    logging.debug('sqlite_module_cache: %s', sqlite_module_cache)
158    if self.sqlite_module_cache:
159      self.cache_file = atest_utils.get_product_out(_DB_NAME)
160      self.save_cache_async = self._save_db_async
161      self.load_from_cache = self._load_from_db
162    else:
163      self.cache_file = self.merged_dep_path
164      self.save_cache_async = self._save_json_async
165      self.load_from_cache = self._load_from_json
166
167    if need_merge_fn:
168      self.save_cache_async = lambda _, __: None
169
170    self.update_merge_info = False
171    self.module_index = atest_utils.get_index_path(
172        f'suite-modules.{_DB_VERSION}.idx'
173    )
174    self.module_index_proc = None
175    logging.debug('module_index: %s', self.module_index)
176
177    if module_file:
178      self.mod_info_file_path = Path(module_file)
179      self.load_module_info = self._load_module_info_from_file_wo_merging
180    else:
181      self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO)
182      if force_build:
183        logging.debug('Triggering module info build by force build.')
184        build()
185      elif not self.mod_info_file_path.is_file():
186        logging.debug(
187            'Triggering module info build due to module info file path %s not'
188            ' exist.',
189            self.mod_info_file_path,
190        )
191        build()
192
193      self.update_merge_info = self.need_merge_module_info()
194      self.load_module_info = self._load_module_info_file
195
196    logging.debug(
197        'Executing load_module_info function %s', self.load_module_info
198    )
199    self.name_to_module_info, self.path_to_module_info = self.load_module_info()
200
201    logging.debug('Completed creating module info loader object')
202
203  def load(self, save_timestamps: bool = False):
204    logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps)
205    if save_timestamps:
206      atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp)
207
208    return ModuleInfo(
209        name_to_module_info=self.name_to_module_info,
210        path_to_module_info=self.path_to_module_info,
211        mod_info_file_path=self.mod_info_file_path,
212        get_testable_modules=self.get_testable_modules,
213    )
214
215  def _load_module_info_file(self):
216    """Load module-info.json file as ModuleInfo and merge related JSON files
217
218    whenever required.
219
220    Returns:
221        Dict of module name to module info and dict of module path to module
222        info.
223    """
224    # +--------------+                  +----------------------------------+
225    # | ModuleInfo() |                  | ModuleInfo(module_file=foo.json) |
226    # +-------+------+                  +----------------+-----------------+
227    #         | module_info.build()                      | load
228    #         v                                          V
229    # +--------------------------+         +--------------------------+
230    # | module-info.json         |         | foo.json                 |
231    # | module_bp_cc_deps.json   |         | module_bp_cc_deps.json   |
232    # | module_bp_java_deps.json |         | module_bp_java_deps.json |
233    # +--------------------------+         +--------------------------+
234    #         |                                          |
235    #         | _merge_soong_info() <--------------------+
236    #         v
237    # +============================+
238    # |  $ANDROID_PRODUCT_OUT      |
239    # |    /atest_merged_dep.json  |--> load as module info.
240    # +============================+
241    if not self.update_merge_info:
242      return self.load_from_cache()
243
244    name_modules, path_modules = self._load_from_json(merge=True)
245    self.save_cache_async(name_modules, path_modules)
246    self._save_testable_modules_async(name_modules, path_modules)
247
248    return name_modules, path_modules
249
250  def _load_module_info_from_file_wo_merging(self):
251    """Load module-info.json as ModuleInfo without merging."""
252    name_modules = atest_utils.load_json_safely(self.mod_info_file_path)
253    _add_missing_variant_modules(name_modules)
254
255    return name_modules, get_path_to_module_info(name_modules)
256
257  def _save_db_async(
258      self,
259      name_to_module_info: Dict[str, Any],
260      path_to_module_info: Dict[str, Any],
261  ):
262    """Save data to a Sqlite database in parallel."""
263    data_map = {
264        _NAME_MODULE_TABLE: name_to_module_info,
265        _PATH_MODULE_TABLE: path_to_module_info,
266    }
267    _save_data_async(
268        function=_create_db,
269        contents=data_map,
270        target_path=self.cache_file,
271    )
272
273  def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]:
274    """Return a tuple of dicts by from SqliteDict."""
275    conn = sqlite3.connect(self.cache_file)
276    with conn:
277      name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE)
278      path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE)
279
280      return name_to_module_info, path_to_module_info
281
282  def _save_json_async(self, name_to_module_info: Dict[str, Any], _):
283    """Save data to a JSON format in parallel."""
284    _save_data_async(
285        function=_create_json,
286        contents=name_to_module_info,
287        target_path=self.cache_file,
288    )
289
290  def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]:
291    """Load or merge module info from json file.
292
293    Args:
294        merge: Boolean whether to merge build system infos.
295
296    Returns:
297        A tuple of (name_to_module_info, path_to_module_info).
298    """
299    start = time.time()
300    if merge:
301      name_info = self._merge_build_system_infos(
302          atest_utils.load_json_safely(self.mod_info_file_path)
303      )
304      duration = time.time() - start
305      logging.debug('Merging module info took %ss', duration)
306      metrics.LocalDetectEvent(
307          detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000)
308      )
309
310      return name_info, get_path_to_module_info(name_info)
311
312    name_info = atest_utils.load_json_safely(self.merged_dep_path)
313    duration = time.time() - start
314    logging.debug('Loading module info took %ss', duration)
315    metrics.LocalDetectEvent(
316        detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000)
317    )
318    logging.debug('Loading %s as module-info.', self.merged_dep_path)
319
320    return name_info, get_path_to_module_info(name_info)
321
322  def _save_testable_modules_async(
323      self,
324      name_to_module_info: Dict[str, Any],
325      path_to_module_info: Dict[str, Any],
326  ):
327    """Save testable modules in parallel."""
328    return atest_utils.run_multi_proc(
329        func=_get_testable_modules,
330        kwargs={
331            'name_to_module_info': name_to_module_info,
332            'path_to_module_info': path_to_module_info,
333            'index_path': self.module_index,
334        },
335    )
336
337  def need_merge_module_info(self):
338    """Check if needed to regenerate the cache file.
339
340    If the cache file is non-existent or testable module index is inexistent
341    or older than any of the JSON files used to generate it, the cache file
342    must re-generate.
343
344    Returns:
345        True when the cache file is older or non-existent, False otherwise.
346    """
347    if not self.cache_file.is_file():
348      return True
349
350    if not self.module_index.is_file():
351      return True
352
353    # The dependency input files should be generated at this point.
354    return any(
355        self.cache_file.stat().st_mtime < f.stat().st_mtime
356        for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path)
357    )
358
359  def _merge_build_system_infos(
360      self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None
361  ):
362    """Merge the content of module-info.json and CC/Java dependency files
363
364    to name_to_module_info.
365
366    Args:
367        name_to_module_info: Dict of module name to module info dict.
368        java_bp_info_path: String of path to java dep file to load up. Used for
369          testing.
370        cc_bp_info_path: String of path to cc dep file to load up. Used for
371          testing.
372
373    Returns:
374        Dict of updated name_to_module_info.
375    """
376    # Merge _JAVA_DEP_INFO
377    if not java_bp_info_path:
378      java_bp_info_path = self.java_dep_path
379    java_bp_infos = atest_utils.load_json_safely(java_bp_info_path)
380    if java_bp_infos:
381      logging.debug('Merging Java build info: %s', java_bp_info_path)
382      name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos)
383    # Merge _CC_DEP_INFO
384    if not cc_bp_info_path:
385      cc_bp_info_path = self.cc_dep_path
386    cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path)
387    if cc_bp_infos:
388      logging.debug('Merging CC build info: %s', cc_bp_info_path)
389      # CC's dep json format is different with java.
390      # Below is the example content:
391      # {
392      #   "clang": "${ANDROID_ROOT}/bin/clang",
393      #   "clang++": "${ANDROID_ROOT}/bin/clang++",
394      #   "modules": {
395      #       "ACameraNdkVendorTest": {
396      #           "path": [
397      #                   "frameworks/av/camera/ndk"
398      #           ],
399      #           "srcs": [
400      #                   "frameworks/tests/AImageVendorTest.cpp",
401      #                   "frameworks/tests/ACameraManagerTest.cpp"
402      #           ],
403      name_to_module_info = merge_soong_info(
404          name_to_module_info, cc_bp_infos.get('modules', {})
405      )
406    # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it
407    # without dumping atest_merged_dep.json in real.
408
409    # Adds the key into module info as a unique ID.
410    for key, info in name_to_module_info.items():
411      info[constants.MODULE_INFO_ID] = key
412
413    _add_missing_variant_modules(name_to_module_info)
414
415    return name_to_module_info
416
417  @metrics_timer
418  def get_testable_modules(self, suite=None):
419    """Return the testable modules of the given suite name.
420
421    Atest does not index testable modules against compatibility_suites. When
422    suite was given, or the index file was interrupted, always run
423    _get_testable_modules() and re-index.
424
425    Args:
426        suite: A string of suite name.
427
428    Returns:
429        If suite is not given, return all the testable modules in module
430        info, otherwise return only modules that belong to the suite.
431    """
432    modules = set()
433
434    if self.module_index.is_file():
435      modules = self.get_testable_modules_from_index(suite)
436    # If the modules.idx does not exist or invalid for any reason, generate
437    # a new one arbitrarily.
438    if not modules:
439      modules = self.get_testable_module_from_memory(suite)
440
441    return modules
442
443  def get_testable_modules_from_index(self, suite: str = None) -> Set[str]:
444    """Return the testable modules of the given suite name."""
445    suite_to_modules = {}
446    with open(self.module_index, 'rb') as cache:
447      try:
448        suite_to_modules = pickle.load(cache, encoding='utf-8')
449      except UnicodeDecodeError:
450        suite_to_modules = pickle.load(cache)
451      # when module indexing was interrupted.
452      except EOFError:
453        pass
454
455    return _filter_modules_by_suite(suite_to_modules, suite)
456
457  def get_testable_module_from_memory(self, suite: str = None) -> Set[str]:
458    """Return the testable modules of the given suite name."""
459    return _get_testable_modules(
460        name_to_module_info=self.name_to_module_info,
461        path_to_module_info=self.path_to_module_info,
462        index_path=self.module_index,
463        suite=suite,
464    )
465
466
467class ModuleInfo:
468  """Class that offers fast/easy lookup for Module related details."""
469
470  def __init__(
471      self,
472      name_to_module_info: Dict[str, Any] = None,
473      path_to_module_info: Dict[str, Any] = None,
474      mod_info_file_path: Path = None,
475      get_testable_modules: Callable = None,
476  ):
477    """Initialize the ModuleInfo object.
478
479    Load up the module-info.json file and initialize the helper vars.
480    Note that module-info.json does not contain all module dependencies,
481    therefore, Atest needs to accumulate dependencies defined in bp files.
482
483    Args:
484        name_to_module_info: Dict of name to module info.
485        path_to_module_info: Dict of path to module info.
486        mod_info_file_path: Path of module-info.json.
487        get_testable_modules: Function to get all testable modules.
488    """
489    #   +----------------------+     +----------------------------+
490    #   | $ANDROID_PRODUCT_OUT |     |$ANDROID_BUILD_TOP/out/soong|
491    #   |  /module-info.json   |     |  /module_bp_java_deps.json |
492    #   +-----------+----------+     +-------------+--------------+
493    #               |     _merge_soong_info()      |
494    #               +------------------------------+
495    #               |
496    #               v
497    # +----------------------------+  +----------------------------+
498    # |tempfile.NamedTemporaryFile |  |$ANDROID_BUILD_TOP/out/soong|
499    # +-------------+--------------+  |  /module_bp_cc_deps.json   |
500    #               |                 +-------------+--------------+
501    #               |     _merge_soong_info()       |
502    #               +-------------------------------+
503    #                              |
504    #                      +-------|
505    #                      v
506    #         +============================+
507    #         |  $ANDROID_PRODUCT_OUT      |
508    #         |    /atest_merged_dep.json  |--> load as module info.
509    #         +============================+
510    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
511
512    self.name_to_module_info = name_to_module_info or {}
513    self.path_to_module_info = path_to_module_info or {}
514    self.mod_info_file_path = mod_info_file_path
515    self._get_testable_modules = get_testable_modules
516
517  def is_module(self, name):
518    """Return True if name is a module, False otherwise."""
519    info = self.get_module_info(name)
520    # From aosp/2293302 it started merging all modules' dependency in bp
521    # even the module is not be exposed to make, and those modules could not
522    # be treated as a build target using m. Only treat input name as module
523    # if it also has the module_name attribute which means it could be a
524    # build target for m.
525    if info and info.get(constants.MODULE_NAME):
526      return True
527    return False
528
529  def get_paths(self, name) -> list[str]:
530    """Return paths of supplied module name, Empty list if non-existent."""
531    info = self.get_module_info(name)
532    if info:
533      return info.get(constants.MODULE_PATH, [])
534    return []
535
536  def get_module_names(self, rel_module_path):
537    """Get the modules that all have module_path.
538
539    Args:
540        rel_module_path: path of module in module-info.json
541
542    Returns:
543        List of module names.
544    """
545    return _get_module_names(self.path_to_module_info, rel_module_path)
546
547  def get_module_info(self, mod_name):
548    """Return dict of info for given module name, None if non-existence."""
549    return self.name_to_module_info.get(mod_name)
550
551  @staticmethod
552  def is_suite_in_compatibility_suites(suite, mod_info):
553    """Check if suite exists in the compatibility_suites of module-info.
554
555    Args:
556        suite: A string of suite name.
557        mod_info: Dict of module info to check.
558
559    Returns:
560        True if it exists in mod_info, False otherwise.
561    """
562    if not isinstance(mod_info, dict):
563      return False
564    return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, [])
565
566  def get_testable_modules(self, suite=None):
567    return self._get_testable_modules(suite)
568
569  @staticmethod
570  def is_tradefed_testable_module(info: Dict[str, Any]) -> bool:
571    """Check whether the module is a Tradefed executable test."""
572    if not info:
573      return False
574    if not info.get(constants.MODULE_INSTALLED, []):
575      return False
576    return ModuleInfo.has_test_config(info)
577
578  @staticmethod
579  def is_mobly_module(info: Dict[str, Any]) -> bool:
580    """Check whether the module is a Mobly test.
581
582    Note: Only python_test_host modules marked with a test_options tag of
583      "mobly" is considered a Mobly module.
584
585    Args:
586        info: Dict of module info to check.
587
588    Returns:
589        True if this is a Mobly test module, False otherwise.
590    """
591    return constants.MOBLY_TEST_OPTIONS_TAG in info.get(
592        constants.MODULE_TEST_OPTIONS_TAGS, []
593    )
594
595  def is_testable_module(self, info: Dict[str, Any]) -> bool:
596    """Check if module is something we can test.
597
598    A module is testable if:
599      - it's a tradefed testable module, or
600      - it's a Mobly module, or
601      - it's a robolectric module (or shares path with one).
602
603    Args:
604        info: Dict of module info to check.
605
606    Returns:
607        True if we can test this module, False otherwise.
608    """
609    return _is_testable_module(
610        self.name_to_module_info, self.path_to_module_info, info
611    )
612
613  @staticmethod
614  def has_test_config(info: Dict[str, Any]) -> bool:
615    """Validate if this module has a test config.
616
617    A module can have a test config in the following manner:
618      - test_config be set in module-info.json.
619      - Auto-generated config via the auto_test_config key
620        in module-info.json.
621
622    Args:
623        info: Dict of module info to check.
624
625    Returns:
626        True if this module has a test config, False otherwise.
627    """
628    return bool(
629        info.get(constants.MODULE_TEST_CONFIG, [])
630        or info.get('auto_test_config', [])
631    )
632
633  def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool:
634    """Return whether the module_name is a legacy Robolectric test"""
635    return _is_legacy_robolectric_test(
636        self.name_to_module_info, self.path_to_module_info, info
637    )
638
639  def get_robolectric_test_name(self, info: Dict[str, Any]) -> str:
640    """Returns runnable robolectric module name.
641
642    This method is for legacy robolectric tests and returns one of associated
643    modules. The pattern is determined by the amount of shards:
644
645    10 shards:
646        FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
647    No shard:
648        FooTests -> RunFooTests
649
650    Arg:
651        info: Dict of module info to check.
652
653    Returns:
654        String of the first-matched associated module that belongs to the
655        actual robolectric module, None if nothing has been found.
656    """
657    return _get_robolectric_test_name(
658        self.name_to_module_info, self.path_to_module_info, info
659    )
660
661  def is_robolectric_test(self, module_name):
662    """Check if the given module is a robolectric test.
663
664    Args:
665        module_name: String of module to check.
666
667    Returns:
668        Boolean whether it's a robotest or not.
669    """
670    if self.get_robolectric_type(module_name):
671      return True
672    return False
673
674  def get_robolectric_type(self, module_name: str) -> int:
675    """Check if the given module is a robolectric test and return type of it.
676
677    Robolectric declaration is converting from Android.mk to Android.bp, and
678    in the interim Atest needs to support testing both types of tests.
679
680    The modern robolectric tests defined by 'android_robolectric_test' in an
681    Android.bp file can can be run in Tradefed Test Runner:
682
683        SettingsRoboTests -> Tradefed Test Runner
684
685    Legacy tests defined in an Android.mk can only run with the 'make' way.
686
687        SettingsRoboTests -> make RunSettingsRoboTests0
688
689    To determine whether the test is a modern/legacy robolectric test:
690        1. If the 'robolectric-test` in the compatibility_suites, it's a
691           modern one, otherwise it's a legacy test. This is accurate since
692           aosp/2308586 already set the test suite of `robolectric-test`
693           for all `modern` Robolectric tests in Soong.
694        2. Traverse all modules share the module path. If one of the
695           modules has a ROBOLECTRIC class, it's a legacy robolectric test.
696
697    Args:
698        module_name: String of module to check.
699
700    Returns:
701        0: not a robolectric test.
702        1: a modern robolectric test(defined in Android.bp)
703        2: a legacy robolectric test(defined in Android.mk)
704    """
705    info = self.get_module_info(module_name)
706    if not info:
707      return 0
708    # Some Modern mode Robolectric test has related module which compliant
709    # with the Legacy Robolectric test. In this case, the Modern mode
710    # Robolectric tests should be prior to the Legacy mode.
711    if self.is_modern_robolectric_test(info):
712      return constants.ROBOTYPE_MODERN
713    if self.is_legacy_robolectric_test(info):
714      return constants.ROBOTYPE_LEGACY
715    return 0
716
717  def get_instrumentation_target_apps(self, module_name: str) -> Dict:
718    """Return target APKs of an instrumentation test.
719
720    Returns:
721        A dict of target module and target APK(s). e.g.
722        {"FooService": {"/path/to/the/FooService.apk"}}
723    """
724    # 1. Determine the actual manifest filename from an Android.bp(if any)
725    manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml')
726    bpfile = self.get_filepath_from_module(module_name, 'Android.bp')
727    if bpfile.is_file():
728      bp_info = atest_utils.get_bp_content(bpfile, 'android_test')
729      if not bp_info or not bp_info.get(module_name):
730        return {}
731      manifest = self.get_filepath_from_module(
732          module_name, bp_info.get(module_name).get('manifest')
733      )
734    xml_info = atest_utils.get_manifest_info(manifest)
735    # 2. Translate package name to a module name.
736    package = xml_info.get('package')
737    target_package = xml_info.get('target_package')
738    # Ensure it's an instrumentation test(excluding self-instrmented)
739    if target_package and package != target_package:
740      logging.debug('Found %s an instrumentation test.', module_name)
741      metrics.LocalDetectEvent(
742          detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1
743      )
744      target_module = self.get_target_module_by_pkg(
745          package=target_package, search_from=manifest.parent
746      )
747      if target_module:
748        return self.get_artifact_map(target_module)
749    return {}
750
751  # pylint: disable=anomalous-backslash-in-string
752  def get_target_module_by_pkg(self, package: str, search_from: Path) -> str:
753    """Translate package name to the target module name.
754
755    This method is dedicated to determine the target module by translating
756    a package name.
757
758    Phase 1: Find out possible manifest files among parent directories.
759    Phase 2. Look for the defined package fits the given name, and ensure
760             it is not a persistent app.
761    Phase 3: Translate the manifest path to possible modules. A valid module
762             must fulfill:
763             1. The 'class' type must be ['APPS'].
764             2. It is not a Robolectric test.
765
766    Returns:
767        A string of module name.
768    """
769    xmls = []
770    for pth in search_from.parents:
771      if pth == Path(self.root_dir):
772        break
773      for name in os.listdir(pth):
774        if pth.joinpath(name).is_file():
775          match = re.match('.*AndroidManifest.*\.xml$', name)
776          if match:
777            xmls.append(os.path.join(pth, name))
778    possible_modules = []
779    for xml in xmls:
780      rel_dir = str(Path(xml).relative_to(self.root_dir).parent)
781      logging.debug('Looking for package "%s" in %s...', package, xml)
782      xml_info = atest_utils.get_manifest_info(xml)
783      if xml_info.get('package') == package:
784        if xml_info.get('persistent'):
785          logging.debug('%s is a persistent app.', package)
786          continue
787        for _m in self.path_to_module_info.get(rel_dir):
788          possible_modules.append(_m)
789    if possible_modules:
790      for mod in possible_modules:
791        name = mod.get('module_name')
792        if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name):
793          return name
794    return ''
795
796  def get_artifact_map(self, module_name: str) -> Dict:
797    """Get the installed APK path of the given module."""
798    target_mod_info = self.get_module_info(module_name)
799    artifact_map = {}
800    if target_mod_info:
801      apks = set()
802      artifacts = target_mod_info.get('installed')
803      for artifact in artifacts:
804        if Path(artifact).suffix == '.apk':
805          apks.add(os.path.join(self.root_dir, artifact))
806      artifact_map.update({module_name: apks})
807    return artifact_map
808
809  def is_auto_gen_test_config(self, module_name):
810    """Check if the test config file will be generated automatically.
811
812    Args:
813        module_name: A string of the module name.
814
815    Returns:
816        True if the test config file will be generated automatically.
817    """
818    if self.is_module(module_name):
819      mod_info = self.get_module_info(module_name)
820      auto_test_config = mod_info.get('auto_test_config', [])
821      return auto_test_config and auto_test_config[0]
822    return False
823
824  @staticmethod
825  def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool:
826    """Check if the class is `ROBOLECTRIC`
827
828    This method is for legacy robolectric tests that the associated modules
829    contain:
830        'class': ['ROBOLECTRIC']
831
832    Args:
833        info: ModuleInfo to check.
834
835    Returns:
836        True if the attribute class in mod_info is ROBOLECTRIC, False
837        otherwise.
838    """
839    if info:
840      module_classes = info.get(constants.MODULE_CLASS, [])
841      return (
842          module_classes
843          and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC
844      )
845    return False
846
847  def is_native_test(self, module_name):
848    """Check if the input module is a native test.
849
850    Args:
851        module_name: A string of the module name.
852
853    Returns:
854        True if the test is a native test, False otherwise.
855    """
856    mod_info = self.get_module_info(module_name)
857    return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get(
858        constants.MODULE_CLASS, []
859    )
860
861  def has_mainline_modules(
862      self, module_name: str, mainline_binaries: List[str]
863  ) -> bool:
864    """Check if the mainline modules are in module-info.
865
866    Args:
867        module_name: A string of the module name.
868        mainline_binaries: A list of mainline module binaries.
869
870    Returns:
871        True if mainline_binaries is in module-info, False otherwise.
872    """
873    mod_info = self.get_module_info(module_name)
874    # Check 'test_mainline_modules' attribute of the module-info.json.
875    mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, [])
876    ml_modules_set = set(mainline_binaries)
877    if mm_in_mf:
878      return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf))
879    for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []):
880      # Check the value of 'mainline-param' in the test config.
881      if not self.is_auto_gen_test_config(module_name):
882        return contains_same_mainline_modules(
883            ml_modules_set,
884            atest_utils.get_mainline_param(
885                os.path.join(self.root_dir, test_config)
886            ),
887        )
888      # Unable to verify mainline modules in an auto-gen test config.
889      logging.debug(
890          '%s is associated with an auto-generated test config.', module_name
891      )
892      return True
893    return False
894
895  def get_filepath_from_module(self, module_name: str, filename: str) -> Path:
896    """Return absolute path of the given module and filename."""
897    mod_path = self.get_paths(module_name)
898    if mod_path:
899      return Path(self.root_dir).joinpath(mod_path[0], filename)
900    return Path()
901
902  def get_module_dependency(self, module_name, depend_on=None):
903    """Get the dependency sets for input module.
904
905    Recursively find all the dependencies of the input module.
906
907    Args:
908        module_name: String of module to check.
909        depend_on: The list of parent dependencies.
910
911    Returns:
912        Set of dependency modules.
913    """
914    if not depend_on:
915      depend_on = set()
916    deps = set()
917    mod_info = self.get_module_info(module_name)
918    if not mod_info:
919      return deps
920    mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, []))
921    # Remove item in deps if it already in depend_on:
922    mod_deps = mod_deps - depend_on
923    deps = deps.union(mod_deps)
924    for mod_dep in mod_deps:
925      deps = deps.union(
926          set(
927              self.get_module_dependency(
928                  mod_dep, depend_on=depend_on.union(deps)
929              )
930          )
931      )
932    return deps
933
934  def get_install_module_dependency(self, module_name, depend_on=None):
935    """Get the dependency set for the given modules with installed path.
936
937    Args:
938        module_name: String of module to check.
939        depend_on: The list of parent dependencies.
940
941    Returns:
942        Set of dependency modules which has installed path.
943    """
944    install_deps = set()
945    deps = self.get_module_dependency(module_name, depend_on)
946    logging.debug('%s depends on: %s', module_name, deps)
947    for module in deps:
948      mod_info = self.get_module_info(module)
949      if mod_info and mod_info.get(constants.MODULE_INSTALLED, []):
950        install_deps.add(module)
951    logging.debug(
952        'modules %s required by %s were not installed',
953        install_deps,
954        module_name,
955    )
956    return install_deps
957
958  @staticmethod
959  def is_unit_test(mod_info):
960    """Return True if input module is unit test, False otherwise.
961
962    Args:
963        mod_info: ModuleInfo to check.
964
965    Returns:
966        True if input module is unit test, False otherwise.
967    """
968    return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true'
969
970  def is_host_unit_test(self, info: Dict[str, Any]) -> bool:
971    """Return True if input module is host unit test, False otherwise.
972
973    Args:
974        info: ModuleInfo to check.
975
976    Returns:
977        True if input module is host unit test, False otherwise.
978    """
979    return self.is_tradefed_testable_module(
980        info
981    ) and self.is_suite_in_compatibility_suites('host-unit-tests', info)
982
983  def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool:
984    """Return whether 'robolectric-tests' is in 'compatibility_suites'."""
985    return self.is_tradefed_testable_module(
986        info
987    ) and self.is_robolectric_test_suite(info)
988
989  def is_robolectric_test_suite(self, mod_info) -> bool:
990    """Return True if 'robolectric-tests' in the compatibility_suites.
991
992    Args:
993        mod_info: ModuleInfo to check.
994
995    Returns:
996        True if the 'robolectric-tests' is in the compatibility_suites,
997        False otherwise.
998    """
999    return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info)
1000
1001  def is_ravenwood_test(self, info: Dict[str, Any]) -> bool:
1002    """Return whether 'ravenwood-tests' is in 'compatibility_suites'."""
1003    return self.is_tradefed_testable_module(
1004        info
1005    ) and self.is_ravenwood_test_suite(info)
1006
1007  def is_ravenwood_test_suite(self, mod_info) -> bool:
1008    """Return True if 'ravenwood-tests' in the compatibility_suites.
1009
1010    Args:
1011        mod_info: ModuleInfo to check.
1012
1013    Returns:
1014        True if the 'ravenwood-tests' is in the compatibility_suites,
1015        False otherwise.
1016    """
1017    return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info)
1018
1019  def is_device_driven_test(self, mod_info):
1020    """Return True if input module is device driven test, False otherwise.
1021
1022    Args:
1023        mod_info: ModuleInfo to check.
1024
1025    Returns:
1026        True if input module is device driven test, False otherwise.
1027    """
1028    if self.is_robolectric_test_suite(mod_info):
1029      return False
1030    if self.is_ravenwood_test_suite(mod_info):
1031      return False
1032
1033    return self.is_tradefed_testable_module(
1034        mod_info
1035    ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1036
1037  def is_host_driven_test(self, mod_info):
1038    """Return True if input module is host driven test, False otherwise.
1039
1040    Args:
1041        mod_info: ModuleInfo to check.
1042
1043    Returns:
1044        True if input module is host driven test, False otherwise.
1045    """
1046    return self.is_tradefed_testable_module(
1047        mod_info
1048    ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1049
1050  def _any_module(self, _: Module) -> bool:
1051    return True
1052
1053  def get_all_tests(self):
1054    """Get a list of all the module names which are tests."""
1055    return self._get_all_modules(type_predicate=self.is_testable_module)
1056
1057  def get_all_unit_tests(self):
1058    """Get a list of all the module names which are unit tests."""
1059    return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test)
1060
1061  def get_all_host_unit_tests(self):
1062    """Get a list of all the module names which are host unit tests."""
1063    return self._get_all_modules(type_predicate=self.is_host_unit_test)
1064
1065  def get_all_device_driven_tests(self):
1066    """Get a list of all the module names which are device driven tests."""
1067    return self._get_all_modules(type_predicate=self.is_device_driven_test)
1068
1069  def _get_all_modules(self, type_predicate=None):
1070    """Get a list of all the module names that passed the predicate."""
1071    modules = []
1072    type_predicate = type_predicate or self._any_module
1073    for mod_name, mod_info in self.name_to_module_info.items():
1074      if mod_info.get(constants.MODULE_NAME, '') == mod_name:
1075        if type_predicate(mod_info):
1076          modules.append(mod_name)
1077    return modules
1078
1079  def get_modules_by_path_in_srcs(
1080      self, path: str, testable_modules_only: bool = False
1081  ) -> Set[str]:
1082    """Get the module name that the given path belongs to.(in 'srcs')
1083
1084    Args:
1085        path: file path which is relative to ANDROID_BUILD_TOP.
1086        testable_modules_only: boolean flag which determines whether search
1087          testable modules only or not.
1088
1089    Returns:
1090        A set of string for matched module names, empty set if nothing find.
1091    """
1092    modules = set()
1093
1094    for mod_name in (
1095        self.get_testable_modules()
1096        if testable_modules_only
1097        else self.name_to_module_info.keys()
1098    ):
1099      m_info = self.get_module_info(mod_name)
1100      if m_info:
1101        for src in m_info.get(constants.MODULE_SRCS, []):
1102          if src in path:
1103            modules.add(mod_name)
1104
1105    return modules
1106
1107  def get_modules_by_path(self, path: str, testable_modules_only: bool = False):
1108    """Get the module names that the give path belongs to.
1109
1110    Args:
1111        path: dir path for searching among `path` in module information.
1112        testable_modules_only: boolean flag which determines whether search
1113          testable modules only or not.
1114    """
1115    modules = set()
1116    is_testable_module_fn = (
1117        self.is_testable_module if testable_modules_only else lambda _: True
1118    )
1119
1120    m_infos = self.path_to_module_info.get(path)
1121    if m_infos:
1122      modules = {
1123          info.get(constants.MODULE_NAME)
1124          for info in m_infos
1125          if is_testable_module_fn(info)
1126      }
1127
1128    return modules
1129
1130  def get_modules_by_include_deps(
1131      self, deps: Set[str], testable_module_only: bool = False
1132  ) -> Set[str]:
1133    """Get the matched module names for the input dependencies.
1134
1135    Args:
1136        deps: A set of string for dependencies.
1137        testable_module_only: Option if only want to get testable module.
1138
1139    Returns:
1140        A set of matched module names for the input dependencies.
1141    """
1142    modules = set()
1143
1144    for mod_name in (
1145        self.get_testable_modules()
1146        if testable_module_only
1147        else self.name_to_module_info.keys()
1148    ):
1149      mod_info = self.get_module_info(mod_name)
1150      if mod_info and deps.intersection(
1151          set(mod_info.get(constants.MODULE_DEPENDENCIES, []))
1152      ):
1153        modules.add(mod_info.get(constants.MODULE_NAME))
1154    return modules
1155
1156  def get_installed_paths(self, module_name: str) -> List[Path]:
1157    """Return installed path from module info."""
1158    mod_info = self.get_module_info(module_name)
1159    if not mod_info:
1160      return []
1161
1162    def _to_abs_path(p):
1163      if os.path.isabs(p):
1164        return Path(p)
1165      return Path(os.getenv(constants.ANDROID_BUILD_TOP), p)
1166
1167    return [_to_abs_path(p) for p in mod_info.get('installed', [])]
1168
1169  def get_code_under_test(self, module_name: str) -> List[str]:
1170    """Return code under test from module info."""
1171    mod_info = self.get_module_info(module_name)
1172    if not mod_info:
1173      atest_utils.colorful_print(
1174          '\nmodule %s cannot be found in module info, skip generating'
1175          ' coverage for it.' % module_name,
1176          constants.YELLOW,
1177      )
1178      return []
1179
1180    return mod_info.get('code_under_test', [])
1181
1182  def build_variants(self, info: Dict[str, Any]) -> List[str]:
1183    return info.get(constants.MODULE_SUPPORTED_VARIANTS, [])
1184
1185  def requires_device(self, info: Dict[str, Any]) -> bool:
1186
1187    if self.is_modern_robolectric_test(info):
1188      return False
1189    if self.is_ravenwood_test(info):
1190      return False
1191    if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants(
1192        info
1193    ):
1194      return False
1195
1196    return True
1197
1198
1199def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path):
1200  """Create a Sqlite DB by writing to tempfile and move it to the right place.
1201
1202  Args:
1203      data_map: A dict where the key is table name and value is data itself.
1204      db_path: A Path pointing to the DB file.
1205  """
1206  if db_path.is_file():
1207    db_path.unlink()
1208
1209  with tempfile.NamedTemporaryFile(delete=False) as tmp_db:
1210    _create_db_in_path(data_map, tmp_db.name)
1211    shutil.move(tmp_db.name, db_path)
1212
1213    logging.debug('%s is created successfully.', db_path)
1214
1215
1216def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path):
1217  """Create a Sqlite DB with multiple tables.
1218
1219  Args:
1220      data_map: A dict where the key is table name and value is data itself.
1221      db_path: A Path pointing to the DB file.
1222  """
1223  con = sqlite3.connect(db_path)
1224  with con:
1225    cur = con.cursor()
1226    for table, contents in data_map.items():
1227      cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)')
1228
1229      data = []
1230      for k, v in contents.items():
1231        data.append({'key': k, 'value': json.dumps(v)})
1232      cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data)
1233
1234
1235def _create_json(data_map: Dict[str, Any], json_path: Path):
1236  """Write content onto a JSON file.
1237
1238  Args:
1239      data_map: A dict where the key is table name and value is data itself.
1240      json_path: A Path pointing to the JSON file.
1241  """
1242  if json_path.is_file():
1243    json_path.unlink()
1244
1245  with tempfile.NamedTemporaryFile(delete=False) as temp_json:
1246    with open(temp_json.name, 'w', encoding='utf-8') as _temp:
1247      json.dump(data_map, _temp, indent=0)
1248    shutil.move(temp_json.name, json_path)
1249
1250    logging.debug('%s is created successfully.', json_path)
1251
1252
1253def _save_data_async(function: Callable, contents: Any, target_path: Path):
1254  """Save contents to a static file in asynchronized manner."""
1255  atest_utils.run_multi_proc(
1256      func=function,
1257      args=[contents, target_path],
1258      # We set `daemon` to `False` to make sure that Atest doesn't exit before
1259      # writing the cache file.
1260      daemon=False,
1261  )
1262
1263
1264def merge_soong_info(name_to_module_info, mod_bp_infos):
1265  """Merge the dependency and srcs in mod_bp_infos to name_to_module_info.
1266
1267  Args:
1268      name_to_module_info: Dict of module name to module info dict.
1269      mod_bp_infos: Dict of module name to bp's module info dict.
1270
1271  Returns:
1272      Dict of updated name_to_module_info.
1273  """
1274  merge_items = [
1275      constants.MODULE_DEPENDENCIES,
1276      constants.MODULE_SRCS,
1277      constants.MODULE_LIBS,
1278      constants.MODULE_STATIC_LIBS,
1279      constants.MODULE_STATIC_DEPS,
1280      constants.MODULE_PATH,
1281  ]
1282  for module_name, dep_info in mod_bp_infos.items():
1283    mod_info = name_to_module_info.setdefault(module_name, {})
1284    for merge_item in merge_items:
1285      dep_info_values = dep_info.get(merge_item, [])
1286      mod_info_values = mod_info.get(merge_item, [])
1287      mod_info_values.extend(dep_info_values)
1288      mod_info_values.sort()
1289      # deduplicate values just in case.
1290      mod_info_values = list(dict.fromkeys(mod_info_values))
1291      name_to_module_info[module_name][merge_item] = mod_info_values
1292  return name_to_module_info
1293
1294
1295def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]):
1296  missing_modules = {}
1297
1298  # Android's build system automatically adds a suffix for some build module
1299  # variants. For example, a module-info entry for a module originally named
1300  # 'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would
1301  # not be able to find. We add such entries if not already present so they
1302  # can be looked up using their declared module name.
1303  for mod_name, mod_info in name_to_module_info.items():
1304    declared_module_name = mod_info.get(constants.MODULE_NAME, mod_name)
1305    if declared_module_name in name_to_module_info:
1306      continue
1307    missing_modules.setdefault(declared_module_name, mod_info)
1308
1309  name_to_module_info.update(missing_modules)
1310
1311
1312def contains_same_mainline_modules(
1313    mainline_modules: Set[str], module_lists: Set[str]
1314):
1315  """Check if mainline modules listed on command line is
1316
1317  the same set as config.
1318
1319  Args:
1320      mainline_modules: A list of mainline modules from triggered test.
1321      module_lists: A list of concatenate mainline module string from test
1322        configs.
1323
1324  Returns
1325      True if the set mainline modules from triggered test is in the test
1326        configs.
1327  """
1328  for module_string in module_lists:
1329    if mainline_modules == set(module_string.split('+')):
1330      return True
1331  return False
1332
1333
1334def get_path_to_module_info(name_to_module_info):
1335  """Return the path_to_module_info dict.
1336
1337  Args:
1338      name_to_module_info: Dict of module name to module info dict.
1339
1340  Returns:
1341      Dict of module path to module info dict.
1342  """
1343  path_to_module_info = {}
1344  for mod_name, mod_info in name_to_module_info.items():
1345    # Cross-compiled and multi-arch modules actually all belong to
1346    # a single target so filter out these extra modules.
1347    if mod_name != mod_info.get(constants.MODULE_NAME, ''):
1348      continue
1349    for path in mod_info.get(constants.MODULE_PATH, []):
1350      mod_info[constants.MODULE_NAME] = mod_name
1351      # There could be multiple modules in a path.
1352      if path in path_to_module_info:
1353        path_to_module_info[path].append(mod_info)
1354      else:
1355        path_to_module_info[path] = [mod_info]
1356  return path_to_module_info
1357
1358
1359def _get_module_names(path_to_module_info, rel_module_path):
1360  """Get the modules that all have module_path.
1361
1362  Args:
1363      path_to_module_info: Dict of path to module info.
1364      rel_module_path: path of module in module-info.json.
1365
1366  Returns:
1367      List of module names.
1368  """
1369  return [
1370      m.get(constants.MODULE_NAME)
1371      for m in path_to_module_info.get(rel_module_path, [])
1372  ]
1373
1374
1375def _get_robolectric_test_name(
1376    name_to_module_info: Dict[str, Dict],
1377    path_to_module_info: Dict[str, Dict],
1378    info: Dict[str, Any],
1379) -> str:
1380  """Returns runnable robolectric module name.
1381
1382  This method is for legacy robolectric tests and returns one of associated
1383  modules. The pattern is determined by the amount of shards:
1384
1385  10 shards:
1386      FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9
1387  No shard:
1388      FooTests -> RunFooTests
1389
1390  Arg:
1391      name_to_module_info: Dict of name to module info.
1392      path_to_module_info: Dict of path to module info.
1393      info: Dict of module info to check.
1394
1395  Returns:
1396      String of the first-matched associated module that belongs to the
1397      actual robolectric module, None if nothing has been found.
1398  """
1399  if not info:
1400    return ''
1401  module_paths = info.get(constants.MODULE_PATH, [])
1402  if not module_paths:
1403    return ''
1404  filtered_module_names = [
1405      name
1406      for name in _get_module_names(path_to_module_info, module_paths[0])
1407      if name.startswith('Run')
1408  ]
1409  return next(
1410      (
1411          name
1412          for name in filtered_module_names
1413          if ModuleInfo.is_legacy_robolectric_class(
1414              name_to_module_info.get(name)
1415          )
1416      ),
1417      '',
1418  )
1419
1420
1421def _is_legacy_robolectric_test(
1422    name_to_module_info: Dict[str, Dict],
1423    path_to_module_info: Dict[str, Dict],
1424    info: Dict[str, Any],
1425) -> bool:
1426  """Return whether the module_name is a legacy Robolectric test"""
1427  if ModuleInfo.is_tradefed_testable_module(info):
1428    return False
1429  return bool(
1430      _get_robolectric_test_name(name_to_module_info, path_to_module_info, info)
1431  )
1432
1433
1434def get_module_info_target() -> str:
1435  """Get module info target name for soong_ui.bash"""
1436  build_top = atest_utils.get_build_top()
1437  module_info_path = atest_utils.get_product_out(_MODULE_INFO)
1438  if module_info_path.is_relative_to(build_top):
1439    return str(module_info_path.relative_to(build_top))
1440
1441  logging.debug('Found customized OUT_DIR!')
1442  return str(module_info_path)
1443
1444
1445def build():
1446  """Build module-info.json"""
1447  logging.debug(
1448      'Generating %s - this is required for initial runs or forced rebuilds.',
1449      _MODULE_INFO,
1450  )
1451  build_start = time.time()
1452  if not atest_utils.build([get_module_info_target()]):
1453    sys.exit(ExitCode.BUILD_FAILURE)
1454
1455  metrics.LocalDetectEvent(
1456      detect_type=DetectType.ONLY_BUILD_MODULE_INFO,
1457      result=int(time.time() - build_start),
1458  )
1459
1460
1461def _is_testable_module(
1462    name_to_module_info: Dict[str, Dict],
1463    path_to_module_info: Dict[str, Dict],
1464    info: Dict[str, Any],
1465) -> bool:
1466  """Check if module is something we can test.
1467
1468  A module is testable if:
1469    - it's a tradefed testable module, or
1470    - it's a Mobly module, or
1471    - it's a robolectric module (or shares path with one).
1472
1473  Args:
1474      name_to_module_info: Dict of name to module info.
1475      path_to_module_info: Dict of path to module info.
1476      info: Dict of module info to check.
1477
1478  Returns:
1479      True if we can test this module, False otherwise.
1480  """
1481  if not info or not info.get(constants.MODULE_NAME):
1482    return False
1483  if ModuleInfo.is_tradefed_testable_module(info):
1484    return True
1485  if ModuleInfo.is_mobly_module(info):
1486    return True
1487  if _is_legacy_robolectric_test(
1488      name_to_module_info, path_to_module_info, info
1489  ):
1490    return True
1491  return False
1492
1493
1494def _get_testable_modules(
1495    name_to_module_info: Dict[str, Dict],
1496    path_to_module_info: Dict[str, Dict],
1497    suite: str = None,
1498    index_path: Path = None,
1499):
1500  """Return testable modules of the given suite name."""
1501  suite_to_modules = _get_suite_to_modules(
1502      name_to_module_info, path_to_module_info, index_path
1503  )
1504
1505  return _filter_modules_by_suite(suite_to_modules, suite)
1506
1507
1508def _get_suite_to_modules(
1509    name_to_module_info: Dict[str, Dict],
1510    path_to_module_info: Dict[str, Dict],
1511    index_path: Path = None,
1512) -> Dict[str, Set[str]]:
1513  """Map suite and its modules.
1514
1515  Args:
1516      name_to_module_info: Dict of name to module info.
1517      path_to_module_info: Dict of path to module info.
1518      index_path: Path of the stored content.
1519
1520  Returns:
1521      Dict of suite and testable modules mapping.
1522  """
1523  suite_to_modules = {}
1524
1525  for _, info in name_to_module_info.items():
1526    if _is_testable_module(name_to_module_info, path_to_module_info, info):
1527      testable_module = info.get(constants.MODULE_NAME)
1528      suites = (
1529          info.get('compatibility_suites')
1530          if info.get('compatibility_suites')
1531          else ['null-suite']
1532      )
1533
1534      for suite in suites:
1535        suite_to_modules.setdefault(suite, set()).add(testable_module)
1536
1537  if index_path:
1538    _index_testable_modules(suite_to_modules, index_path)
1539
1540  return suite_to_modules
1541
1542
1543def _filter_modules_by_suite(
1544    suite_to_modules: Dict[str, Set[str]],
1545    suite: str = None,
1546) -> Set[str]:
1547  """Return modules of the given suite name."""
1548  if suite:
1549    return suite_to_modules.get(suite)
1550
1551  return {mod for mod_set in suite_to_modules.values() for mod in mod_set}
1552
1553
1554def _index_testable_modules(contents: Any, index_path: Path):
1555  """Dump testable modules.
1556
1557  Args:
1558      content: An object that will be written to the index file.
1559      index_path: Path to the saved index file.
1560  """
1561  logging.debug(
1562      r'Indexing testable modules... '
1563      r'(This is required whenever module-info.json '
1564      r'was rebuilt.)'
1565  )
1566  index_path.parent.mkdir(parents=True, exist_ok=True)
1567  with tempfile.NamedTemporaryFile(delete=False) as cache:
1568    try:
1569      pickle.dump(contents, cache, protocol=2)
1570      shutil.move(cache.name, index_path)
1571      logging.debug('%s is created successfully.', index_path)
1572    except IOError:
1573      atest_utils.print_and_log_error('Failed in dumping %s', cache)
1574      os.remove(cache.name)
1575
1576
1577class SqliteDict(collections.abc.Mapping):
1578  """A class that loads a Sqlite DB as a dictionary-like object.
1579
1580  Args:
1581      conn: A connection to the Sqlite database.
1582      table_name: A string the table name.
1583  """
1584
1585  def __init__(self, conn: sqlite3.Connection, table_name: str):
1586    """Initialize the SqliteDict instance."""
1587    self.conn = conn
1588    self.table = table_name
1589
1590  def __iter__(self) -> str:
1591    """Iterate over the keys in the SqliteDict."""
1592    for key in self._load_key_rows():
1593      yield key[0]
1594
1595  def _load_key_rows(self) -> Set[str]:
1596    """Load the key rows from the database table."""
1597    results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall()
1598    return set(results)
1599
1600  def __len__(self) -> int:
1601    """Get the size of key-value pairs in the SqliteDict."""
1602    return len(self._load_key_rows())
1603
1604  def __getitem__(self, key) -> Dict[str, Any]:
1605    """Get the value associated with the specified key."""
1606    result = self.conn.execute(
1607        f'SELECT value FROM {self.table} WHERE key = ?', (key,)
1608    ).fetchone()
1609    if result:
1610      return json.loads(result[0])
1611    raise KeyError(f'Bad key: {key}')
1612
1613  def items(self) -> Tuple[str, Dict[str, Any]]:
1614    """Iterate over the key-value pairs in the SqliteDict."""
1615    for key in self:
1616      value = self[key]
1617      yield key, value
1618