1#!/usr/bin/env python3
2#
3# Copyright (C) 2021 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""test_utils.py: utils for testing.
18"""
19
20import logging
21from multiprocessing.connection import Connection
22import os
23from pathlib import Path
24import re
25import shutil
26import sys
27import subprocess
28import time
29from typing import List, Optional, Tuple, Union
30import unittest
31
32from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str
33
# Path of the inferno launcher, located one directory above the directory holding this file:
# inferno.bat on Windows, inferno.sh elsewhere.
INFERNO_SCRIPT = str(
    Path(__file__).parents[1].joinpath('inferno.bat' if is_windows() else 'inferno.sh'))
35
36
class TestHelper:
    """ Keep global test options.

    All state is stored in class attributes that are set by init(), so init() must be
    called before any other method of this class is used.
    """

    @classmethod
    def init(
            cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str],
            device_serial_number: Optional[str],
            progress_conn: Optional[Connection]):
        """ Set up directories, logging and (optionally) the Android device.

            test_dir: base dir for test output. It is created if missing, and test.log is
                      opened in it.
            testdata_dir: dir used by testdata_path() to locate test data.
            use_browser: when False, '--no_browser' is stored in browser_option.
            ndk_path: stored as is in cls.ndk_path.
            progress_conn: connection used by write_progress(). None disables it.

            When device_serial_number is None, no Android device is used.
            When device_serial_number is '', use the default Android device.
            When device_serial_number is not empty, select Android device by serial number.
        """
        cls.script_dir = Path(__file__).resolve().parents[1]
        cls.test_base_dir = Path(test_dir).resolve()
        cls.test_base_dir.mkdir(parents=True, exist_ok=True)
        cls.testdata_dir = Path(testdata_dir).resolve()
        cls.browser_option = [] if use_browser else ['--no_browser']
        cls.ndk_path = ndk_path
        cls.progress_conn = progress_conn

        # Logs can come from multiple processes. So use append mode to avoid overwrite.
        cls.log_fh = open(cls.test_base_dir / 'test.log', 'a')
        root_logger = logging.getLogger()
        root_logger.handlers.clear()
        root_logger.addHandler(logging.StreamHandler(cls.log_fh))
        # Redirect the stderr file descriptor to the log file. os.dup2() closes the target
        # descriptor itself when needed, so no separate os.close() is required (and the
        # descriptor is never left closed in between).
        os.dup2(cls.log_fh.fileno(), sys.stderr.fileno())

        if device_serial_number is not None:
            if device_serial_number:
                os.environ['ANDROID_SERIAL'] = device_serial_number
            cls.adb = AdbHelper(enable_switch_to_root=True)
            cls.android_version = cls.adb.get_android_version()
            # Filled lazily by get_device_features().
            cls.device_features = None

    @classmethod
    def log(cls, s: str):
        """ Append one line to the log file. """
        cls.log_fh.write(s + '\n')
        # Child processes can also write to log file, so flush it immediately to keep the order.
        cls.log_fh.flush()

    @classmethod
    def testdata_path(cls, testdata_name: str) -> str:
        """ Return the path of a test data. """
        return str(cls.testdata_dir / testdata_name)

    @classmethod
    def get_test_dir(cls, test_name: str) -> Path:
        """ Return the dir to run a test. """
        return cls.test_base_dir / test_name

    @classmethod
    def script_path(cls, script_name: str) -> str:
        """ Return the path of a python script in the script dir. """
        return str(cls.script_dir / script_name)

    @classmethod
    def get_device_features(cls) -> List[str]:
        """ Return the whitespace separated items printed by
            `run_simpleperf_on_device.py list --show-features`.
            The result is cached in cls.device_features after the first call.
        """
        if cls.device_features is None:
            args = [sys.executable, cls.script_path(
                'run_simpleperf_on_device.py'), 'list', '--show-features']
            output = subprocess.check_output(args, stderr=cls.log_fh)
            cls.device_features = bytes_to_str(output).split()
        return cls.device_features

    @classmethod
    def is_trace_offcpu_supported(cls) -> bool:
        """ Return True if 'trace-offcpu' is in the device features. """
        return 'trace-offcpu' in cls.get_device_features()

    @classmethod
    def get_32bit_abi(cls) -> str:
        """ Return the first entry of the device property ro.product.cpu.abilist32. """
        return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0]

    @classmethod
    def get_kernel_version(cls) -> Tuple[int, int]:
        """ Return (major, minor) parsed from the output of `uname -r` on device.

            Raises AssertionError if the output doesn't start with '<digits>.<digits>'.
        """
        output = cls.adb.check_run_and_return_output(['shell', 'uname', '-r'])
        m = re.search(r'^(\d+)\.(\d+)', output)
        if not m:
            # Raise explicitly instead of using an assert statement, so the check is kept
            # under `python -O` and the failure shows the offending output.
            raise AssertionError('unexpected kernel version: %r' % (output,))
        return (int(m.group(1)), int(m.group(2)))

    @classmethod
    def write_progress(cls, progress: str):
        """ Send progress through progress_conn, if one was given to init(). """
        if cls.progress_conn:
            cls.progress_conn.send(progress)
122
123
class TestBase(unittest.TestCase):
    """ Base class of tests. It runs each test in a separate dir, logs the test result and
        provides helpers to run commands and check their outputs.
    """

    def setUp(self):
        """ Run each test in a separate dir. """
        test_name = '%s.%s' % (self.__class__.__name__, self._testMethodName)
        self.test_dir = TestHelper.get_test_dir(test_name)
        self.test_dir.mkdir()
        os.chdir(self.test_dir)
        TestHelper.log('begin test %s' % test_name)

    def run(self, result=None):
        """ Run the test, then log its status and time, and report progress.

            The status is FAILED if the last error or failure in result belongs to this
            test, SKIPPED if the last skipped entry belongs to it, otherwise OK.
            The test dir is removed only when the status is OK.
        """
        start_time = time.time()
        ret = super().run(result)
        if result is None:
            # When no result object is passed in, unittest creates a default one and
            # returns it. Use it to find out the test status.
            result = ret

        err_info = None
        if result.errors and result.errors[-1][0] == self:
            status = 'FAILED'
            err_info = result.errors[-1][1]
        elif result.failures and result.failures[-1][0] == self:
            status = 'FAILED'
            err_info = result.failures[-1][1]
        elif result.skipped and result.skipped[-1][0] == self:
            status = 'SKIPPED'
        else:
            status = 'OK'

        time_taken = time.time() - start_time
        test_name = '%s.%s' % (self.__class__.__name__, self._testMethodName)
        TestHelper.log('end test %s %s (%.3fs)' % (test_name, status, time_taken))
        if status == 'FAILED':
            TestHelper.log(err_info)

        # Remove test data for passed tests to save space.
        if status == 'OK':
            remove(self.test_dir)
        TestHelper.write_progress('%s  %s  %.3fs' % (test_name, status, time_taken))
        return ret

    def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str:
        """ Run a command and fail the test if it doesn't exit with 0.

            args: the command. If args[0] ends with '.py', it is run with the current python
                  interpreter from the script dir. Browser and ndk options are appended for
                  the scripts listed below. The list passed in is not modified.
            return_output: if True, capture stdout and return it.
            drop_output: if True (and return_output is False), discard stdout. Otherwise
                  stdout is inherited from the current process.

            stderr of the command always goes to the test log file.
            Return the captured stdout if return_output is True, otherwise ''.
        """
        # Work on a copy, so the options appended below don't leak into the caller's list.
        args = list(args)
        if args[0] == 'report_html.py' or args[0] == INFERNO_SCRIPT:
            args += TestHelper.browser_option
        if TestHelper.ndk_path:
            if args[0] in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py',
                           'report_html.py', 'annotate.py']:
                args += ['--ndk_path', TestHelper.ndk_path]
        if args[0].endswith('.py'):
            args = [sys.executable, TestHelper.script_path(args[0])] + args[1:]
        use_shell = args[0].endswith('.bat')

        if return_output:
            stdout_fd = subprocess.PIPE
        elif drop_output:
            stdout_fd = subprocess.DEVNULL
        else:
            stdout_fd = None

        output_data = ''
        try:
            subproc = subprocess.Popen(args, stdout=stdout_fd,
                                       stderr=TestHelper.log_fh, shell=use_shell)
            stdout_data, _ = subproc.communicate()
            output_data = bytes_to_str(stdout_data)
            returncode = subproc.returncode
        except OSError:
            # The command can't be started. Report it through the assertion below.
            returncode = None
        self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args)
        return output_data if return_output else ''

    def check_strings_in_file(self, filename, strings: List[Union[str, re.Pattern]]):
        """ Check that the file exists and its content contains all strings/patterns. """
        self.check_exist(filename=filename)
        with open(filename, 'r') as fh:
            self.check_strings_in_content(fh.read(), strings)

    def check_exist(self, filename=None, dirname=None):
        """ Check that filename (if given) is a file and dirname (if given) is a dir. """
        if filename:
            self.assertTrue(os.path.isfile(filename), filename)
        if dirname:
            self.assertTrue(os.path.isdir(dirname), dirname)

    def check_strings_in_content(self, content: str, strings: List[Union[str, re.Pattern]]):
        """ Fail the test if any entry of strings isn't found in content.
            A compiled pattern is searched in content, a str is checked as a substring.
        """
        fulfilled = [
            s.search(content) if isinstance(s, re.Pattern) else s in content
            for s in strings
        ]
        self.check_fulfilled_entries(fulfilled, strings)

    def check_fulfilled_entries(self, fulfilled, entries):
        """ Fail the test, listing every entry whose paired value in fulfilled is falsy. """
        failed_entries = [entry for ok, entry in zip(fulfilled, entries) if not ok]
        if failed_entries:
            self.fail('failed in below entries: %s' % (failed_entries,))
222