1# Lint as: python3
2#
3# Copyright 2020, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Utilities for C-Suite integration tests."""
17
18import argparse
19import contextlib
20import logging
21import os
22import pathlib
23import shlex
24import shutil
25import stat
26import subprocess
27import sys
28import tempfile
29import zipfile
30from importlib import resources
31from typing import Sequence
32
33import csuite_test
34
35# Export symbols to reduce the number of imports tests have to list.
36TestCase = csuite_test.TestCase  # pylint: disable=invalid-name
37get_device_serial = csuite_test.get_device_serial
38
39# Keep any created temporary directories for debugging test failures. The
40# directories do not need explicit removal since they are created using the
41# system's temporary-file facility.
42_KEEP_TEMP_DIRS = False
43
44
45class CSuiteHarness(contextlib.AbstractContextManager):
46  """Interface class for interacting with the C-Suite harness.
47
48  WARNING: Explicitly clean up created instances or use as a context manager.
49  Not doing so will result in a ResourceWarning for the implicit cleanup which
50  confuses the TradeFed Python test output parser.
51  """
52
53  def __init__(self):
54    self._suite_dir = pathlib.Path(tempfile.mkdtemp(prefix='csuite'))
55    logging.debug('Created harness directory: %s', self._suite_dir)
56
57    with resources.files('testdata').joinpath(
58        'csuite-standalone.zip').open('rb') as data:
59      with zipfile.ZipFile(data, 'r') as f:
60        f.extractall(self._suite_dir)
61
62    # Add owner-execute permission on scripts since zip does not preserve them.
63    self._launcher_binary = self._suite_dir.joinpath(
64        'android-csuite/tools/csuite-tradefed')
65    _add_owner_exec_permission(self._launcher_binary)
66
67    self._testcases_dir = self._suite_dir.joinpath('android-csuite/testcases')
68
69  def __exit__(self, unused_type, unused_value, unused_traceback):
70    self.cleanup()
71
72  def cleanup(self):
73    if _KEEP_TEMP_DIRS:
74      return
75    shutil.rmtree(self._suite_dir, ignore_errors=True)
76
77
78  def run_and_wait(self, flags: Sequence[str]) -> subprocess.CompletedProcess:
79    """Starts the Tradefed launcher and waits for it to complete."""
80
81    env = os.environ.copy()
82
83    # Unset environment variables that would cause the script to think it's in a
84    # build tree.
85    env.pop('ANDROID_BUILD_TOP', None)
86    env.pop('ANDROID_HOST_OUT', None)
87
88    # Unset environment variables that would cause TradeFed to find test configs
89    # other than the ones created by the test.
90    env.pop('ANDROID_HOST_OUT_TESTCASES', None)
91    env.pop('ANDROID_TARGET_OUT_TESTCASES', None)
92
93    # Unset environment variables that might cause the suite to pick up a
94    # connected device that wasn't explicitly specified.
95    env.pop('ANDROID_SERIAL', None)
96
97    # Unset environment variables that might cause the TradeFed to load classes
98    # that weren't included in the standalone suite zip.
99    env.pop('TF_GLOBAL_CONFIG', None)
100    # Ensure the process operates in standalone mode
101    env['LOCAL_MODE'] = "1"
102    # Set the environment variable that TradeFed requires to find test modules.
103    env['ANDROID_TARGET_OUT_TESTCASES'] = self._testcases_dir
104    jdk17_path = '/jdk/jdk17/linux-x86'
105    if os.path.isdir(jdk17_path):
106      env['JAVA_HOME'] = jdk17_path
107      java_path = jdk17_path + '/bin'
108      env['PATH'] = java_path + ':' + env['PATH']
109
110    return _run_command([self._launcher_binary] + flags, env=env)
111
112
113class PackageRepository(contextlib.AbstractContextManager):
114  """A file-system based APK repository for use in tests.
115
116  WARNING: Explicitly clean up created instances or use as a context manager.
117  Not doing so will result in a ResourceWarning for the implicit cleanup which
118  confuses the TradeFed Python test output parser.
119  """
120
121  def __init__(self):
122    self._root_dir = pathlib.Path(tempfile.mkdtemp(prefix='csuite_apk_dir'))
123    logging.info('Created repository directory: %s', self._root_dir)
124
125  def __exit__(self, unused_type, unused_value, unused_traceback):
126    self.cleanup()
127
128  def cleanup(self):
129    if _KEEP_TEMP_DIRS:
130      return
131    shutil.rmtree(self._root_dir, ignore_errors=True)
132
133  def get_path(self) -> pathlib.Path:
134    """Returns the path to the repository's root directory."""
135    return self._root_dir
136
137  def add_package_apks(self, package_name: str, apks: Sequence[str]):
138    """Adds the provided package APKs to the repository."""
139    apk_dir = self._root_dir.joinpath(package_name)
140
141    # Raises if the directory already exists.
142    apk_dir.mkdir()
143    for apk in apks:
144      apk = apk + '.apk'
145      with (
146        resources.files('testdata').joinpath(apk).open('rb') as data,
147        open(os.path.join(apk_dir, apk), 'wb') as file,
148      ):
149        shutil.copyfileobj(data, file)
150
151
152class Adb:
153  """Encapsulates adb functionality to simplify usage in tests.
154
155  Most methods in this class raise an exception if they fail to execute. This
156  behavior can be overridden by using the check parameter.
157  """
158
159  def __init__(self,
160               adb_binary_path: pathlib.Path = None,
161               device_serial: str = None):
162    self._args = [adb_binary_path or 'adb']
163
164    device_serial = device_serial or get_device_serial()
165    if device_serial:
166      self._args.extend(['-s', device_serial])
167
168  def shell(self,
169            args: Sequence[str],
170            check: bool = None) -> subprocess.CompletedProcess:
171    """Runs an adb shell command and waits for it to complete.
172
173    Note that the exit code of the returned object corresponds to that of
174    the adb command and not the command executed in the shell.
175
176    Args:
177      args: a sequence of program arguments to pass to the shell.
178      check: whether to raise if the process terminates with a non-zero exit
179        code.
180
181    Returns:
182      An object representing a process that has finished and that can be
183      queried.
184    """
185    return self.run(['shell'] + args, check)
186
187  def run(self,
188          args: Sequence[str],
189          check: bool = None) -> subprocess.CompletedProcess:
190    """Runs an adb command and waits for it to complete."""
191    return _run_command(self._args + args, check=check)
192
193  def uninstall(self, package_name: str, check: bool = None):
194    """Uninstalls the specified package."""
195    self.run(['uninstall', package_name], check=check)
196
197  def list_packages(self) -> Sequence[str]:
198    """Lists packages installed on the device."""
199    p = self.shell(['pm', 'list', 'packages'])
200    return [l.split(':')[1] for l in p.stdout.splitlines()]
201
202
203def _run_command(args, check=False, **kwargs) -> subprocess.CompletedProcess:
204  """A wrapper for subprocess.run that overrides defaults and adds logging."""
205  env = kwargs.get('env', {})
206
207  # Log the command-line for debugging failed tests. Note that we convert
208  # tokens to strings for _shlex_join.
209  env_str = ['env', '-i'] + [f'{k}={v}' for k, v in env.items()]
210  args_str = [str(t) for t in args]
211
212  # Override some defaults. Note that 'check' deviates from this pattern to
213  # avoid getting warnings about using subprocess.run without an explicitly set
214  # `check` parameter.
215  kwargs.setdefault('capture_output', True)
216  kwargs.setdefault('universal_newlines', True)
217
218  logging.debug('Running command: %s', _shlex_join(env_str + args_str))
219
220  return subprocess.run(args, check=check, **kwargs)
221
222
223def _add_owner_exec_permission(path: pathlib.Path):
224  path.chmod(path.stat().st_mode | stat.S_IEXEC)
225
226
227def _shlex_join(split_command: Sequence[str]) -> str:
228  """Concatenate tokens and return a shell-escaped string."""
229  # This is an alternative to shlex.join that doesn't exist in Python versions
230  # < 3.8.
231  return ' '.join(shlex.quote(t) for t in split_command)
232
233
234def main():
235  global _KEEP_TEMP_DIRS
236
237  parser = argparse.ArgumentParser(parents=[csuite_test.create_arg_parser()])
238  parser.add_argument(
239      '--log-level',
240      choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
241      default='WARNING',
242      help='sets the logging level threshold')
243  parser.add_argument(
244      '--keep-temp-dirs',
245      type=bool,
246      help='keeps any created temporary directories for debugging failures')
247  args, unittest_argv = parser.parse_known_args(sys.argv)
248
249  _KEEP_TEMP_DIRS = args.keep_temp_dirs
250  logging.basicConfig(level=getattr(logging, args.log_level))
251
252  csuite_test.run_tests(args, unittest_argv)
253