1# Lint as: python3 2# 3# Copyright 2020, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""Utilities for C-Suite integration tests.""" 17 18import argparse 19import contextlib 20import logging 21import os 22import pathlib 23import shlex 24import shutil 25import stat 26import subprocess 27import sys 28import tempfile 29import zipfile 30from importlib import resources 31from typing import Sequence 32 33import csuite_test 34 35# Export symbols to reduce the number of imports tests have to list. 36TestCase = csuite_test.TestCase # pylint: disable=invalid-name 37get_device_serial = csuite_test.get_device_serial 38 39# Keep any created temporary directories for debugging test failures. The 40# directories do not need explicit removal since they are created using the 41# system's temporary-file facility. 42_KEEP_TEMP_DIRS = False 43 44 45class CSuiteHarness(contextlib.AbstractContextManager): 46 """Interface class for interacting with the C-Suite harness. 47 48 WARNING: Explicitly clean up created instances or use as a context manager. 49 Not doing so will result in a ResourceWarning for the implicit cleanup which 50 confuses the TradeFed Python test output parser. 51 """ 52 53 def __init__(self): 54 self._suite_dir = pathlib.Path(tempfile.mkdtemp(prefix='csuite')) 55 logging.debug('Created harness directory: %s', self._suite_dir) 56 57 with resources.files('testdata').joinpath( 58 'csuite-standalone.zip').open('rb') as data: 59 with zipfile.ZipFile(data, 'r') as f: 60 f.extractall(self._suite_dir) 61 62 # Add owner-execute permission on scripts since zip does not preserve them. 63 self._launcher_binary = self._suite_dir.joinpath( 64 'android-csuite/tools/csuite-tradefed') 65 _add_owner_exec_permission(self._launcher_binary) 66 67 self._testcases_dir = self._suite_dir.joinpath('android-csuite/testcases') 68 69 def __exit__(self, unused_type, unused_value, unused_traceback): 70 self.cleanup() 71 72 def cleanup(self): 73 if _KEEP_TEMP_DIRS: 74 return 75 shutil.rmtree(self._suite_dir, ignore_errors=True) 76 77 78 def run_and_wait(self, flags: Sequence[str]) -> subprocess.CompletedProcess: 79 """Starts the Tradefed launcher and waits for it to complete.""" 80 81 env = os.environ.copy() 82 83 # Unset environment variables that would cause the script to think it's in a 84 # build tree. 85 env.pop('ANDROID_BUILD_TOP', None) 86 env.pop('ANDROID_HOST_OUT', None) 87 88 # Unset environment variables that would cause TradeFed to find test configs 89 # other than the ones created by the test. 90 env.pop('ANDROID_HOST_OUT_TESTCASES', None) 91 env.pop('ANDROID_TARGET_OUT_TESTCASES', None) 92 93 # Unset environment variables that might cause the suite to pick up a 94 # connected device that wasn't explicitly specified. 95 env.pop('ANDROID_SERIAL', None) 96 97 # Unset environment variables that might cause the TradeFed to load classes 98 # that weren't included in the standalone suite zip. 99 env.pop('TF_GLOBAL_CONFIG', None) 100 # Ensure the process operates in standalone mode 101 env['LOCAL_MODE'] = "1" 102 # Set the environment variable that TradeFed requires to find test modules. 103 env['ANDROID_TARGET_OUT_TESTCASES'] = self._testcases_dir 104 jdk17_path = '/jdk/jdk17/linux-x86' 105 if os.path.isdir(jdk17_path): 106 env['JAVA_HOME'] = jdk17_path 107 java_path = jdk17_path + '/bin' 108 env['PATH'] = java_path + ':' + env['PATH'] 109 110 return _run_command([self._launcher_binary] + flags, env=env) 111 112 113class PackageRepository(contextlib.AbstractContextManager): 114 """A file-system based APK repository for use in tests. 115 116 WARNING: Explicitly clean up created instances or use as a context manager. 117 Not doing so will result in a ResourceWarning for the implicit cleanup which 118 confuses the TradeFed Python test output parser. 119 """ 120 121 def __init__(self): 122 self._root_dir = pathlib.Path(tempfile.mkdtemp(prefix='csuite_apk_dir')) 123 logging.info('Created repository directory: %s', self._root_dir) 124 125 def __exit__(self, unused_type, unused_value, unused_traceback): 126 self.cleanup() 127 128 def cleanup(self): 129 if _KEEP_TEMP_DIRS: 130 return 131 shutil.rmtree(self._root_dir, ignore_errors=True) 132 133 def get_path(self) -> pathlib.Path: 134 """Returns the path to the repository's root directory.""" 135 return self._root_dir 136 137 def add_package_apks(self, package_name: str, apks: Sequence[str]): 138 """Adds the provided package APKs to the repository.""" 139 apk_dir = self._root_dir.joinpath(package_name) 140 141 # Raises if the directory already exists. 142 apk_dir.mkdir() 143 for apk in apks: 144 apk = apk + '.apk' 145 with ( 146 resources.files('testdata').joinpath(apk).open('rb') as data, 147 open(os.path.join(apk_dir, apk), 'wb') as file, 148 ): 149 shutil.copyfileobj(data, file) 150 151 152class Adb: 153 """Encapsulates adb functionality to simplify usage in tests. 154 155 Most methods in this class raise an exception if they fail to execute. This 156 behavior can be overridden by using the check parameter. 157 """ 158 159 def __init__(self, 160 adb_binary_path: pathlib.Path = None, 161 device_serial: str = None): 162 self._args = [adb_binary_path or 'adb'] 163 164 device_serial = device_serial or get_device_serial() 165 if device_serial: 166 self._args.extend(['-s', device_serial]) 167 168 def shell(self, 169 args: Sequence[str], 170 check: bool = None) -> subprocess.CompletedProcess: 171 """Runs an adb shell command and waits for it to complete. 172 173 Note that the exit code of the returned object corresponds to that of 174 the adb command and not the command executed in the shell. 175 176 Args: 177 args: a sequence of program arguments to pass to the shell. 178 check: whether to raise if the process terminates with a non-zero exit 179 code. 180 181 Returns: 182 An object representing a process that has finished and that can be 183 queried. 184 """ 185 return self.run(['shell'] + args, check) 186 187 def run(self, 188 args: Sequence[str], 189 check: bool = None) -> subprocess.CompletedProcess: 190 """Runs an adb command and waits for it to complete.""" 191 return _run_command(self._args + args, check=check) 192 193 def uninstall(self, package_name: str, check: bool = None): 194 """Uninstalls the specified package.""" 195 self.run(['uninstall', package_name], check=check) 196 197 def list_packages(self) -> Sequence[str]: 198 """Lists packages installed on the device.""" 199 p = self.shell(['pm', 'list', 'packages']) 200 return [l.split(':')[1] for l in p.stdout.splitlines()] 201 202 203def _run_command(args, check=False, **kwargs) -> subprocess.CompletedProcess: 204 """A wrapper for subprocess.run that overrides defaults and adds logging.""" 205 env = kwargs.get('env', {}) 206 207 # Log the command-line for debugging failed tests. Note that we convert 208 # tokens to strings for _shlex_join. 209 env_str = ['env', '-i'] + [f'{k}={v}' for k, v in env.items()] 210 args_str = [str(t) for t in args] 211 212 # Override some defaults. Note that 'check' deviates from this pattern to 213 # avoid getting warnings about using subprocess.run without an explicitly set 214 # `check` parameter. 215 kwargs.setdefault('capture_output', True) 216 kwargs.setdefault('universal_newlines', True) 217 218 logging.debug('Running command: %s', _shlex_join(env_str + args_str)) 219 220 return subprocess.run(args, check=check, **kwargs) 221 222 223def _add_owner_exec_permission(path: pathlib.Path): 224 path.chmod(path.stat().st_mode | stat.S_IEXEC) 225 226 227def _shlex_join(split_command: Sequence[str]) -> str: 228 """Concatenate tokens and return a shell-escaped string.""" 229 # This is an alternative to shlex.join that doesn't exist in Python versions 230 # < 3.8. 231 return ' '.join(shlex.quote(t) for t in split_command) 232 233 234def main(): 235 global _KEEP_TEMP_DIRS 236 237 parser = argparse.ArgumentParser(parents=[csuite_test.create_arg_parser()]) 238 parser.add_argument( 239 '--log-level', 240 choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], 241 default='WARNING', 242 help='sets the logging level threshold') 243 parser.add_argument( 244 '--keep-temp-dirs', 245 type=bool, 246 help='keeps any created temporary directories for debugging failures') 247 args, unittest_argv = parser.parse_known_args(sys.argv) 248 249 _KEEP_TEMP_DIRS = args.keep_temp_dirs 250 logging.basicConfig(level=getattr(logging, args.log_level)) 251 252 csuite_test.run_tests(args, unittest_argv) 253