# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Utility functions for logstorage."""
from __future__ import print_function

import logging
import time
from typing import Optional
import uuid

from atest import atest_utils
from atest import constants
from atest.logstorage import atest_gcp_utils
from atest.metrics import metrics_base
from googleapiclient.discovery import build
import httplib2
from oauth2client import client as oauth2_client

UPLOAD_REQUESTED_FILE_NAME = 'UPLOAD_REQUESTED'


def is_credential_available() -> bool:
  """Checks whether the credential needed for log upload is available.

  Returns:
    True only when both constants.CREDENTIAL_FILE_NAME and
    constants.TOKEN_FILE_PATH are set to truthy values.
  """
  # Wrap in bool() so the function honours its annotation instead of leaking
  # whichever operand the `and` expression happens to evaluate to.
  return bool(constants.CREDENTIAL_FILE_NAME and constants.TOKEN_FILE_PATH)


def is_upload_enabled(args: dict[str, str]) -> bool:
  """Determines whether log upload is enabled.

  The decision is persisted across runs through a marker file named
  UPLOAD_REQUESTED_FILE_NAME inside the atest config folder: requesting upload
  creates the marker, disabling upload removes it (together with the stored
  credential file).

  Args:
    args: Parsed atest arguments; only the constants.REQUEST_UPLOAD_RESULT and
      constants.DISABLE_UPLOAD_RESULT keys are consulted.

  Returns:
    True if results should be uploaded for this run, False otherwise.
  """
  if not is_credential_available() or not constants.GTF_TARGETS:
    return False

  config_folder_path = atest_utils.get_config_folder()
  config_folder_path.mkdir(parents=True, exist_ok=True)
  upload_requested_file = config_folder_path.joinpath(
      UPLOAD_REQUESTED_FILE_NAME
  )

  is_request_upload = args.get(constants.REQUEST_UPLOAD_RESULT)
  is_disable_upload = args.get(constants.DISABLE_UPLOAD_RESULT)
  is_previously_requested = upload_requested_file.exists()

  # Note: is_request_upload and is_disable_upload are from mutually exclusive
  # args so they won't be True simultaneously.
  if not is_disable_upload and is_previously_requested:  # Previously enabled
    atest_utils.colorful_print(
        'AnTS result uploading is enabled. (To disable, use'
        ' --disable-upload-result flag)',
        constants.GREEN,
    )
    return True

  if is_request_upload and not is_previously_requested:  # First time enable
    atest_utils.colorful_print(
        'AnTS result uploading is switched on and will apply to the current and'
        ' future TradeFed test runs. To disable it, run a test with the'
        ' --request-upload-result flag.'.replace(
            '--request-upload-result', '--disable-upload-result'
        ),
        constants.GREEN,
    )
    upload_requested_file.touch()
    return True

  if is_disable_upload and is_previously_requested:  # First time disable
    atest_utils.colorful_print(
        'AnTS result uploading is switched off and will apply to the current'
        ' and future TradeFed test runs. To re-enable it, run a test with the'
        ' --request-upload-result flag.',
        constants.GREEN,
    )
    # missing_ok guards against the marker disappearing between the exists()
    # check above and this call (e.g. a concurrent atest run).
    upload_requested_file.unlink(missing_ok=True)
    config_folder_path.joinpath(constants.CREDENTIAL_FILE_NAME).unlink(
        missing_ok=True
    )

  # Either upload was just disabled, or it was never requested.
  return False


def do_upload_flow(
    extra_args: dict[str, str], atest_run_id: Optional[str] = None
) -> tuple:
  """Run upload flow.

  Asking user's decision and do the related steps. The work is delegated to
  atest_gcp_utils.do_upload_flow, which is handed a factory that builds a
  BuildClient from a credential.

  Args:
    extra_args: Dict of extra args to add to test run.
    atest_run_id: The atest run ID to write into the invocation.

  Return:
    A tuple of credential object and invocation information dict.
  """
  return atest_gcp_utils.do_upload_flow(
      extra_args, lambda cred: BuildClient(cred), atest_run_id
  )


class BuildClient:
  """Build api helper class.

  Wraps a client object created by googleapiclient.discovery.build and exposes
  the branch/target/build/invocation/workunit calls used for log upload.
  """

  def __init__(
      self,
      creds,
      api_version=constants.STORAGE_API_VERSION,
      url=constants.DISCOVERY_SERVICE,
  ):
    """Init BuildClient class.

    Args:
      creds: An oauth2client.OAuth2Credentials instance.
      api_version: API version string passed to the discovery `build` call.
      url: Discovery service URL passed to the discovery `build` call.
    """
    http_auth = creds.authorize(httplib2.Http())
    self.client = build(
        serviceName=constants.STORAGE_SERVICE_NAME,
        version=api_version,
        cache_discovery=False,
        http=http_auth,
        discoveryServiceUrl=url,
    )

  def list_branch(self):
    """List all branch (requests at most 10000 results)."""
    return self.client.branch().list(maxResults=10000).execute()

  def list_target(self, branch):
    """List all target in the branch (requests at most 10000 results).

    Args:
      branch: A string of branch name whose targets are listed.
    """
    return self.client.target().list(branch=branch, maxResults=10000).execute()

  def get_branch(self, branch):
    """Get BuildInfo for specific branch.

    Args:
      branch: A string of branch name to query.

    Returns:
      The executed response for the branch, or an empty string if the query
      fails for any reason.
    """
    try:
      return self.client.branch().get(resourceId=branch).execute()
    # The lookup is best-effort: any failure is reported as "no branch".
    # pylint: disable=broad-except
    except Exception as err:
      logging.debug('Failed to get branch %s: %s', branch, err)
      return ''

  def insert_local_build(self, external_id, target, branch):
    """Insert a build record.

    Args:
      external_id: unique id of build record.
      target: build target.
      branch: build branch.

    Returns:
      A build record object.
    """
    body = {
        'buildId': '',
        'externalId': external_id,
        'branch': branch,
        'target': {'name': target, 'target': target},
        'buildAttemptStatus': 'complete',
    }
    return self.client.build().insert(buildType='local', body=body).execute()

  def insert_build_attempts(self, build_record):
    """Insert a build attempt record.

    Args:
      build_record: build record; its 'buildId' and target 'name' identify the
        build the attempt belongs to.

    Returns:
      A build attempt object.
    """
    build_attempt = {'id': 0, 'status': 'complete', 'successful': True}
    return (
        self.client.buildattempt()
        .insert(
            buildId=build_record['buildId'],
            target=build_record['target']['name'],
            body=build_attempt,
        )
        .execute()
    )

  def insert_invocation(self, build_record, atest_run_id: str):
    """Insert a build invocation record.

    A fresh UUID is generated as the sponge invocation id and stored, together
    with the derived test URI and the atest run ID, in the invocation
    properties.

    Args:
      build_record: build record.
      atest_run_id: The atest run ID to write into the invocation.

    Returns:
      A build invocation object.
    """
    sponge_invocation_id = str(uuid.uuid4())
    user_email = metrics_base.get_user_email()
    invocation = {
        'primaryBuild': {
            'buildId': build_record['buildId'],
            'buildTarget': build_record['target']['name'],
            'branch': build_record['branch'],
        },
        'schedulerState': 'running',
        'runner': 'atest',
        'scheduler': 'atest',
        'users': [user_email],
        'properties': [
            {
                'name': 'sponge_invocation_id',
                'value': sponge_invocation_id,
            },
            {
                'name': 'test_uri',
                'value': f'{constants.STORAGE2_TEST_URI}{sponge_invocation_id}',
            },
            {'name': 'atest_run_id', 'value': atest_run_id},
        ],
    }
    return self.client.invocation().insert(body=invocation).execute()

  def update_invocation(self, invocation):
    """Update an existing invocation record.

    Args:
      invocation: invocation record. Must contain 'invocationId'; its
        'revision' entry is overwritten in place when a newer revision is
        found on the server.

    Returns:
      A invocation object.
    """
    # Because invocation revision will be update by TF, we need to fetch
    # latest invocation revision to update status correctly.
    invocations = None
    # Poll up to 5 times, pausing 0.5s after each empty response.
    for _ in range(5):
      invocations = (
          self.client.invocation()
          .list(invocationId=invocation['invocationId'], maxResults=10)
          .execute()
          .get('invocations', [])
      )
      if invocations:
        break
      time.sleep(0.5)
    if invocations:
      latest_revision = invocations[-1].get('revision', '')
      if latest_revision:
        logging.debug(
            'Get latest_revision:%s from invocations:%s',
            latest_revision,
            invocations,
        )
        invocation['revision'] = latest_revision
    return (
        self.client.invocation()
        .update(resourceId=invocation['invocationId'], body=invocation)
        .execute()
    )

  def insert_work_unit(self, invocation_record):
    """Insert a workunit record.

    Args:
      invocation_record: invocation record; only its 'invocationId' is used.

    Returns:
      the workunit object.
    """
    workunit = {'invocationId': invocation_record['invocationId']}
    return self.client.workunit().insert(body=workunit).execute()