#!/usr/bin/env python3
#
# Copyright 2022, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration tests for the Atest Bazel mode feature."""

# pylint: disable=invalid-name
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring

import json
import os
from pathlib import Path
import re
import shutil
import subprocess
import tempfile
from typing import Any, Dict
import unittest

# Line of Atest output that announces where the run's logs were written; the
# captured group is the announced log path.
_LOG_PATH_LINE = re.compile(r'^Test Logs have been saved in (.*)$')

# Path fragment of the announced log location that is swapped for the result
# file name to locate the standard-mode result file.
_LOG_PATH_MARKER = 'log'


class ResultCompareTest(unittest.TestCase):
  """Compares per-test results of Atest standard mode and Atest Bazel mode.

  Both modes are run through a shell from the Android source root, their
  result files are parsed into `{test name: status}` mappings, and the two
  mappings are required to be identical.
  """

  def setUp(self):
    """Prepares the source root, a scratch output directory and the env."""
    build_top = os.environ.get('ANDROID_BUILD_TOP')
    if not build_top:
      # Nothing below can work without a source tree; report a skip with a
      # clear reason instead of a bare KeyError.
      self.skipTest(
          'ANDROID_BUILD_TOP is not set; this test must run from an Android '
          'build environment.'
      )

    self.src_root_path = Path(build_top)
    self.out_dir_path = Path(tempfile.mkdtemp())
    # Registered immediately so the directory is removed even when the rest of
    # setUp raises (tearDown is not run in that case).
    self.addCleanup(shutil.rmtree, self.out_dir_path)
    self.test_env = self.setup_test_env()

  def test_standard_mode_and_bazel_mode_result_equal(self):
    standard_mode_result = self.get_test_result(
        shell_cmd='atest -c -m --host --host-unit-test-only'
    )

    bazel_mode_result = self.get_test_result(
        shell_cmd=(
            'atest -c --bazel-mode --host --host-unit-test-only '
            '--bazel-arg=--test_timeout=300'
        ),
        is_bazel_mode=True,
    )

    self.assert_test_result_equal(standard_mode_result, bazel_mode_result)

  def setup_test_env(self) -> Dict[str, Any]:
    """Returns the environment passed to every shell command.

    Only `PATH` and `HOME` are inherited from the current process; `OUT_DIR`
    points at the per-test scratch directory.

    Raises:
      KeyError: If `PATH` or `HOME` is missing from the current environment.
    """
    return {
        'PATH': os.environ['PATH'],
        'HOME': os.environ['HOME'],
        'OUT_DIR': str(self.out_dir_path),
    }

  def get_test_result(
      self,
      shell_cmd: str,
      is_bazel_mode: bool = False,
  ) -> Dict[str, str]:
    """Runs an Atest command and returns its parsed per-test results.

    Args:
      shell_cmd: The Atest command line to execute.
      is_bazel_mode: Whether `shell_cmd` runs Atest in Bazel mode. When true, a
        `--build_event_json_file` Bazel argument is appended so that a result
        file is produced, and the Bazel parser is used.

    Returns:
      A mapping from test name to its status string.
    """
    result_file_name = 'test_result'
    if is_bazel_mode:
      shell_cmd = (
          f'{shell_cmd} --bazel-arg=--build_event_json_file={result_file_name}'
      )

    completed_process = self.run_shell_command(shell_cmd)
    result_file_path = self.get_result_file_path(
        completed_process, result_file_name, is_bazel_mode
    )

    if is_bazel_mode:
      return parse_bazel_result(result_file_path)
    return parse_standard_result(result_file_path)

  def get_result_file_path(
      self,
      completed_process: subprocess.CompletedProcess,
      result_file_name: str,
      is_bazel_mode: bool = False,
  ) -> Path:
    """Locates the result file produced by a finished Atest run.

    Args:
      completed_process: The finished Atest process; its captured stdout (as
        bytes) is scanned in standard mode and ignored in Bazel mode.
      result_file_name: File name of the result file.
      is_bazel_mode: Whether the run used Bazel mode.

    Returns:
      In Bazel mode, `<out dir>/atest_bazel_workspace/<result_file_name>`.
      In standard mode, the log path announced in the output with its last
      `log` fragment replaced by `result_file_name`.

    Raises:
      RuntimeError: If, in standard mode, the output never announces where the
        logs were saved.
    """
    if is_bazel_mode:
      return self.out_dir_path.joinpath(
          'atest_bazel_workspace', result_file_name
      )

    # Build output is not guaranteed to be valid UTF-8; undecodable bytes must
    # not prevent finding the one line of interest.
    output = completed_process.stdout.decode(errors='replace')
    for line in output.splitlines():
      match = _LOG_PATH_LINE.match(line)
      if not match:
        continue

      log_path = match.group(1)
      # Replace only the last occurrence: earlier path components (a user
      # name, a random temporary directory suffix, ...) may contain the same
      # characters and must be left untouched.
      head, marker, tail = log_path.rpartition(_LOG_PATH_MARKER)
      if marker:
        return Path(f'{head}{result_file_name}{tail}')
      return Path(log_path)

    raise RuntimeError('Could not find test result filepath')

  def run_shell_command(
      self,
      shell_command: str,
  ) -> subprocess.CompletedProcess:
    """Runs a command after sourcing envsetup.sh and selecting a lunch target.

    The command runs from the source root with the restricted test
    environment. stderr is merged into stdout and stdout is captured. A
    non-zero exit status does not raise (`check=False`).

    Args:
      shell_command: The command to run once the build environment is set up.

    Returns:
      The completed process, with the combined output in `stdout` as bytes.
    """
    return subprocess.run(
        '. build/envsetup.sh && '
        'lunch aosp_cf_x86_64_pc-userdebug && '
        f'{shell_command}',
        env=self.test_env,
        cwd=self.src_root_path,
        shell=True,
        check=False,
        stderr=subprocess.STDOUT,
        stdout=subprocess.PIPE,
    )

  def assert_test_result_equal(self, result1, result2):
    """Asserts both result mappings hold the same tests with equal statuses.

    A table of the tests whose statuses differ is printed before the final
    assertion so that every mismatch is visible in a single run.

    Args:
      result1: Results of standard mode, as `{test name: status}`.
      result2: Results of Bazel mode, as `{test name: status}`.
    """
    self.assertEqual(set(result1), set(result2))

    row_format = '{0:100} {1:20} {2}'
    print(row_format.format('Test', 'Atest Standard Mode', 'Atest Bazel Mode'))

    mismatched = [
        name for name, status in result1.items() if status != result2[name]
    ]
    for name in mismatched:
      print(row_format.format(name, result1[name], result2[name]))

    count = len(mismatched)
    print(
        f'Total Number of Host Unit Test: {len(result1)}. {count} tests '
        'have different results.'
    )

    self.assertEqual(
        count, 0, f'Tests with different results: {sorted(mismatched)}'
    )


def parse_standard_result(result_file: Path) -> Dict[str, str]:
  """Parses the JSON result file written by Atest standard mode.

  Args:
    result_file: Path of the JSON result file.

  Returns:
    A mapping from test name (the last whitespace-separated token of each key
    under `test_runner.AtestTradefedTestRunner`) to `'PASSED'` or `'FAILED'`.

  Raises:
    RuntimeError: If two entries resolve to the same test name.
  """
  with result_file.open('r') as f:
    json_result = json.load(f)

  result = {}
  for k, v in json_result['test_runner']['AtestTradefedTestRunner'].items():
    name = k.split()[-1]
    if name in result:
      raise RuntimeError(f'Duplicated Test Target: `{name}`')

    # Test passed when there are no failed test cases and no errors.
    passed = v['summary']['FAILED'] == 0 and not v.get('ERROR')
    result[name] = 'PASSED' if passed else 'FAILED'
  return result


def parse_bazel_result(result_file: Path) -> Dict[str, str]:
  """Parses a Bazel build event file holding one JSON event per line.

  Only events whose `id` contains `testSummary` are used.

  Args:
    result_file: Path of the newline-delimited JSON build event file.

  Returns:
    A mapping from test name (the part of the label after the last `:`, with
    a trailing `_host` removed) to the event's `overallStatus`.
  """
  result = {}
  with result_file.open('r') as f:
    for line in f:
      # Blank lines (e.g. a trailing empty line) carry no event.
      if not line.strip():
        continue

      json_event = json.loads(line)
      if 'testSummary' not in json_event['id']:
        continue

      label = json_event['id']['testSummary']['label']
      name = label.split(':')[-1].removesuffix('_host')
      result[name] = json_event['testSummary']['overallStatus']
  return result


if __name__ == '__main__':
  unittest.main(verbosity=2)