1#!/usr/bin/env python3
2#
3# Copyright 2022, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Integration tests for the Atest Bazel mode feature."""
18
19# pylint: disable=invalid-name
20# pylint: disable=missing-class-docstring
21# pylint: disable=missing-function-docstring
22
23import json
24import os
25from pathlib import Path
26import re
27import shutil
28import subprocess
29import tempfile
30from typing import Any, Dict
31import unittest
32
33
34class ResultCompareTest(unittest.TestCase):
35
36  def setUp(self):
37    self.src_root_path = Path(os.environ['ANDROID_BUILD_TOP'])
38    self.out_dir_path = Path(tempfile.mkdtemp())
39    self.test_env = self.setup_test_env()
40
41  def tearDown(self):
42    shutil.rmtree(self.out_dir_path)
43
44  def test_standard_mode_and_bazel_mode_result_equal(self):
45    standard_mode_result = self.get_test_result(
46        shell_cmd='atest -c -m --host --host-unit-test-only'
47    )
48
49    bazel_mode_result = self.get_test_result(
50        shell_cmd=(
51            'atest -c --bazel-mode --host --host-unit-test-only '
52            '--bazel-arg=--test_timeout=300'
53        ),
54        is_bazel_mode=True,
55    )
56
57    self.assert_test_result_equal(standard_mode_result, bazel_mode_result)
58
59  def setup_test_env(self) -> Dict[str, Any]:
60    test_env = {
61        'PATH': os.environ['PATH'],
62        'HOME': os.environ['HOME'],
63        'OUT_DIR': str(self.out_dir_path),
64    }
65    return test_env
66
67  def get_test_result(
68      self,
69      shell_cmd: str,
70      is_bazel_mode: bool = False,
71  ) -> Dict[str, str]:
72    result_file_name = 'test_result'
73    if is_bazel_mode:
74      shell_cmd = (
75          f'{shell_cmd} --bazel-arg=--build_event_json_file={result_file_name}'
76      )
77
78    completed_process = self.run_shell_command(shell_cmd)
79    result_file_path = self.get_result_file_path(
80        completed_process, result_file_name, is_bazel_mode
81    )
82
83    if is_bazel_mode:
84      return parse_bazel_result(result_file_path)
85    return parse_standard_result(result_file_path)
86
87  def get_result_file_path(
88      self,
89      completed_process: subprocess.CompletedProcess,
90      result_file_name: str,
91      is_bazel_mode: bool = False,
92  ) -> Path:
93    if is_bazel_mode:
94      return self.out_dir_path.joinpath(
95          'atest_bazel_workspace', result_file_name
96      )
97
98    result_file_path = None
99    for line in completed_process.stdout.decode().splitlines():
100      if line.startswith('Test Logs have been saved in '):
101        result_file_path = Path(
102            re.sub('Test Logs have been saved in ', '', line).replace(
103                'log', result_file_name
104            )
105        )
106        break
107
108    if not result_file_path:
109      raise Exception('Could not find test result filepath')
110
111    return result_file_path
112
113  def run_shell_command(
114      self,
115      shell_command: str,
116  ) -> subprocess.CompletedProcess:
117    return subprocess.run(
118        '. build/envsetup.sh && '
119        'lunch aosp_cf_x86_64_pc-userdebug && '
120        f'{shell_command}',
121        env=self.test_env,
122        cwd=self.src_root_path,
123        shell=True,
124        check=False,
125        stderr=subprocess.STDOUT,
126        stdout=subprocess.PIPE,
127    )
128
129  def assert_test_result_equal(self, result1, result2):
130    self.assertEqual(set(result1.keys()), set(result2.keys()))
131
132    print(
133        '{0:100}  {1:20}  {2}'.format(
134            'Test', 'Atest Standard Mode', 'Atest Bazel Mode'
135        )
136    )
137    count = 0
138    for k, v in result1.items():
139      if v != result2[k]:
140        count += 1
141        print('{0:100}  {1:20}  {2}'.format(k, v, result2[k]))
142    print(
143        f'Total Number of Host Unit Test: {len(result1)}. {count} tests '
144        'have different results.'
145    )
146
147    self.assertEqual(count, 0)
148
149
def parse_standard_result(result_file: Path) -> Dict[str, str]:
  """Parse an Atest standard-mode JSON result file.

  Args:
    result_file: Path to the `test_result` JSON file written by atest.

  Returns:
    A dict mapping test target name to 'PASSED' or 'FAILED'.

  Raises:
    Exception: If the same test target appears more than once.
  """
  result = {}
  with result_file.open('r') as f:
    # json.load reads and decodes the stream in one step.
    json_result = json.load(f)

  for k, v in json_result['test_runner']['AtestTradefedTestRunner'].items():
    # Keys look like `<runner info> <target_name>`; keep the last token.
    name = k.split()[-1]
    if name in result:
      raise Exception(f'Duplicated Test Target: `{name}`')

    # Test passed when there are no failed test cases and no errors.
    result[name] = (
        'PASSED'
        if v['summary']['FAILED'] == 0 and not v.get('ERROR')
        else 'FAILED'
    )
  return result
166
167
def parse_bazel_result(result_file: Path) -> Dict[str, str]:
  """Parse a Bazel build event JSON file into per-test statuses.

  Args:
    result_file: Path to the build event file, one JSON event per line.

  Returns:
    A dict mapping test name (any `_host` suffix stripped) to the test's
    overall status string.
  """
  statuses: Dict[str, str] = {}
  with result_file.open('r') as f:
    raw_events = f.read().splitlines()

  for raw_event in raw_events:
    event = json.loads(raw_event)
    summary_id = event['id'].get('testSummary')
    if summary_id is None:
      # Not a test-summary event (e.g. progress or configuration); skip it.
      continue
    test_name = summary_id['label'].split(':')[-1].removesuffix('_host')
    statuses[test_name] = event['testSummary']['overallStatus']
  return statuses
184
185
# Script entry point: run every test in this module directly; verbosity=2
# prints each test name and its outcome as it runs.
if __name__ == '__main__':
  unittest.main(verbosity=2)
188