#!/usr/bin/env python3
#
# Copyright (C) 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""test_utils.py: utils for testing.
"""

import logging
from multiprocessing.connection import Connection
import os
from pathlib import Path
import re
import shutil
import sys
import subprocess
import time
from typing import List, Optional, Tuple, Union
import unittest

from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str

INFERNO_SCRIPT = str(Path(__file__).parents[1] / ('inferno.bat' if is_windows() else 'inferno.sh'))


class TestHelper:
    """ Keep global test options. """

    # Defaults so that helpers reading these attributes don't raise AttributeError
    # when init() hasn't set them (device_features is only reset by init() when a
    # device is used).
    device_features: Optional[List[str]] = None
    progress_conn: Optional[Connection] = None

    @classmethod
    def init(
            cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str],
            device_serial_number: Optional[str],
            progress_conn: Optional[Connection]):
        """
            Record the global test options on the class, and redirect logging and stderr
            to `test.log` under test_dir.

            When device_serial_number is None, no Android device is used.
            When device_serial_number is '', use the default Android device.
            When device_serial_number is not empty, select Android device by serial number.
        """
        cls.script_dir = Path(__file__).resolve().parents[1]
        cls.test_base_dir = Path(test_dir).resolve()
        cls.test_base_dir.mkdir(parents=True, exist_ok=True)
        cls.testdata_dir = Path(testdata_dir).resolve()
        cls.browser_option = [] if use_browser else ['--no_browser']
        cls.ndk_path = ndk_path
        cls.progress_conn = progress_conn

        # Logs can come from multiple processes. So use append mode to avoid overwrite.
        # The file handle is kept open on the class on purpose: log() and child
        # processes started by the tests write to it.
        cls.log_fh = open(cls.test_base_dir / 'test.log', 'a')
        root_logger = logging.getLogger()
        root_logger.handlers.clear()
        root_logger.addHandler(logging.StreamHandler(cls.log_fh))
        # Point the stderr file descriptor at the log file, so anything written to
        # that descriptor ends up in test.log.
        os.close(sys.stderr.fileno())
        os.dup2(cls.log_fh.fileno(), sys.stderr.fileno())

        if device_serial_number is not None:
            if device_serial_number:
                os.environ['ANDROID_SERIAL'] = device_serial_number
            cls.adb = AdbHelper(enable_switch_to_root=True)
            cls.android_version = cls.adb.get_android_version()
            cls.device_features = None

    @classmethod
    def log(cls, s: str):
        """ Append one line to the test log. """
        cls.log_fh.write(s + '\n')
        # Child processes can also write to log file, so flush it immediately to keep the order.
        cls.log_fh.flush()

    @classmethod
    def testdata_path(cls, testdata_name: str) -> str:
        """ Return the path of a test data. """
        return str(cls.testdata_dir / testdata_name)

    @classmethod
    def get_test_dir(cls, test_name: str) -> Path:
        """ Return the dir to run a test. """
        return cls.test_base_dir / test_name

    @classmethod
    def script_path(cls, script_name: str) -> str:
        """ Return the path of a python script, given its name relative to the script dir. """
        return str(cls.script_dir / script_name)

    @classmethod
    def get_device_features(cls):
        """ Return the whitespace-separated tokens printed by
            `run_simpleperf_on_device.py list --show-features`.

            The script runs once; its result is cached in cls.device_features.
        """
        if cls.device_features is None:
            args = [sys.executable, cls.script_path(
                'run_simpleperf_on_device.py'), 'list', '--show-features']
            output = subprocess.check_output(args, stderr=TestHelper.log_fh)
            output = bytes_to_str(output)
            cls.device_features = output.split()
        return cls.device_features

    @classmethod
    def is_trace_offcpu_supported(cls):
        """ Return True if 'trace-offcpu' is among the device features. """
        return 'trace-offcpu' in cls.get_device_features()

    @classmethod
    def get_32bit_abi(cls):
        """ Return the first entry of the `ro.product.cpu.abilist32` device property. """
        return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0]

    @classmethod
    def get_kernel_version(cls) -> Tuple[int, int]:
        """ Return (major, minor) parsed from the start of `uname -r` output on device. """
        output = cls.adb.check_run_and_return_output(['shell', 'uname', '-r'])
        m = re.search(r'^(\d+)\.(\d+)', output)
        assert m, 'unexpected `uname -r` output: %r' % (output,)
        return (int(m.group(1)), int(m.group(2)))

    @classmethod
    def write_progress(cls, progress: str):
        """ Send a progress message through progress_conn, if a connection is set. """
        if cls.progress_conn:
            cls.progress_conn.send(progress)


class TestBase(unittest.TestCase):
    """ Base class of test cases. Each test runs in its own dir, and its begin/end
        status is written to the test log.
    """

    def setUp(self):
        """ Run each test in a separate dir. """
        test_name = '%s.%s' % (self.__class__.__name__, self._testMethodName)
        self.test_dir = TestHelper.get_test_dir(test_name)
        self.test_dir.mkdir()
        os.chdir(self.test_dir)
        TestHelper.log('begin test %s' % test_name)

    def run(self, result=None):
        """ Run the test, then log its status and time, and report progress.

            The test dir of a passed test is removed to save space.
            When result is None, a default result object is created here, because the
            status is read from the result after the test finishes. That result is
            then returned to the caller.
        """
        owns_result = result is None
        if owns_result:
            # Previously a missing result crashed below with AttributeError on
            # `result.errors`. Mirror what unittest does for a missing result.
            result = self.defaultTestResult()
            result.startTestRun()

        start_time = time.time()
        try:
            ret = super().run(result)
        finally:
            if owns_result:
                result.stopTestRun()

        err_info = ''
        if result.errors and result.errors[-1][0] == self:
            status = 'FAILED'
            err_info = result.errors[-1][1]
        elif result.failures and result.failures[-1][0] == self:
            status = 'FAILED'
            err_info = result.failures[-1][1]
        elif result.skipped and result.skipped[-1][0] == self:
            status = 'SKIPPED'
        else:
            status = 'OK'

        time_taken = time.time() - start_time
        TestHelper.log(
            'end test %s.%s %s (%.3fs)' %
            (self.__class__.__name__, self._testMethodName, status, time_taken))
        if status == 'FAILED':
            TestHelper.log(err_info)

        # Remove test data for passed tests to save space.
        if status == 'OK':
            remove(self.test_dir)
        TestHelper.write_progress(
            '%s.%s %s %.3fs' %
            (self.__class__.__name__, self._testMethodName, status, time_taken))
        return result if owns_result else ret

    def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str:
        """ Run a command and fail the test unless it exits with 0.

            args: command line. If args[0] ends with '.py', it is run with the current
                  python interpreter from the script dir. Browser and ndk options are
                  appended for the scripts that take them.
            return_output: if True, capture stdout and return it as a string.
            drop_output: if True (and return_output is False), discard stdout.
                         Otherwise stdout is inherited.

            stderr of the command always goes to the test log.
            Return the captured stdout when return_output is True, otherwise ''.
        """
        # Work on a copy, so the options appended below don't leak into the caller's list.
        args = list(args)
        script = args[0]
        if script == 'report_html.py' or script == INFERNO_SCRIPT:
            args += TestHelper.browser_option
        if TestHelper.ndk_path:
            if script in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py',
                          'report_html.py', 'annotate.py']:
                args += ['--ndk_path', TestHelper.ndk_path]
        if script.endswith('.py'):
            args = [sys.executable, TestHelper.script_path(script)] + args[1:]
        use_shell = args[0].endswith('.bat')

        if return_output:
            stdout_fd = subprocess.PIPE
        elif drop_output:
            stdout_fd = subprocess.DEVNULL
        else:
            stdout_fd = None

        output_data = ''
        try:
            subproc = subprocess.Popen(args, stdout=stdout_fd,
                                       stderr=TestHelper.log_fh, shell=use_shell)
            stdout_data, _ = subproc.communicate()
            if return_output:
                output_data = bytes_to_str(stdout_data)
            returncode = subproc.returncode
        except OSError:
            # The command couldn't be started. Report it through the assertion below.
            returncode = None
        self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args)
        return output_data if return_output else ''

    def check_strings_in_file(self, filename, strings: List[Union[str, re.Pattern]]):
        """ Check that the file exists and contains every string / matches every pattern. """
        self.check_exist(filename=filename)
        with open(filename, 'r') as fh:
            self.check_strings_in_content(fh.read(), strings)

    def check_exist(self, filename=None, dirname=None):
        """ Check that filename is an existing file and dirname is an existing dir.
            Arguments left as None (or empty) are not checked.
        """
        if filename:
            self.assertTrue(os.path.isfile(filename), filename)
        if dirname:
            self.assertTrue(os.path.isdir(dirname), dirname)

    def check_strings_in_content(self, content: str, strings: List[Union[str, re.Pattern]]):
        """ Check that content contains every plain string, and that every compiled
            pattern is found in it by search().
        """
        fulfilled = [
            s.search(content) if isinstance(s, re.Pattern) else s in content
            for s in strings
        ]
        self.check_fulfilled_entries(fulfilled, strings)

    def check_fulfilled_entries(self, fulfilled, entries):
        """ Fail the test, listing every entry whose paired value in fulfilled is falsy. """
        failed_entries = [entry for ok, entry in zip(fulfilled, entries) if not ok]
        if failed_entries:
            self.fail('failed in below entries: %s' % (failed_entries,))