1# Copyright 2018, The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Module Info class used to hold cached module-info.json.""" 16 17# pylint: disable=too-many-lines 18from __future__ import annotations 19 20import collections 21import json 22import logging 23import os 24from pathlib import Path 25import pickle 26import re 27import shutil 28import sqlite3 29import sys 30import tempfile 31import time 32from typing import Any, Callable, Dict, List, Set, Tuple 33 34from atest import atest_utils 35from atest import constants 36from atest.atest_enum import DetectType, ExitCode 37from atest.metrics import metrics 38 39 40# JSON file generated by build system that lists all buildable targets. 41_MODULE_INFO = 'module-info.json' 42# JSON file generated by build system that lists dependencies for java. 43_JAVA_DEP_INFO = 'module_bp_java_deps.json' 44# JSON file generated by build system that lists dependencies for cc. 45_CC_DEP_INFO = 'module_bp_cc_deps.json' 46# JSON file generated by atest merged the content from module-info, 47# module_bp_java_deps.json, and module_bp_cc_deps. 48_MERGED_INFO = 'atest_merged_dep.json' 49_DB_VERSION = 2 50_DB_NAME = f'module-info.{_DB_VERSION}.db' 51_NAME_MODULE_TABLE = 'modules' 52_PATH_MODULE_TABLE = 'path_modules' 53 54 55Module = Dict[str, Any] 56 57 58def load_from_file( 59 module_file: Path = None, 60 force_build: bool = False, 61) -> ModuleInfo: 62 """Factory method that initializes ModuleInfo from the build-generated 63 64 JSON file 65 """ 66 loader = Loader( 67 module_file=module_file, 68 force_build=force_build, 69 need_merge_fn=lambda: False, 70 ) 71 72 mi = loader.load() 73 74 return mi 75 76 77def load_from_dict(name_to_module_info: Dict[str, Any]) -> ModuleInfo: 78 """Factory method that initializes ModuleInfo from a dictionary.""" 79 path_to_module_info = get_path_to_module_info(name_to_module_info) 80 return ModuleInfo( 81 name_to_module_info=name_to_module_info, 82 path_to_module_info=path_to_module_info, 83 get_testable_modules=lambda s: _get_testable_modules( 84 name_to_module_info, path_to_module_info, s 85 ), 86 ) 87 88 89def create_empty() -> ModuleInfo: 90 """Factory method that initializes an empty ModuleInfo.""" 91 return ModuleInfo() 92 93 94def load( 95 force_build: bool = False, sqlite_module_cache: bool = False 96) -> ModuleInfo: 97 """Factory method that initializes ModuleInfo from the build-generated 98 99 JSON or Sqlite file. 100 """ 101 mod_start = time.time() 102 loader = Loader( 103 force_build=force_build, sqlite_module_cache=sqlite_module_cache 104 ) 105 mod_stop = time.time() - mod_start 106 metrics.LocalDetectEvent( 107 detect_type=DetectType.MODULE_INFO_INIT_MS, result=int(mod_stop * 1000) 108 ) 109 110 return loader.load(save_timestamps=True) 111 112 113def metrics_timer(func): 114 """Decorator method for sending data to metrics.""" 115 116 def wrapper(*args, **kwargs): 117 start = time.time() 118 result = func(*args, **kwargs) 119 elapsed_time = int(time.time() - start) 120 metrics.LocalDetectEvent( 121 detect_type=DetectType.TESTABLE_MODULES, result=elapsed_time 122 ) 123 return result 124 125 return wrapper 126 127 128class Loader: 129 """Class that handles load and merge processes.""" 130 131 def __init__( 132 self, 133 module_file: Path = None, 134 force_build: bool = False, 135 sqlite_module_cache: bool = False, 136 need_merge_fn: Callable = None, 137 ): 138 logging.debug( 139 'Creating module info loader object with module_file: %s, force_build:' 140 ' %s, sqlite_module_cache: %s, need_merge_fn: %s', 141 module_file, 142 force_build, 143 sqlite_module_cache, 144 need_merge_fn, 145 ) 146 self.java_dep_path = atest_utils.get_build_out_dir('soong', _JAVA_DEP_INFO) 147 self.cc_dep_path = atest_utils.get_build_out_dir('soong', _CC_DEP_INFO) 148 self.merged_dep_path = atest_utils.get_product_out(_MERGED_INFO) 149 logging.debug( 150 'java_dep_path: %s, cc_dep_path: %s, merged_dep_path: %s', 151 self.java_dep_path, 152 self.cc_dep_path, 153 self.merged_dep_path, 154 ) 155 156 self.sqlite_module_cache = sqlite_module_cache 157 logging.debug('sqlite_module_cache: %s', sqlite_module_cache) 158 if self.sqlite_module_cache: 159 self.cache_file = atest_utils.get_product_out(_DB_NAME) 160 self.save_cache_async = self._save_db_async 161 self.load_from_cache = self._load_from_db 162 else: 163 self.cache_file = self.merged_dep_path 164 self.save_cache_async = self._save_json_async 165 self.load_from_cache = self._load_from_json 166 167 if need_merge_fn: 168 self.save_cache_async = lambda _, __: None 169 170 self.update_merge_info = False 171 self.module_index = atest_utils.get_index_path( 172 f'suite-modules.{_DB_VERSION}.idx' 173 ) 174 self.module_index_proc = None 175 logging.debug('module_index: %s', self.module_index) 176 177 if module_file: 178 self.mod_info_file_path = Path(module_file) 179 self.load_module_info = self._load_module_info_from_file_wo_merging 180 else: 181 self.mod_info_file_path = atest_utils.get_product_out(_MODULE_INFO) 182 if force_build: 183 logging.debug('Triggering module info build by force build.') 184 build() 185 elif not self.mod_info_file_path.is_file(): 186 logging.debug( 187 'Triggering module info build due to module info file path %s not' 188 ' exist.', 189 self.mod_info_file_path, 190 ) 191 build() 192 193 self.update_merge_info = self.need_merge_module_info() 194 self.load_module_info = self._load_module_info_file 195 196 logging.debug( 197 'Executing load_module_info function %s', self.load_module_info 198 ) 199 self.name_to_module_info, self.path_to_module_info = self.load_module_info() 200 201 logging.debug('Completed creating module info loader object') 202 203 def load(self, save_timestamps: bool = False): 204 logging.debug('Loading ModuleInfo. save_timestamps: %s', save_timestamps) 205 if save_timestamps: 206 atest_utils.run_multi_proc(func=atest_utils.save_build_files_timestamp) 207 208 return ModuleInfo( 209 name_to_module_info=self.name_to_module_info, 210 path_to_module_info=self.path_to_module_info, 211 mod_info_file_path=self.mod_info_file_path, 212 get_testable_modules=self.get_testable_modules, 213 ) 214 215 def _load_module_info_file(self): 216 """Load module-info.json file as ModuleInfo and merge related JSON files 217 218 whenever required. 219 220 Returns: 221 Dict of module name to module info and dict of module path to module 222 info. 223 """ 224 # +--------------+ +----------------------------------+ 225 # | ModuleInfo() | | ModuleInfo(module_file=foo.json) | 226 # +-------+------+ +----------------+-----------------+ 227 # | module_info.build() | load 228 # v V 229 # +--------------------------+ +--------------------------+ 230 # | module-info.json | | foo.json | 231 # | module_bp_cc_deps.json | | module_bp_cc_deps.json | 232 # | module_bp_java_deps.json | | module_bp_java_deps.json | 233 # +--------------------------+ +--------------------------+ 234 # | | 235 # | _merge_soong_info() <--------------------+ 236 # v 237 # +============================+ 238 # | $ANDROID_PRODUCT_OUT | 239 # | /atest_merged_dep.json |--> load as module info. 240 # +============================+ 241 if not self.update_merge_info: 242 return self.load_from_cache() 243 244 name_modules, path_modules = self._load_from_json(merge=True) 245 self.save_cache_async(name_modules, path_modules) 246 self._save_testable_modules_async(name_modules, path_modules) 247 248 return name_modules, path_modules 249 250 def _load_module_info_from_file_wo_merging(self): 251 """Load module-info.json as ModuleInfo without merging.""" 252 name_modules = atest_utils.load_json_safely(self.mod_info_file_path) 253 _add_missing_variant_modules(name_modules) 254 255 return name_modules, get_path_to_module_info(name_modules) 256 257 def _save_db_async( 258 self, 259 name_to_module_info: Dict[str, Any], 260 path_to_module_info: Dict[str, Any], 261 ): 262 """Save data to a Sqlite database in parallel.""" 263 data_map = { 264 _NAME_MODULE_TABLE: name_to_module_info, 265 _PATH_MODULE_TABLE: path_to_module_info, 266 } 267 _save_data_async( 268 function=_create_db, 269 contents=data_map, 270 target_path=self.cache_file, 271 ) 272 273 def _load_from_db(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: 274 """Return a tuple of dicts by from SqliteDict.""" 275 conn = sqlite3.connect(self.cache_file) 276 with conn: 277 name_to_module_info = SqliteDict(conn, _NAME_MODULE_TABLE) 278 path_to_module_info = SqliteDict(conn, _PATH_MODULE_TABLE) 279 280 return name_to_module_info, path_to_module_info 281 282 def _save_json_async(self, name_to_module_info: Dict[str, Any], _): 283 """Save data to a JSON format in parallel.""" 284 _save_data_async( 285 function=_create_json, 286 contents=name_to_module_info, 287 target_path=self.cache_file, 288 ) 289 290 def _load_from_json(self, merge: bool = False) -> Tuple[Dict, Dict]: 291 """Load or merge module info from json file. 292 293 Args: 294 merge: Boolean whether to merge build system infos. 295 296 Returns: 297 A tuple of (name_to_module_info, path_to_module_info). 298 """ 299 start = time.time() 300 if merge: 301 name_info = self._merge_build_system_infos( 302 atest_utils.load_json_safely(self.mod_info_file_path) 303 ) 304 duration = time.time() - start 305 logging.debug('Merging module info took %ss', duration) 306 metrics.LocalDetectEvent( 307 detect_type=DetectType.MODULE_MERGE_MS, result=int(duration * 1000) 308 ) 309 310 return name_info, get_path_to_module_info(name_info) 311 312 name_info = atest_utils.load_json_safely(self.merged_dep_path) 313 duration = time.time() - start 314 logging.debug('Loading module info took %ss', duration) 315 metrics.LocalDetectEvent( 316 detect_type=DetectType.MODULE_LOAD_MS, result=int(duration * 1000) 317 ) 318 logging.debug('Loading %s as module-info.', self.merged_dep_path) 319 320 return name_info, get_path_to_module_info(name_info) 321 322 def _save_testable_modules_async( 323 self, 324 name_to_module_info: Dict[str, Any], 325 path_to_module_info: Dict[str, Any], 326 ): 327 """Save testable modules in parallel.""" 328 return atest_utils.run_multi_proc( 329 func=_get_testable_modules, 330 kwargs={ 331 'name_to_module_info': name_to_module_info, 332 'path_to_module_info': path_to_module_info, 333 'index_path': self.module_index, 334 }, 335 ) 336 337 def need_merge_module_info(self): 338 """Check if needed to regenerate the cache file. 339 340 If the cache file is non-existent or testable module index is inexistent 341 or older than any of the JSON files used to generate it, the cache file 342 must re-generate. 343 344 Returns: 345 True when the cache file is older or non-existent, False otherwise. 346 """ 347 if not self.cache_file.is_file(): 348 return True 349 350 if not self.module_index.is_file(): 351 return True 352 353 # The dependency input files should be generated at this point. 354 return any( 355 self.cache_file.stat().st_mtime < f.stat().st_mtime 356 for f in (self.mod_info_file_path, self.java_dep_path, self.cc_dep_path) 357 ) 358 359 def _merge_build_system_infos( 360 self, name_to_module_info, java_bp_info_path=None, cc_bp_info_path=None 361 ): 362 """Merge the content of module-info.json and CC/Java dependency files 363 364 to name_to_module_info. 365 366 Args: 367 name_to_module_info: Dict of module name to module info dict. 368 java_bp_info_path: String of path to java dep file to load up. Used for 369 testing. 370 cc_bp_info_path: String of path to cc dep file to load up. Used for 371 testing. 372 373 Returns: 374 Dict of updated name_to_module_info. 375 """ 376 # Merge _JAVA_DEP_INFO 377 if not java_bp_info_path: 378 java_bp_info_path = self.java_dep_path 379 java_bp_infos = atest_utils.load_json_safely(java_bp_info_path) 380 if java_bp_infos: 381 logging.debug('Merging Java build info: %s', java_bp_info_path) 382 name_to_module_info = merge_soong_info(name_to_module_info, java_bp_infos) 383 # Merge _CC_DEP_INFO 384 if not cc_bp_info_path: 385 cc_bp_info_path = self.cc_dep_path 386 cc_bp_infos = atest_utils.load_json_safely(cc_bp_info_path) 387 if cc_bp_infos: 388 logging.debug('Merging CC build info: %s', cc_bp_info_path) 389 # CC's dep json format is different with java. 390 # Below is the example content: 391 # { 392 # "clang": "${ANDROID_ROOT}/bin/clang", 393 # "clang++": "${ANDROID_ROOT}/bin/clang++", 394 # "modules": { 395 # "ACameraNdkVendorTest": { 396 # "path": [ 397 # "frameworks/av/camera/ndk" 398 # ], 399 # "srcs": [ 400 # "frameworks/tests/AImageVendorTest.cpp", 401 # "frameworks/tests/ACameraManagerTest.cpp" 402 # ], 403 name_to_module_info = merge_soong_info( 404 name_to_module_info, cc_bp_infos.get('modules', {}) 405 ) 406 # If $ANDROID_PRODUCT_OUT was not created in pyfakefs, simply return it 407 # without dumping atest_merged_dep.json in real. 408 409 # Adds the key into module info as a unique ID. 410 for key, info in name_to_module_info.items(): 411 info[constants.MODULE_INFO_ID] = key 412 413 _add_missing_variant_modules(name_to_module_info) 414 415 return name_to_module_info 416 417 @metrics_timer 418 def get_testable_modules(self, suite=None): 419 """Return the testable modules of the given suite name. 420 421 Atest does not index testable modules against compatibility_suites. When 422 suite was given, or the index file was interrupted, always run 423 _get_testable_modules() and re-index. 424 425 Args: 426 suite: A string of suite name. 427 428 Returns: 429 If suite is not given, return all the testable modules in module 430 info, otherwise return only modules that belong to the suite. 431 """ 432 modules = set() 433 434 if self.module_index.is_file(): 435 modules = self.get_testable_modules_from_index(suite) 436 # If the modules.idx does not exist or invalid for any reason, generate 437 # a new one arbitrarily. 438 if not modules: 439 modules = self.get_testable_module_from_memory(suite) 440 441 return modules 442 443 def get_testable_modules_from_index(self, suite: str = None) -> Set[str]: 444 """Return the testable modules of the given suite name.""" 445 suite_to_modules = {} 446 with open(self.module_index, 'rb') as cache: 447 try: 448 suite_to_modules = pickle.load(cache, encoding='utf-8') 449 except UnicodeDecodeError: 450 suite_to_modules = pickle.load(cache) 451 # when module indexing was interrupted. 452 except EOFError: 453 pass 454 455 return _filter_modules_by_suite(suite_to_modules, suite) 456 457 def get_testable_module_from_memory(self, suite: str = None) -> Set[str]: 458 """Return the testable modules of the given suite name.""" 459 return _get_testable_modules( 460 name_to_module_info=self.name_to_module_info, 461 path_to_module_info=self.path_to_module_info, 462 index_path=self.module_index, 463 suite=suite, 464 ) 465 466 467class ModuleInfo: 468 """Class that offers fast/easy lookup for Module related details.""" 469 470 def __init__( 471 self, 472 name_to_module_info: Dict[str, Any] = None, 473 path_to_module_info: Dict[str, Any] = None, 474 mod_info_file_path: Path = None, 475 get_testable_modules: Callable = None, 476 ): 477 """Initialize the ModuleInfo object. 478 479 Load up the module-info.json file and initialize the helper vars. 480 Note that module-info.json does not contain all module dependencies, 481 therefore, Atest needs to accumulate dependencies defined in bp files. 482 483 Args: 484 name_to_module_info: Dict of name to module info. 485 path_to_module_info: Dict of path to module info. 486 mod_info_file_path: Path of module-info.json. 487 get_testable_modules: Function to get all testable modules. 488 """ 489 # +----------------------+ +----------------------------+ 490 # | $ANDROID_PRODUCT_OUT | |$ANDROID_BUILD_TOP/out/soong| 491 # | /module-info.json | | /module_bp_java_deps.json | 492 # +-----------+----------+ +-------------+--------------+ 493 # | _merge_soong_info() | 494 # +------------------------------+ 495 # | 496 # v 497 # +----------------------------+ +----------------------------+ 498 # |tempfile.NamedTemporaryFile | |$ANDROID_BUILD_TOP/out/soong| 499 # +-------------+--------------+ | /module_bp_cc_deps.json | 500 # | +-------------+--------------+ 501 # | _merge_soong_info() | 502 # +-------------------------------+ 503 # | 504 # +-------| 505 # v 506 # +============================+ 507 # | $ANDROID_PRODUCT_OUT | 508 # | /atest_merged_dep.json |--> load as module info. 509 # +============================+ 510 self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP) 511 512 self.name_to_module_info = name_to_module_info or {} 513 self.path_to_module_info = path_to_module_info or {} 514 self.mod_info_file_path = mod_info_file_path 515 self._get_testable_modules = get_testable_modules 516 517 def is_module(self, name): 518 """Return True if name is a module, False otherwise.""" 519 info = self.get_module_info(name) 520 # From aosp/2293302 it started merging all modules' dependency in bp 521 # even the module is not be exposed to make, and those modules could not 522 # be treated as a build target using m. Only treat input name as module 523 # if it also has the module_name attribute which means it could be a 524 # build target for m. 525 if info and info.get(constants.MODULE_NAME): 526 return True 527 return False 528 529 def get_paths(self, name) -> list[str]: 530 """Return paths of supplied module name, Empty list if non-existent.""" 531 info = self.get_module_info(name) 532 if info: 533 return info.get(constants.MODULE_PATH, []) 534 return [] 535 536 def get_module_names(self, rel_module_path): 537 """Get the modules that all have module_path. 538 539 Args: 540 rel_module_path: path of module in module-info.json 541 542 Returns: 543 List of module names. 544 """ 545 return _get_module_names(self.path_to_module_info, rel_module_path) 546 547 def get_module_info(self, mod_name): 548 """Return dict of info for given module name, None if non-existence.""" 549 return self.name_to_module_info.get(mod_name) 550 551 @staticmethod 552 def is_suite_in_compatibility_suites(suite, mod_info): 553 """Check if suite exists in the compatibility_suites of module-info. 554 555 Args: 556 suite: A string of suite name. 557 mod_info: Dict of module info to check. 558 559 Returns: 560 True if it exists in mod_info, False otherwise. 561 """ 562 if not isinstance(mod_info, dict): 563 return False 564 return suite in mod_info.get(constants.MODULE_COMPATIBILITY_SUITES, []) 565 566 def get_testable_modules(self, suite=None): 567 return self._get_testable_modules(suite) 568 569 @staticmethod 570 def is_tradefed_testable_module(info: Dict[str, Any]) -> bool: 571 """Check whether the module is a Tradefed executable test.""" 572 if not info: 573 return False 574 if not info.get(constants.MODULE_INSTALLED, []): 575 return False 576 return ModuleInfo.has_test_config(info) 577 578 @staticmethod 579 def is_mobly_module(info: Dict[str, Any]) -> bool: 580 """Check whether the module is a Mobly test. 581 582 Note: Only python_test_host modules marked with a test_options tag of 583 "mobly" is considered a Mobly module. 584 585 Args: 586 info: Dict of module info to check. 587 588 Returns: 589 True if this is a Mobly test module, False otherwise. 590 """ 591 return constants.MOBLY_TEST_OPTIONS_TAG in info.get( 592 constants.MODULE_TEST_OPTIONS_TAGS, [] 593 ) 594 595 def is_testable_module(self, info: Dict[str, Any]) -> bool: 596 """Check if module is something we can test. 597 598 A module is testable if: 599 - it's a tradefed testable module, or 600 - it's a Mobly module, or 601 - it's a robolectric module (or shares path with one). 602 603 Args: 604 info: Dict of module info to check. 605 606 Returns: 607 True if we can test this module, False otherwise. 608 """ 609 return _is_testable_module( 610 self.name_to_module_info, self.path_to_module_info, info 611 ) 612 613 @staticmethod 614 def has_test_config(info: Dict[str, Any]) -> bool: 615 """Validate if this module has a test config. 616 617 A module can have a test config in the following manner: 618 - test_config be set in module-info.json. 619 - Auto-generated config via the auto_test_config key 620 in module-info.json. 621 622 Args: 623 info: Dict of module info to check. 624 625 Returns: 626 True if this module has a test config, False otherwise. 627 """ 628 return bool( 629 info.get(constants.MODULE_TEST_CONFIG, []) 630 or info.get('auto_test_config', []) 631 ) 632 633 def is_legacy_robolectric_test(self, info: Dict[str, Any]) -> bool: 634 """Return whether the module_name is a legacy Robolectric test""" 635 return _is_legacy_robolectric_test( 636 self.name_to_module_info, self.path_to_module_info, info 637 ) 638 639 def get_robolectric_test_name(self, info: Dict[str, Any]) -> str: 640 """Returns runnable robolectric module name. 641 642 This method is for legacy robolectric tests and returns one of associated 643 modules. The pattern is determined by the amount of shards: 644 645 10 shards: 646 FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 647 No shard: 648 FooTests -> RunFooTests 649 650 Arg: 651 info: Dict of module info to check. 652 653 Returns: 654 String of the first-matched associated module that belongs to the 655 actual robolectric module, None if nothing has been found. 656 """ 657 return _get_robolectric_test_name( 658 self.name_to_module_info, self.path_to_module_info, info 659 ) 660 661 def is_robolectric_test(self, module_name): 662 """Check if the given module is a robolectric test. 663 664 Args: 665 module_name: String of module to check. 666 667 Returns: 668 Boolean whether it's a robotest or not. 669 """ 670 if self.get_robolectric_type(module_name): 671 return True 672 return False 673 674 def get_robolectric_type(self, module_name: str) -> int: 675 """Check if the given module is a robolectric test and return type of it. 676 677 Robolectric declaration is converting from Android.mk to Android.bp, and 678 in the interim Atest needs to support testing both types of tests. 679 680 The modern robolectric tests defined by 'android_robolectric_test' in an 681 Android.bp file can can be run in Tradefed Test Runner: 682 683 SettingsRoboTests -> Tradefed Test Runner 684 685 Legacy tests defined in an Android.mk can only run with the 'make' way. 686 687 SettingsRoboTests -> make RunSettingsRoboTests0 688 689 To determine whether the test is a modern/legacy robolectric test: 690 1. If the 'robolectric-test` in the compatibility_suites, it's a 691 modern one, otherwise it's a legacy test. This is accurate since 692 aosp/2308586 already set the test suite of `robolectric-test` 693 for all `modern` Robolectric tests in Soong. 694 2. Traverse all modules share the module path. If one of the 695 modules has a ROBOLECTRIC class, it's a legacy robolectric test. 696 697 Args: 698 module_name: String of module to check. 699 700 Returns: 701 0: not a robolectric test. 702 1: a modern robolectric test(defined in Android.bp) 703 2: a legacy robolectric test(defined in Android.mk) 704 """ 705 info = self.get_module_info(module_name) 706 if not info: 707 return 0 708 # Some Modern mode Robolectric test has related module which compliant 709 # with the Legacy Robolectric test. In this case, the Modern mode 710 # Robolectric tests should be prior to the Legacy mode. 711 if self.is_modern_robolectric_test(info): 712 return constants.ROBOTYPE_MODERN 713 if self.is_legacy_robolectric_test(info): 714 return constants.ROBOTYPE_LEGACY 715 return 0 716 717 def get_instrumentation_target_apps(self, module_name: str) -> Dict: 718 """Return target APKs of an instrumentation test. 719 720 Returns: 721 A dict of target module and target APK(s). e.g. 722 {"FooService": {"/path/to/the/FooService.apk"}} 723 """ 724 # 1. Determine the actual manifest filename from an Android.bp(if any) 725 manifest = self.get_filepath_from_module(module_name, 'AndroidManifest.xml') 726 bpfile = self.get_filepath_from_module(module_name, 'Android.bp') 727 if bpfile.is_file(): 728 bp_info = atest_utils.get_bp_content(bpfile, 'android_test') 729 if not bp_info or not bp_info.get(module_name): 730 return {} 731 manifest = self.get_filepath_from_module( 732 module_name, bp_info.get(module_name).get('manifest') 733 ) 734 xml_info = atest_utils.get_manifest_info(manifest) 735 # 2. Translate package name to a module name. 736 package = xml_info.get('package') 737 target_package = xml_info.get('target_package') 738 # Ensure it's an instrumentation test(excluding self-instrmented) 739 if target_package and package != target_package: 740 logging.debug('Found %s an instrumentation test.', module_name) 741 metrics.LocalDetectEvent( 742 detect_type=DetectType.FOUND_INSTRUMENTATION_TEST, result=1 743 ) 744 target_module = self.get_target_module_by_pkg( 745 package=target_package, search_from=manifest.parent 746 ) 747 if target_module: 748 return self.get_artifact_map(target_module) 749 return {} 750 751 # pylint: disable=anomalous-backslash-in-string 752 def get_target_module_by_pkg(self, package: str, search_from: Path) -> str: 753 """Translate package name to the target module name. 754 755 This method is dedicated to determine the target module by translating 756 a package name. 757 758 Phase 1: Find out possible manifest files among parent directories. 759 Phase 2. Look for the defined package fits the given name, and ensure 760 it is not a persistent app. 761 Phase 3: Translate the manifest path to possible modules. A valid module 762 must fulfill: 763 1. The 'class' type must be ['APPS']. 764 2. It is not a Robolectric test. 765 766 Returns: 767 A string of module name. 768 """ 769 xmls = [] 770 for pth in search_from.parents: 771 if pth == Path(self.root_dir): 772 break 773 for name in os.listdir(pth): 774 if pth.joinpath(name).is_file(): 775 match = re.match('.*AndroidManifest.*\.xml$', name) 776 if match: 777 xmls.append(os.path.join(pth, name)) 778 possible_modules = [] 779 for xml in xmls: 780 rel_dir = str(Path(xml).relative_to(self.root_dir).parent) 781 logging.debug('Looking for package "%s" in %s...', package, xml) 782 xml_info = atest_utils.get_manifest_info(xml) 783 if xml_info.get('package') == package: 784 if xml_info.get('persistent'): 785 logging.debug('%s is a persistent app.', package) 786 continue 787 for _m in self.path_to_module_info.get(rel_dir): 788 possible_modules.append(_m) 789 if possible_modules: 790 for mod in possible_modules: 791 name = mod.get('module_name') 792 if mod.get('class') == ['APPS'] and not self.is_robolectric_test(name): 793 return name 794 return '' 795 796 def get_artifact_map(self, module_name: str) -> Dict: 797 """Get the installed APK path of the given module.""" 798 target_mod_info = self.get_module_info(module_name) 799 artifact_map = {} 800 if target_mod_info: 801 apks = set() 802 artifacts = target_mod_info.get('installed') 803 for artifact in artifacts: 804 if Path(artifact).suffix == '.apk': 805 apks.add(os.path.join(self.root_dir, artifact)) 806 artifact_map.update({module_name: apks}) 807 return artifact_map 808 809 def is_auto_gen_test_config(self, module_name): 810 """Check if the test config file will be generated automatically. 811 812 Args: 813 module_name: A string of the module name. 814 815 Returns: 816 True if the test config file will be generated automatically. 817 """ 818 if self.is_module(module_name): 819 mod_info = self.get_module_info(module_name) 820 auto_test_config = mod_info.get('auto_test_config', []) 821 return auto_test_config and auto_test_config[0] 822 return False 823 824 @staticmethod 825 def is_legacy_robolectric_class(info: Dict[str, Any]) -> bool: 826 """Check if the class is `ROBOLECTRIC` 827 828 This method is for legacy robolectric tests that the associated modules 829 contain: 830 'class': ['ROBOLECTRIC'] 831 832 Args: 833 info: ModuleInfo to check. 834 835 Returns: 836 True if the attribute class in mod_info is ROBOLECTRIC, False 837 otherwise. 838 """ 839 if info: 840 module_classes = info.get(constants.MODULE_CLASS, []) 841 return ( 842 module_classes 843 and module_classes[0] == constants.MODULE_CLASS_ROBOLECTRIC 844 ) 845 return False 846 847 def is_native_test(self, module_name): 848 """Check if the input module is a native test. 849 850 Args: 851 module_name: A string of the module name. 852 853 Returns: 854 True if the test is a native test, False otherwise. 855 """ 856 mod_info = self.get_module_info(module_name) 857 return constants.MODULE_CLASS_NATIVE_TESTS in mod_info.get( 858 constants.MODULE_CLASS, [] 859 ) 860 861 def has_mainline_modules( 862 self, module_name: str, mainline_binaries: List[str] 863 ) -> bool: 864 """Check if the mainline modules are in module-info. 865 866 Args: 867 module_name: A string of the module name. 868 mainline_binaries: A list of mainline module binaries. 869 870 Returns: 871 True if mainline_binaries is in module-info, False otherwise. 872 """ 873 mod_info = self.get_module_info(module_name) 874 # Check 'test_mainline_modules' attribute of the module-info.json. 875 mm_in_mf = mod_info.get(constants.MODULE_MAINLINE_MODULES, []) 876 ml_modules_set = set(mainline_binaries) 877 if mm_in_mf: 878 return contains_same_mainline_modules(ml_modules_set, set(mm_in_mf)) 879 for test_config in mod_info.get(constants.MODULE_TEST_CONFIG, []): 880 # Check the value of 'mainline-param' in the test config. 881 if not self.is_auto_gen_test_config(module_name): 882 return contains_same_mainline_modules( 883 ml_modules_set, 884 atest_utils.get_mainline_param( 885 os.path.join(self.root_dir, test_config) 886 ), 887 ) 888 # Unable to verify mainline modules in an auto-gen test config. 889 logging.debug( 890 '%s is associated with an auto-generated test config.', module_name 891 ) 892 return True 893 return False 894 895 def get_filepath_from_module(self, module_name: str, filename: str) -> Path: 896 """Return absolute path of the given module and filename.""" 897 mod_path = self.get_paths(module_name) 898 if mod_path: 899 return Path(self.root_dir).joinpath(mod_path[0], filename) 900 return Path() 901 902 def get_module_dependency(self, module_name, depend_on=None): 903 """Get the dependency sets for input module. 904 905 Recursively find all the dependencies of the input module. 906 907 Args: 908 module_name: String of module to check. 909 depend_on: The list of parent dependencies. 910 911 Returns: 912 Set of dependency modules. 913 """ 914 if not depend_on: 915 depend_on = set() 916 deps = set() 917 mod_info = self.get_module_info(module_name) 918 if not mod_info: 919 return deps 920 mod_deps = set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) 921 # Remove item in deps if it already in depend_on: 922 mod_deps = mod_deps - depend_on 923 deps = deps.union(mod_deps) 924 for mod_dep in mod_deps: 925 deps = deps.union( 926 set( 927 self.get_module_dependency( 928 mod_dep, depend_on=depend_on.union(deps) 929 ) 930 ) 931 ) 932 return deps 933 934 def get_install_module_dependency(self, module_name, depend_on=None): 935 """Get the dependency set for the given modules with installed path. 936 937 Args: 938 module_name: String of module to check. 939 depend_on: The list of parent dependencies. 940 941 Returns: 942 Set of dependency modules which has installed path. 943 """ 944 install_deps = set() 945 deps = self.get_module_dependency(module_name, depend_on) 946 logging.debug('%s depends on: %s', module_name, deps) 947 for module in deps: 948 mod_info = self.get_module_info(module) 949 if mod_info and mod_info.get(constants.MODULE_INSTALLED, []): 950 install_deps.add(module) 951 logging.debug( 952 'modules %s required by %s were not installed', 953 install_deps, 954 module_name, 955 ) 956 return install_deps 957 958 @staticmethod 959 def is_unit_test(mod_info): 960 """Return True if input module is unit test, False otherwise. 961 962 Args: 963 mod_info: ModuleInfo to check. 964 965 Returns: 966 True if input module is unit test, False otherwise. 967 """ 968 return mod_info.get(constants.MODULE_IS_UNIT_TEST, '') == 'true' 969 970 def is_host_unit_test(self, info: Dict[str, Any]) -> bool: 971 """Return True if input module is host unit test, False otherwise. 972 973 Args: 974 info: ModuleInfo to check. 975 976 Returns: 977 True if input module is host unit test, False otherwise. 978 """ 979 return self.is_tradefed_testable_module( 980 info 981 ) and self.is_suite_in_compatibility_suites('host-unit-tests', info) 982 983 def is_modern_robolectric_test(self, info: Dict[str, Any]) -> bool: 984 """Return whether 'robolectric-tests' is in 'compatibility_suites'.""" 985 return self.is_tradefed_testable_module( 986 info 987 ) and self.is_robolectric_test_suite(info) 988 989 def is_robolectric_test_suite(self, mod_info) -> bool: 990 """Return True if 'robolectric-tests' in the compatibility_suites. 991 992 Args: 993 mod_info: ModuleInfo to check. 994 995 Returns: 996 True if the 'robolectric-tests' is in the compatibility_suites, 997 False otherwise. 998 """ 999 return self.is_suite_in_compatibility_suites('robolectric-tests', mod_info) 1000 1001 def is_ravenwood_test(self, info: Dict[str, Any]) -> bool: 1002 """Return whether 'ravenwood-tests' is in 'compatibility_suites'.""" 1003 return self.is_tradefed_testable_module( 1004 info 1005 ) and self.is_ravenwood_test_suite(info) 1006 1007 def is_ravenwood_test_suite(self, mod_info) -> bool: 1008 """Return True if 'ravenwood-tests' in the compatibility_suites. 1009 1010 Args: 1011 mod_info: ModuleInfo to check. 1012 1013 Returns: 1014 True if the 'ravenwood-tests' is in the compatibility_suites, 1015 False otherwise. 1016 """ 1017 return self.is_suite_in_compatibility_suites('ravenwood-tests', mod_info) 1018 1019 def is_device_driven_test(self, mod_info): 1020 """Return True if input module is device driven test, False otherwise. 1021 1022 Args: 1023 mod_info: ModuleInfo to check. 1024 1025 Returns: 1026 True if input module is device driven test, False otherwise. 1027 """ 1028 if self.is_robolectric_test_suite(mod_info): 1029 return False 1030 if self.is_ravenwood_test_suite(mod_info): 1031 return False 1032 1033 return self.is_tradefed_testable_module( 1034 mod_info 1035 ) and 'DEVICE' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) 1036 1037 def is_host_driven_test(self, mod_info): 1038 """Return True if input module is host driven test, False otherwise. 1039 1040 Args: 1041 mod_info: ModuleInfo to check. 1042 1043 Returns: 1044 True if input module is host driven test, False otherwise. 1045 """ 1046 return self.is_tradefed_testable_module( 1047 mod_info 1048 ) and 'HOST' in mod_info.get(constants.MODULE_SUPPORTED_VARIANTS, []) 1049 1050 def _any_module(self, _: Module) -> bool: 1051 return True 1052 1053 def get_all_tests(self): 1054 """Get a list of all the module names which are tests.""" 1055 return self._get_all_modules(type_predicate=self.is_testable_module) 1056 1057 def get_all_unit_tests(self): 1058 """Get a list of all the module names which are unit tests.""" 1059 return self._get_all_modules(type_predicate=ModuleInfo.is_unit_test) 1060 1061 def get_all_host_unit_tests(self): 1062 """Get a list of all the module names which are host unit tests.""" 1063 return self._get_all_modules(type_predicate=self.is_host_unit_test) 1064 1065 def get_all_device_driven_tests(self): 1066 """Get a list of all the module names which are device driven tests.""" 1067 return self._get_all_modules(type_predicate=self.is_device_driven_test) 1068 1069 def _get_all_modules(self, type_predicate=None): 1070 """Get a list of all the module names that passed the predicate.""" 1071 modules = [] 1072 type_predicate = type_predicate or self._any_module 1073 for mod_name, mod_info in self.name_to_module_info.items(): 1074 if mod_info.get(constants.MODULE_NAME, '') == mod_name: 1075 if type_predicate(mod_info): 1076 modules.append(mod_name) 1077 return modules 1078 1079 def get_modules_by_path_in_srcs( 1080 self, path: str, testable_modules_only: bool = False 1081 ) -> Set[str]: 1082 """Get the module name that the given path belongs to.(in 'srcs') 1083 1084 Args: 1085 path: file path which is relative to ANDROID_BUILD_TOP. 1086 testable_modules_only: boolean flag which determines whether search 1087 testable modules only or not. 1088 1089 Returns: 1090 A set of string for matched module names, empty set if nothing find. 1091 """ 1092 modules = set() 1093 1094 for mod_name in ( 1095 self.get_testable_modules() 1096 if testable_modules_only 1097 else self.name_to_module_info.keys() 1098 ): 1099 m_info = self.get_module_info(mod_name) 1100 if m_info: 1101 for src in m_info.get(constants.MODULE_SRCS, []): 1102 if src in path: 1103 modules.add(mod_name) 1104 1105 return modules 1106 1107 def get_modules_by_path(self, path: str, testable_modules_only: bool = False): 1108 """Get the module names that the give path belongs to. 1109 1110 Args: 1111 path: dir path for searching among `path` in module information. 1112 testable_modules_only: boolean flag which determines whether search 1113 testable modules only or not. 1114 """ 1115 modules = set() 1116 is_testable_module_fn = ( 1117 self.is_testable_module if testable_modules_only else lambda _: True 1118 ) 1119 1120 m_infos = self.path_to_module_info.get(path) 1121 if m_infos: 1122 modules = { 1123 info.get(constants.MODULE_NAME) 1124 for info in m_infos 1125 if is_testable_module_fn(info) 1126 } 1127 1128 return modules 1129 1130 def get_modules_by_include_deps( 1131 self, deps: Set[str], testable_module_only: bool = False 1132 ) -> Set[str]: 1133 """Get the matched module names for the input dependencies. 1134 1135 Args: 1136 deps: A set of string for dependencies. 1137 testable_module_only: Option if only want to get testable module. 1138 1139 Returns: 1140 A set of matched module names for the input dependencies. 1141 """ 1142 modules = set() 1143 1144 for mod_name in ( 1145 self.get_testable_modules() 1146 if testable_module_only 1147 else self.name_to_module_info.keys() 1148 ): 1149 mod_info = self.get_module_info(mod_name) 1150 if mod_info and deps.intersection( 1151 set(mod_info.get(constants.MODULE_DEPENDENCIES, [])) 1152 ): 1153 modules.add(mod_info.get(constants.MODULE_NAME)) 1154 return modules 1155 1156 def get_installed_paths(self, module_name: str) -> List[Path]: 1157 """Return installed path from module info.""" 1158 mod_info = self.get_module_info(module_name) 1159 if not mod_info: 1160 return [] 1161 1162 def _to_abs_path(p): 1163 if os.path.isabs(p): 1164 return Path(p) 1165 return Path(os.getenv(constants.ANDROID_BUILD_TOP), p) 1166 1167 return [_to_abs_path(p) for p in mod_info.get('installed', [])] 1168 1169 def get_code_under_test(self, module_name: str) -> List[str]: 1170 """Return code under test from module info.""" 1171 mod_info = self.get_module_info(module_name) 1172 if not mod_info: 1173 atest_utils.colorful_print( 1174 '\nmodule %s cannot be found in module info, skip generating' 1175 ' coverage for it.' % module_name, 1176 constants.YELLOW, 1177 ) 1178 return [] 1179 1180 return mod_info.get('code_under_test', []) 1181 1182 def build_variants(self, info: Dict[str, Any]) -> List[str]: 1183 return info.get(constants.MODULE_SUPPORTED_VARIANTS, []) 1184 1185 def requires_device(self, info: Dict[str, Any]) -> bool: 1186 1187 if self.is_modern_robolectric_test(info): 1188 return False 1189 if self.is_ravenwood_test(info): 1190 return False 1191 if self.is_host_unit_test(info) and 'DEVICE' not in self.build_variants( 1192 info 1193 ): 1194 return False 1195 1196 return True 1197 1198 1199def _create_db(data_map: Dict[str, Dict[str, Any]], db_path: Path): 1200 """Create a Sqlite DB by writing to tempfile and move it to the right place. 1201 1202 Args: 1203 data_map: A dict where the key is table name and value is data itself. 1204 db_path: A Path pointing to the DB file. 1205 """ 1206 if db_path.is_file(): 1207 db_path.unlink() 1208 1209 with tempfile.NamedTemporaryFile(delete=False) as tmp_db: 1210 _create_db_in_path(data_map, tmp_db.name) 1211 shutil.move(tmp_db.name, db_path) 1212 1213 logging.debug('%s is created successfully.', db_path) 1214 1215 1216def _create_db_in_path(data_map: Dict[str, Dict[str, Any]], db_path: Path): 1217 """Create a Sqlite DB with multiple tables. 1218 1219 Args: 1220 data_map: A dict where the key is table name and value is data itself. 1221 db_path: A Path pointing to the DB file. 1222 """ 1223 con = sqlite3.connect(db_path) 1224 with con: 1225 cur = con.cursor() 1226 for table, contents in data_map.items(): 1227 cur.execute(f'CREATE TABLE {table}(key TEXT PRIMARY KEY, value TEXT)') 1228 1229 data = [] 1230 for k, v in contents.items(): 1231 data.append({'key': k, 'value': json.dumps(v)}) 1232 cur.executemany(f'INSERT INTO {table} VALUES(:key, :value)', data) 1233 1234 1235def _create_json(data_map: Dict[str, Any], json_path: Path): 1236 """Write content onto a JSON file. 1237 1238 Args: 1239 data_map: A dict where the key is table name and value is data itself. 1240 json_path: A Path pointing to the JSON file. 1241 """ 1242 if json_path.is_file(): 1243 json_path.unlink() 1244 1245 with tempfile.NamedTemporaryFile(delete=False) as temp_json: 1246 with open(temp_json.name, 'w', encoding='utf-8') as _temp: 1247 json.dump(data_map, _temp, indent=0) 1248 shutil.move(temp_json.name, json_path) 1249 1250 logging.debug('%s is created successfully.', json_path) 1251 1252 1253def _save_data_async(function: Callable, contents: Any, target_path: Path): 1254 """Save contents to a static file in asynchronized manner.""" 1255 atest_utils.run_multi_proc( 1256 func=function, 1257 args=[contents, target_path], 1258 # We set `daemon` to `False` to make sure that Atest doesn't exit before 1259 # writing the cache file. 1260 daemon=False, 1261 ) 1262 1263 1264def merge_soong_info(name_to_module_info, mod_bp_infos): 1265 """Merge the dependency and srcs in mod_bp_infos to name_to_module_info. 1266 1267 Args: 1268 name_to_module_info: Dict of module name to module info dict. 1269 mod_bp_infos: Dict of module name to bp's module info dict. 1270 1271 Returns: 1272 Dict of updated name_to_module_info. 1273 """ 1274 merge_items = [ 1275 constants.MODULE_DEPENDENCIES, 1276 constants.MODULE_SRCS, 1277 constants.MODULE_LIBS, 1278 constants.MODULE_STATIC_LIBS, 1279 constants.MODULE_STATIC_DEPS, 1280 constants.MODULE_PATH, 1281 ] 1282 for module_name, dep_info in mod_bp_infos.items(): 1283 mod_info = name_to_module_info.setdefault(module_name, {}) 1284 for merge_item in merge_items: 1285 dep_info_values = dep_info.get(merge_item, []) 1286 mod_info_values = mod_info.get(merge_item, []) 1287 mod_info_values.extend(dep_info_values) 1288 mod_info_values.sort() 1289 # deduplicate values just in case. 1290 mod_info_values = list(dict.fromkeys(mod_info_values)) 1291 name_to_module_info[module_name][merge_item] = mod_info_values 1292 return name_to_module_info 1293 1294 1295def _add_missing_variant_modules(name_to_module_info: Dict[str, Module]): 1296 missing_modules = {} 1297 1298 # Android's build system automatically adds a suffix for some build module 1299 # variants. For example, a module-info entry for a module originally named 1300 # 'HelloWorldTest' might appear as 'HelloWorldTest_32' and which Atest would 1301 # not be able to find. We add such entries if not already present so they 1302 # can be looked up using their declared module name. 1303 for mod_name, mod_info in name_to_module_info.items(): 1304 declared_module_name = mod_info.get(constants.MODULE_NAME, mod_name) 1305 if declared_module_name in name_to_module_info: 1306 continue 1307 missing_modules.setdefault(declared_module_name, mod_info) 1308 1309 name_to_module_info.update(missing_modules) 1310 1311 1312def contains_same_mainline_modules( 1313 mainline_modules: Set[str], module_lists: Set[str] 1314): 1315 """Check if mainline modules listed on command line is 1316 1317 the same set as config. 1318 1319 Args: 1320 mainline_modules: A list of mainline modules from triggered test. 1321 module_lists: A list of concatenate mainline module string from test 1322 configs. 1323 1324 Returns 1325 True if the set mainline modules from triggered test is in the test 1326 configs. 1327 """ 1328 for module_string in module_lists: 1329 if mainline_modules == set(module_string.split('+')): 1330 return True 1331 return False 1332 1333 1334def get_path_to_module_info(name_to_module_info): 1335 """Return the path_to_module_info dict. 1336 1337 Args: 1338 name_to_module_info: Dict of module name to module info dict. 1339 1340 Returns: 1341 Dict of module path to module info dict. 1342 """ 1343 path_to_module_info = {} 1344 for mod_name, mod_info in name_to_module_info.items(): 1345 # Cross-compiled and multi-arch modules actually all belong to 1346 # a single target so filter out these extra modules. 1347 if mod_name != mod_info.get(constants.MODULE_NAME, ''): 1348 continue 1349 for path in mod_info.get(constants.MODULE_PATH, []): 1350 mod_info[constants.MODULE_NAME] = mod_name 1351 # There could be multiple modules in a path. 1352 if path in path_to_module_info: 1353 path_to_module_info[path].append(mod_info) 1354 else: 1355 path_to_module_info[path] = [mod_info] 1356 return path_to_module_info 1357 1358 1359def _get_module_names(path_to_module_info, rel_module_path): 1360 """Get the modules that all have module_path. 1361 1362 Args: 1363 path_to_module_info: Dict of path to module info. 1364 rel_module_path: path of module in module-info.json. 1365 1366 Returns: 1367 List of module names. 1368 """ 1369 return [ 1370 m.get(constants.MODULE_NAME) 1371 for m in path_to_module_info.get(rel_module_path, []) 1372 ] 1373 1374 1375def _get_robolectric_test_name( 1376 name_to_module_info: Dict[str, Dict], 1377 path_to_module_info: Dict[str, Dict], 1378 info: Dict[str, Any], 1379) -> str: 1380 """Returns runnable robolectric module name. 1381 1382 This method is for legacy robolectric tests and returns one of associated 1383 modules. The pattern is determined by the amount of shards: 1384 1385 10 shards: 1386 FooTests -> RunFooTests0, RunFooTests1 ... RunFooTests9 1387 No shard: 1388 FooTests -> RunFooTests 1389 1390 Arg: 1391 name_to_module_info: Dict of name to module info. 1392 path_to_module_info: Dict of path to module info. 1393 info: Dict of module info to check. 1394 1395 Returns: 1396 String of the first-matched associated module that belongs to the 1397 actual robolectric module, None if nothing has been found. 1398 """ 1399 if not info: 1400 return '' 1401 module_paths = info.get(constants.MODULE_PATH, []) 1402 if not module_paths: 1403 return '' 1404 filtered_module_names = [ 1405 name 1406 for name in _get_module_names(path_to_module_info, module_paths[0]) 1407 if name.startswith('Run') 1408 ] 1409 return next( 1410 ( 1411 name 1412 for name in filtered_module_names 1413 if ModuleInfo.is_legacy_robolectric_class( 1414 name_to_module_info.get(name) 1415 ) 1416 ), 1417 '', 1418 ) 1419 1420 1421def _is_legacy_robolectric_test( 1422 name_to_module_info: Dict[str, Dict], 1423 path_to_module_info: Dict[str, Dict], 1424 info: Dict[str, Any], 1425) -> bool: 1426 """Return whether the module_name is a legacy Robolectric test""" 1427 if ModuleInfo.is_tradefed_testable_module(info): 1428 return False 1429 return bool( 1430 _get_robolectric_test_name(name_to_module_info, path_to_module_info, info) 1431 ) 1432 1433 1434def get_module_info_target() -> str: 1435 """Get module info target name for soong_ui.bash""" 1436 build_top = atest_utils.get_build_top() 1437 module_info_path = atest_utils.get_product_out(_MODULE_INFO) 1438 if module_info_path.is_relative_to(build_top): 1439 return str(module_info_path.relative_to(build_top)) 1440 1441 logging.debug('Found customized OUT_DIR!') 1442 return str(module_info_path) 1443 1444 1445def build(): 1446 """Build module-info.json""" 1447 logging.debug( 1448 'Generating %s - this is required for initial runs or forced rebuilds.', 1449 _MODULE_INFO, 1450 ) 1451 build_start = time.time() 1452 if not atest_utils.build([get_module_info_target()]): 1453 sys.exit(ExitCode.BUILD_FAILURE) 1454 1455 metrics.LocalDetectEvent( 1456 detect_type=DetectType.ONLY_BUILD_MODULE_INFO, 1457 result=int(time.time() - build_start), 1458 ) 1459 1460 1461def _is_testable_module( 1462 name_to_module_info: Dict[str, Dict], 1463 path_to_module_info: Dict[str, Dict], 1464 info: Dict[str, Any], 1465) -> bool: 1466 """Check if module is something we can test. 1467 1468 A module is testable if: 1469 - it's a tradefed testable module, or 1470 - it's a Mobly module, or 1471 - it's a robolectric module (or shares path with one). 1472 1473 Args: 1474 name_to_module_info: Dict of name to module info. 1475 path_to_module_info: Dict of path to module info. 1476 info: Dict of module info to check. 1477 1478 Returns: 1479 True if we can test this module, False otherwise. 1480 """ 1481 if not info or not info.get(constants.MODULE_NAME): 1482 return False 1483 if ModuleInfo.is_tradefed_testable_module(info): 1484 return True 1485 if ModuleInfo.is_mobly_module(info): 1486 return True 1487 if _is_legacy_robolectric_test( 1488 name_to_module_info, path_to_module_info, info 1489 ): 1490 return True 1491 return False 1492 1493 1494def _get_testable_modules( 1495 name_to_module_info: Dict[str, Dict], 1496 path_to_module_info: Dict[str, Dict], 1497 suite: str = None, 1498 index_path: Path = None, 1499): 1500 """Return testable modules of the given suite name.""" 1501 suite_to_modules = _get_suite_to_modules( 1502 name_to_module_info, path_to_module_info, index_path 1503 ) 1504 1505 return _filter_modules_by_suite(suite_to_modules, suite) 1506 1507 1508def _get_suite_to_modules( 1509 name_to_module_info: Dict[str, Dict], 1510 path_to_module_info: Dict[str, Dict], 1511 index_path: Path = None, 1512) -> Dict[str, Set[str]]: 1513 """Map suite and its modules. 1514 1515 Args: 1516 name_to_module_info: Dict of name to module info. 1517 path_to_module_info: Dict of path to module info. 1518 index_path: Path of the stored content. 1519 1520 Returns: 1521 Dict of suite and testable modules mapping. 1522 """ 1523 suite_to_modules = {} 1524 1525 for _, info in name_to_module_info.items(): 1526 if _is_testable_module(name_to_module_info, path_to_module_info, info): 1527 testable_module = info.get(constants.MODULE_NAME) 1528 suites = ( 1529 info.get('compatibility_suites') 1530 if info.get('compatibility_suites') 1531 else ['null-suite'] 1532 ) 1533 1534 for suite in suites: 1535 suite_to_modules.setdefault(suite, set()).add(testable_module) 1536 1537 if index_path: 1538 _index_testable_modules(suite_to_modules, index_path) 1539 1540 return suite_to_modules 1541 1542 1543def _filter_modules_by_suite( 1544 suite_to_modules: Dict[str, Set[str]], 1545 suite: str = None, 1546) -> Set[str]: 1547 """Return modules of the given suite name.""" 1548 if suite: 1549 return suite_to_modules.get(suite) 1550 1551 return {mod for mod_set in suite_to_modules.values() for mod in mod_set} 1552 1553 1554def _index_testable_modules(contents: Any, index_path: Path): 1555 """Dump testable modules. 1556 1557 Args: 1558 content: An object that will be written to the index file. 1559 index_path: Path to the saved index file. 1560 """ 1561 logging.debug( 1562 r'Indexing testable modules... ' 1563 r'(This is required whenever module-info.json ' 1564 r'was rebuilt.)' 1565 ) 1566 index_path.parent.mkdir(parents=True, exist_ok=True) 1567 with tempfile.NamedTemporaryFile(delete=False) as cache: 1568 try: 1569 pickle.dump(contents, cache, protocol=2) 1570 shutil.move(cache.name, index_path) 1571 logging.debug('%s is created successfully.', index_path) 1572 except IOError: 1573 atest_utils.print_and_log_error('Failed in dumping %s', cache) 1574 os.remove(cache.name) 1575 1576 1577class SqliteDict(collections.abc.Mapping): 1578 """A class that loads a Sqlite DB as a dictionary-like object. 1579 1580 Args: 1581 conn: A connection to the Sqlite database. 1582 table_name: A string the table name. 1583 """ 1584 1585 def __init__(self, conn: sqlite3.Connection, table_name: str): 1586 """Initialize the SqliteDict instance.""" 1587 self.conn = conn 1588 self.table = table_name 1589 1590 def __iter__(self) -> str: 1591 """Iterate over the keys in the SqliteDict.""" 1592 for key in self._load_key_rows(): 1593 yield key[0] 1594 1595 def _load_key_rows(self) -> Set[str]: 1596 """Load the key rows from the database table.""" 1597 results = self.conn.execute(f'SELECT key FROM {self.table}').fetchall() 1598 return set(results) 1599 1600 def __len__(self) -> int: 1601 """Get the size of key-value pairs in the SqliteDict.""" 1602 return len(self._load_key_rows()) 1603 1604 def __getitem__(self, key) -> Dict[str, Any]: 1605 """Get the value associated with the specified key.""" 1606 result = self.conn.execute( 1607 f'SELECT value FROM {self.table} WHERE key = ?', (key,) 1608 ).fetchone() 1609 if result: 1610 return json.loads(result[0]) 1611 raise KeyError(f'Bad key: {key}') 1612 1613 def items(self) -> Tuple[str, Dict[str, Any]]: 1614 """Iterate over the key-value pairs in the SqliteDict.""" 1615 for key in self: 1616 value = self[key] 1617 yield key, value 1618