1#!/usr/bin/env python3
2
3#  Copyright (C) 2024 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16
17"""CLI uploader for Mobly test results to Resultstore."""
18
19import argparse
20import dataclasses
21import datetime
22from importlib import resources
23import logging
24import mimetypes
25import pathlib
26import platform
27import shutil
28import tempfile
29import warnings
30from xml.etree import ElementTree
31
32import google.auth
33from google.cloud import storage
34from googleapiclient import discovery
35
36import mobly_result_converter
37import resultstore_client
38
39with warnings.catch_warnings():
40    warnings.simplefilter('ignore')
41    from google.cloud.storage import transfer_manager
42
43logging.getLogger('googleapiclient').setLevel(logging.WARNING)
44
45_RESULTSTORE_SERVICE_NAME = 'resultstore'
46_API_VERSION = 'v2'
47_DISCOVERY_SERVICE_URL = (
48    'https://{api}.googleapis.com/$discovery/rest?version={apiVersion}'
49)
50_TEST_XML = 'test.xml'
51_TEST_LOGS = 'test.log'
52_UNDECLARED_OUTPUTS = 'undeclared_outputs/'
53
54_TEST_SUMMARY_YAML = 'test_summary.yaml'
55_TEST_LOG_INFO = 'test_log.INFO'
56
57_SUITE_NAME = 'suite_name'
58_RUN_IDENTIFIER = 'run_identifier'
59
60_GCS_BASE_LINK = 'https://console.cloud.google.com/storage/browser'
61
62_GCS_UPLOAD_INSTRUCTIONS = (
63    '\nAutomatic upload to GCS failed.\n'
64    'Please follow the steps below to manually upload files:\n'
65    f'\t1. Follow the link {_GCS_BASE_LINK}/%s.\n'
66    '\t2. Click "UPLOAD FOLDER".\n'
67    '\t3. Select the directory "%s" to upload.'
68)
69
70_ResultstoreTreeTags = mobly_result_converter.ResultstoreTreeTags
71_ResultstoreTreeAttributes = mobly_result_converter.ResultstoreTreeAttributes
72
73_Status = resultstore_client.Status
74
75
76@dataclasses.dataclass()
77class _TestResultInfo:
78    """Info from the parsed test summary used for the Resultstore invocation."""
79
80    # Aggregate status of the overall test run.
81    status: _Status = _Status.UNKNOWN
82    # Target ID for the test.
83    target_id: str | None = None
84
85
86def _convert_results(
87        mobly_dir: pathlib.Path, dest_dir: pathlib.Path) -> _TestResultInfo:
88    """Converts Mobly test results into Resultstore artifacts."""
89    test_result_info = _TestResultInfo()
90    logging.info('Converting raw Mobly logs into Resultstore artifacts...')
91    # Generate the test.xml
92    mobly_yaml_path = mobly_dir.joinpath(_TEST_SUMMARY_YAML)
93    if mobly_yaml_path.is_file():
94        test_xml = mobly_result_converter.convert(mobly_yaml_path, mobly_dir)
95        ElementTree.indent(test_xml)
96        test_xml.write(
97            str(dest_dir.joinpath(_TEST_XML)),
98            encoding='utf-8',
99            xml_declaration=True,
100        )
101        test_result_info = _get_test_result_info_from_test_xml(test_xml)
102
103    # Copy test_log.INFO to test.log
104    test_log_info = mobly_dir.joinpath(_TEST_LOG_INFO)
105    if test_log_info.is_file():
106        shutil.copyfile(test_log_info, dest_dir.joinpath(_TEST_LOGS))
107
108    # Copy directory to undeclared_outputs/
109    shutil.copytree(
110        mobly_dir,
111        dest_dir.joinpath(_UNDECLARED_OUTPUTS),
112        dirs_exist_ok=True,
113    )
114    return test_result_info
115
116
117def _get_test_result_info_from_test_xml(
118        test_xml: ElementTree.ElementTree,
119) -> _TestResultInfo:
120    """Parses a test_xml element into a _TestResultInfo."""
121    test_result_info = _TestResultInfo()
122    mobly_suite_element = test_xml.getroot().find(
123        f'./{_ResultstoreTreeTags.TESTSUITE.value}'
124    )
125    if mobly_suite_element is None:
126        return test_result_info
127    # Set aggregate test status
128    test_result_info.status = _Status.PASSED
129    test_class_elements = mobly_suite_element.findall(
130        f'./{_ResultstoreTreeTags.TESTSUITE.value}')
131    failures = int(
132        mobly_suite_element.get(_ResultstoreTreeAttributes.FAILURES.value)
133    )
134    errors = int(
135        mobly_suite_element.get(_ResultstoreTreeAttributes.ERRORS.value))
136    if failures or errors:
137        test_result_info.status = _Status.FAILED
138    else:
139        all_skipped = all([test_case_element.get(
140            _ResultstoreTreeAttributes.RESULT.value) == 'skipped' for
141                           test_class_element in test_class_elements for
142                           test_case_element in test_class_element.findall(
143                f'./{_ResultstoreTreeTags.TESTCASE.value}')])
144        if all_skipped:
145            test_result_info.status = _Status.SKIPPED
146
147    # Set target ID based on test class names, suite name, and custom run
148    # identifier.
149    suite_name_value = None
150    run_identifier_value = None
151    properties_element = mobly_suite_element.find(
152        f'./{_ResultstoreTreeTags.PROPERTIES.value}'
153    )
154    if properties_element is not None:
155        suite_name = properties_element.find(
156            f'./{_ResultstoreTreeTags.PROPERTY.value}'
157            f'[@{_ResultstoreTreeAttributes.NAME.value}="{_SUITE_NAME}"]'
158        )
159        if suite_name is not None:
160            suite_name_value = suite_name.get(
161                _ResultstoreTreeAttributes.VALUE.value
162            )
163        run_identifier = properties_element.find(
164            f'./{_ResultstoreTreeTags.PROPERTY.value}'
165            f'[@{_ResultstoreTreeAttributes.NAME.value}="{_RUN_IDENTIFIER}"]'
166        )
167        if run_identifier is not None:
168            run_identifier_value = run_identifier.get(
169                _ResultstoreTreeAttributes.VALUE.value
170            )
171    if suite_name_value:
172        target_id = suite_name_value
173    else:
174        test_class_names = [
175            test_class_element.get(_ResultstoreTreeAttributes.NAME.value)
176            for test_class_element in test_class_elements
177        ]
178        target_id = '+'.join(test_class_names)
179    if run_identifier_value:
180        target_id = f'{target_id} {run_identifier_value}'
181
182    test_result_info.target_id = target_id
183    return test_result_info
184
185
186def _upload_dir_to_gcs(
187        src_dir: pathlib.Path, gcs_bucket: str, gcs_dir: str
188) -> list[str]:
189    """Uploads the given directory to a GCS bucket."""
190    # Set correct MIME types for certain text-format files.
191    with resources.as_file(
192            resources.files('data').joinpath('mime.types')) as path:
193        mimetypes.init([path])
194
195    bucket_obj = storage.Client().bucket(gcs_bucket)
196
197    glob = src_dir.rglob('*')
198    file_paths = [
199        str(path.relative_to(src_dir).as_posix())
200        for path in glob
201        if path.is_file()
202    ]
203
204    logging.info(
205        'Uploading %s files from %s to Cloud Storage directory %s/%s...',
206        len(file_paths),
207        str(src_dir),
208        gcs_bucket,
209        gcs_dir,
210    )
211    # Ensure that the destination directory has a trailing '/'.
212    blob_name_prefix = gcs_dir
213    if blob_name_prefix and not blob_name_prefix.endswith('/'):
214        blob_name_prefix += '/'
215
216    # If running on Windows, disable multiprocessing for upload.
217    worker_type = (
218        transfer_manager.THREAD
219        if platform.system() == 'Windows'
220        else transfer_manager.PROCESS
221    )
222    results = transfer_manager.upload_many_from_filenames(
223        bucket_obj,
224        file_paths,
225        source_directory=str(src_dir),
226        blob_name_prefix=blob_name_prefix,
227        worker_type=worker_type,
228    )
229
230    success_paths = []
231    for file_name, result in zip(file_paths, results):
232        if isinstance(result, Exception):
233            logging.warning('Failed to upload %s. Error: %s', file_name, result)
234        else:
235            logging.debug('Uploaded %s.', file_name)
236            success_paths.append(file_name)
237
238    # If all files fail to upload, something wrong happened with the GCS client.
239    # Prompt the user to manually upload the files instead.
240    if file_paths and not success_paths:
241        _prompt_user_upload(src_dir, gcs_bucket)
242        success_paths = file_paths
243
244    return success_paths
245
246
247def _prompt_user_upload(src_dir: pathlib.Path, gcs_bucket: str) -> None:
248    """Prompts the user to manually upload files to GCS."""
249    print(_GCS_UPLOAD_INSTRUCTIONS % (gcs_bucket, src_dir))
250    while True:
251        resp = input(
252            'Once you see the message "# files successfully uploaded", '
253            'enter "Y" or "yes" to continue:')
254        if resp.lower() in ('y', 'yes'):
255            break
256
257
258def _upload_to_resultstore(
259        gcs_bucket: str,
260        gcs_dir: str,
261        file_paths: list[str],
262        status: _Status,
263        target_id: str | None,
264) -> None:
265    """Uploads test results to Resultstore."""
266    logging.info('Generating Resultstore link...')
267    service = discovery.build(
268        _RESULTSTORE_SERVICE_NAME,
269        _API_VERSION,
270        discoveryServiceUrl=_DISCOVERY_SERVICE_URL,
271    )
272    creds, project_id = google.auth.default()
273    client = resultstore_client.ResultstoreClient(service, creds, project_id)
274    client.create_invocation()
275    client.create_default_configuration()
276    client.create_target(target_id)
277    client.create_configured_target()
278    client.create_action(f'gs://{gcs_bucket}/{gcs_dir}', file_paths)
279    client.set_status(status)
280    client.merge_configured_target()
281    client.finalize_configured_target()
282    client.merge_target()
283    client.finalize_target()
284    client.merge_invocation()
285    client.finalize_invocation()
286
287
288def main():
289    parser = argparse.ArgumentParser()
290    parser.add_argument(
291        '-v', '--verbose', action='store_true', help='Enable debug logs.'
292    )
293    parser.add_argument(
294        'mobly_dir',
295        help='Directory on host where Mobly results are stored.',
296    )
297    parser.add_argument(
298        '--gcs_bucket',
299        help='Bucket in GCS where test artifacts are uploaded. If unspecified, '
300             'use the current GCP project name as the bucket name.',
301    )
302    parser.add_argument(
303        '--gcs_dir',
304        help=(
305            'Directory to save test artifacts in GCS. Specify empty string to '
306            'store the files in the bucket root. If unspecified, use the '
307            'current timestamp as the GCS directory name.'
308        ),
309    )
310    parser.add_argument(
311        '--test_title',
312        help='Custom test title to display in the result UI.'
313    )
314
315    args = parser.parse_args()
316    logging.basicConfig(
317        format='%(levelname)s: %(message)s',
318        level=(logging.DEBUG if args.verbose else logging.INFO)
319    )
320    _, project_id = google.auth.default()
321    gcs_bucket = project_id if args.gcs_bucket is None else args.gcs_bucket
322    gcs_dir = (
323        datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
324        if args.gcs_dir is None
325        else args.gcs_dir
326    )
327    with tempfile.TemporaryDirectory() as tmp:
328        converted_dir = pathlib.Path(tmp).joinpath(gcs_dir)
329        converted_dir.mkdir()
330        mobly_dir = pathlib.Path(args.mobly_dir).absolute().expanduser()
331        test_result_info = _convert_results(mobly_dir, converted_dir)
332        gcs_files = _upload_dir_to_gcs(
333            converted_dir, gcs_bucket, gcs_dir)
334    _upload_to_resultstore(
335        gcs_bucket,
336        gcs_dir,
337        gcs_files,
338        test_result_info.status,
339        args.test_title or test_result_info.target_id,
340    )
341
342
343if __name__ == '__main__':
344    main()
345