# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""A module for background python log artifacts uploading."""

import argparse
import functools
from importlib import resources
import logging
import multiprocessing
import os
import pathlib
import subprocess
import sys
from typing import Callable

from atest import constants
from atest.logstorage import logstorage_utils
from atest.metrics import metrics
from googleapiclient import errors
from googleapiclient import http


# Environment variable that, when set to 'false' or '0', disables uploading.
_ENABLE_ATEST_LOG_UPLOADING_ENV_KEY = 'ENABLE_ATEST_LOG_UPLOADING'


class _SimpleUploadingClient:
  """A proxy class used to interact with the logstorage_utils module."""

  def __init__(self, atest_run_id: str):
    self._atest_run_id = atest_run_id
    self._client = None
    self._client_legacy = None
    self._invocation_id = None
    self._workunit_id = None
    self._legacy_test_result_id = None
    self._invocation_data = None

  def initialize_invocation(self) -> None:
    """Initialize internal build clients and get invocation ID from AnTS."""
    configuration = {}
    creds, self._invocation_data = logstorage_utils.do_upload_flow(
        configuration, self._atest_run_id
    )

    self._client = logstorage_utils.BuildClient(creds)
    # Legacy test result ID is required when using AnTS' `testartifact` API
    # to upload test artifacts due to a limitation in the API, and we need
    # the legacy client to get the legacy ID.
    self._client_legacy = logstorage_utils.BuildClient(
        creds,
        api_version=constants.STORAGE_API_VERSION_LEGACY,
        url=constants.DISCOVERY_SERVICE_LEGACY,
    )

    self._invocation_id = configuration[constants.INVOCATION_ID]
    self._workunit_id = configuration[constants.WORKUNIT_ID]

    self._legacy_test_result_id = (
        self._client_legacy.client.testresult()
        .insert(
            buildId=self._invocation_data['primaryBuild']['buildId'],
            target=self._invocation_data['primaryBuild']['buildTarget'],
            attemptId='latest',
            body={
                'status': 'completePass',
            },
        )
        .execute()['id']
    )

    logging.debug(
        'Initialized AnTS invocation: http://ab/%s', self._invocation_id
    )

  def complete_invocation(self) -> None:
    """Set schedule state as complete to AnTS for the current invocation."""
    self._invocation_data['schedulerState'] = 'completed'
    self._client.update_invocation(self._invocation_data)
    logging.debug(
        'Finalized AnTS invocation: http://ab/%s', self._invocation_id
    )

  def upload_artifact(
      self,
      resource_id: str,
      metadata: dict[str, str],
      artifact_path: pathlib.Path,
      num_of_retries: int,
  ) -> None:
    """Upload an artifact to AnTS with retries.

    Args:
      resource_id: The artifact's destination resource ID
      metadata: The metadata for the artifact. Invocation ID and work unit ID
        is not required in the input metadata dict as this method will add the
        values to it.
      artifact_path: The path of the artifact file
      num_of_retries: Number of retries when the upload request failed

    Raises:
      errors.HttpError: When the upload failed.
    """
    metadata['invocationId'] = self._invocation_id
    metadata['workUnitId'] = self._workunit_id

    self._client.client.testartifact().update(
        resourceId=resource_id,
        invocationId=self._invocation_id,
        workUnitId=self._workunit_id,
        body=metadata,
        legacyTestResultId=self._legacy_test_result_id,
        media_body=http.MediaFileUpload(artifact_path),
    ).execute(num_retries=num_of_retries)


class _LogUploadSession:
  """A class to handle log uploading to AnTS."""

  def __init__(
      self, atest_run_id: str, upload_client: _SimpleUploadingClient = None
  ):
    self._upload_client = upload_client or _SimpleUploadingClient(atest_run_id)
    # Tracks how many times each file name has been seen, so that repeated
    # names get unique resource IDs. Maps file name -> occurrence count.
    self._resource_ids = {}

  def __enter__(self):
    self._upload_client.initialize_invocation()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self._upload_client.complete_invocation()

  @classmethod
  def _get_file_paths(cls, directory: pathlib.Path) -> list[pathlib.Path]:
    """Returns all the files under the given directory following symbolic links.

    Args:
      directory: The root directory path.

    Returns:
      A list of pathlib.Path objects representing the file paths.
    """
    file_paths = []
    with os.scandir(directory) as scan:
      for entry in scan:
        if entry.is_file():
          file_paths.append(pathlib.Path(entry.path))
        elif entry.is_dir():
          # Recurse with an explicit Path to match the documented parameter
          # type (os.DirEntry is path-like, but Path is clearer).
          file_paths.extend(cls._get_file_paths(pathlib.Path(entry.path)))

    return file_paths

  @staticmethod
  def _create_artifact_metadata(artifact_path: pathlib.Path) -> dict[str, str]:
    """Creates the upload metadata dict for a single artifact file."""
    metadata = {
        'name': artifact_path.name,
    }
    # Only plain-text logs get a type/content-type; other files are uploaded
    # with name-only metadata.
    if artifact_path.suffix in ['.txt', '.log']:
      metadata['artifactType'] = 'HOST_LOG'
      metadata['contentType'] = 'text/plain'
    return metadata

  def upload_directory(self, artifacts_dir: pathlib.Path) -> None:
    """Upload all artifacts under a directory."""
    logging.debug('Uploading artifact directory %s', artifacts_dir)
    for artifact_path in self._get_file_paths(artifacts_dir):
      self.upload_single_file(artifact_path)

  def upload_single_file(self, artifact_path: pathlib.Path) -> None:
    """Upload a single artifact."""
    logging.debug('Uploading artifact path %s', artifact_path)
    file_upload_retries = 3
    try:
      self._upload_client.upload_artifact(
          self._create_resource_id(artifact_path),
          self._create_artifact_metadata(artifact_path),
          artifact_path,
          file_upload_retries,
      )
    except errors.HttpError as e:
      # Upload errors may happen due to temporary network issues. We log an
      # error but do NOT stop the upload loop so that other files may get
      # uploaded when the network recovers.
      logging.error('Failed to upload file %s with error: %s', artifact_path, e)

  def _create_resource_id(self, artifact_path: pathlib.Path) -> str:
    """Create a unique resource id for a file.

    Args:
      artifact_path: artifact file path

    Returns:
      A unique resource ID derived from the file name. If the file name
      has appeared before, an extra string will be inserted between the file
      name stem and suffix to make it unique.
    """
    count = self._resource_ids.get(artifact_path.name, 0) + 1
    self._resource_ids[artifact_path.name] = count
    return (
        artifact_path.name
        if count == 1
        else f'{artifact_path.stem}_{count}{artifact_path.suffix}'
    )


@functools.cache
def is_uploading_logs(gcert_checker: Callable[[], bool] = None) -> bool:
  """Determines whether log uploading is happening in the current run."""
  # An explicit opt-out via the environment variable wins over everything.
  if os.environ.get(_ENABLE_ATEST_LOG_UPLOADING_ENV_KEY, 'true').lower() in [
      'false',
      '0',
  ]:
    return False

  if not logstorage_utils.is_credential_available():
    return False

  # Checks whether gcert is available and not about to expire.
  if gcert_checker is None:
    gcert_checker = (
        lambda: subprocess.run(
            ['which', 'gcertstatus'],
            capture_output=True,
            check=False,
        ).returncode
        == 0
        and subprocess.run(
            ['gcertstatus', '--check_remaining=6m'],
            capture_output=True,
            check=False,
        ).returncode
        == 0
    )
  return gcert_checker()


def upload_logs_detached(logs_dir: pathlib.Path):
  """Upload logs to AnTS in a detached process."""
  if not is_uploading_logs():
    return

  assert logs_dir, 'artifacts_dir cannot be None.'
  assert logs_dir.as_posix(), 'The path of artifacts_dir should not be empty.'

  def _start_upload_process():
    # We need to fork a background process instead of calling Popen with
    # start_new_session=True because we want to make sure the atest_log_uploader
    # resource binary is deleted after execution.
    if os.fork() != 0:
      return
    # NOTE(review): the forked child returns through the multiprocessing
    # bootstrap when done rather than calling os._exit(); confirm this is
    # intentional.
    with resources.as_file(
        resources.files('atest').joinpath('atest_log_uploader')
    ) as uploader_path:
      # TODO: Explore whether it's possible to package the binary with
      # executable permission.
      os.chmod(uploader_path, 0o755)

      timeout = 60 * 60 * 24  # 1 day
      # We need to call atest_log_uploader as a binary so that the python
      # environment can be properly loaded.
      process = subprocess.run(
          [uploader_path.as_posix(), logs_dir.as_posix(), metrics.get_run_id()],
          timeout=timeout,
          capture_output=True,
          check=False,
      )
      if process.returncode:
        logging.error('Failed to run log upload process: %s', process)

  proc = multiprocessing.Process(target=_start_upload_process)
  proc.start()
  proc.join()


def _configure_logging(log_dir: str) -> None:
  """Configure the logger."""
  log_fmt = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s'
  date_fmt = '%Y-%m-%d %H:%M:%S'
  log_path = os.path.join(log_dir, 'atest_log_uploader.log')
  # Drop any handlers configured by an importing process so basicConfig
  # re-installs a fresh file handler.
  logging.getLogger('').handlers = []
  logging.basicConfig(
      filename=log_path, level=logging.DEBUG, format=log_fmt, datefmt=date_fmt
  )


def _redirect_stdout_stderr() -> None:
  """Redirect stdout and stderr to logger."""

  class _StreamToLogger:
    """A minimal file-like object that forwards writes to a logger."""

    def __init__(self, logger, log_level=logging.INFO):
      self._logger = logger
      self._log_level = log_level

    def write(self, buf):
      self._logger.log(self._log_level, buf)

    def flush(self):
      # No buffering is done; flush is a no-op kept for file-like interface.
      pass

  logger = logging.getLogger('')
  sys.stdout = _StreamToLogger(logger, logging.INFO)
  sys.stderr = _StreamToLogger(logger, logging.ERROR)


def _main() -> None:
  """The main method to be executed when executing this module as a binary."""
  arg_parser = argparse.ArgumentParser(
      description='Internal tool for uploading test artifacts to AnTS.',
      add_help=True,
  )
  arg_parser.add_argument(
      'artifacts_dir', help='Root directory of the test artifacts.'
  )
  arg_parser.add_argument('atest_run_id', help='The Atest run ID.')
  args = arg_parser.parse_args()
  _configure_logging(args.artifacts_dir)
  _redirect_stdout_stderr()

  with _LogUploadSession(args.atest_run_id) as artifact_upload_session:
    artifact_upload_session.upload_directory(pathlib.Path(args.artifacts_dir))


if __name__ == '__main__':
  _main()