#!/usr/bin/env python3

# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Resultstore client for Mobly tests."""

import datetime
import enum
import logging
import posixpath
import urllib.parse
import uuid

from google.auth import credentials
import google_auth_httplib2
from googleapiclient import discovery
import httplib2

_DEFAULT_CONFIGURATION = 'default'
_RESULTSTORE_BASE_LINK = 'https://btx.cloud.google.com'


class Status(enum.Enum):
    """Aggregate status of the Resultstore invocation and target."""
    PASSED = 'PASSED'
    FAILED = 'FAILED'
    SKIPPED = 'SKIPPED'
    UNKNOWN = 'UNKNOWN'


class StatusCode(enum.IntEnum):
    """Test case statuses and their associated code in Resultstore.

    Used to toggle the visibility of test cases with a particular status.
    """
    ERRORED = 1
    TIMED_OUT = 2
    FAILED = 3
    FLAKY = 4
    PASSED = 5


class ResultstoreClient:
    """Resultstore client for Mobly tests.

    The client tracks at most one invocation and one target at a time. Their
    IDs are generated (or supplied) by the ``create_*`` methods and cleared
    again by ``finalize_invocation``.
    """

    def __init__(
            self,
            service: discovery.Resource,
            creds: credentials.Credentials,
            project_id: str,
    ):
        """Creates a ResultstoreClient.

        Args:
          service: discovery.Resource object for interacting with the API.
          creds: credentials to add to HTTP request.
          project_id: GCP project ID for Resultstore.
        """
        self._service = service
        # Every request is executed through this credential-carrying client;
        # the underlying httplib2 client is created with timeout=30.
        self._http = google_auth_httplib2.AuthorizedHttp(
            creds, http=httplib2.Http(timeout=30)
        )
        self._project_id = project_id

        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        # Percent-encoded form of _target_id, used inside resource names.
        self._encoded_target_id = ''

        self._status = Status.UNKNOWN

    @property
    def _invocation_name(self) -> str:
        """The resource name for the invocation, or '' if none exists."""
        if not self._invocation_id:
            return ''
        return f'invocations/{self._invocation_id}'

    @property
    def _target_name(self) -> str:
        """The resource name for the target, or '' if it is incomplete."""
        # Both parts are required: with only one of them present the result
        # would be a malformed name such as 'invocations/<id>/targets/'.
        if not (self._invocation_name and self._encoded_target_id):
            return ''
        return f'{self._invocation_name}/targets/{self._encoded_target_id}'

    @property
    def _configured_target_name(self) -> str:
        """The resource name for the configured target, or '' if unset."""
        if not self._target_name:
            # Return '' (not None) to stay consistent with the other names.
            return ''
        return (
            f'{self._target_name}/configuredTargets/{_DEFAULT_CONFIGURATION}'
        )

    def _execute(self, request, api_method: str):
        """Executes a request with the authorized HTTP client and logs it.

        Args:
          request: the request object built from the discovery service.
          api_method: dotted API method name, used as the log prefix.

        Returns:
          The value returned by ``request.execute``.
        """
        res = request.execute(http=self._http)
        logging.debug('%s: %s', api_method, res)
        return res

    def set_status(self, status: Status) -> None:
        """Sets the overall test run status."""
        self._status = status

    def create_invocation(self) -> str:
        """Creates an invocation.

        If an invocation already exists on this client, no request is sent
        and the existing invocation ID is returned.

        Returns:
          The invocation ID.
        """
        logging.debug('creating invocation...')
        if self._invocation_id:
            logging.warning(
                'invocation %s already exists, skipping creation...',
                self._invocation_id,
            )
            return self._invocation_id
        # Naive-UTC ISO timestamp with a literal 'Z' suffix. The aware "now"
        # is stripped of tzinfo so the text matches what utcnow() produced,
        # without calling the deprecated utcnow().
        start_time = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )
        invocation = {
            'timing': {'startTime': start_time + 'Z'},
            'invocationAttributes': {'projectId': self._project_id},
        }
        self._request_id = str(uuid.uuid4())
        self._invocation_id = str(uuid.uuid4())
        # Generated here and passed along with every later request.
        self._authorization_token = str(uuid.uuid4())
        request = self._service.invocations().create(
            body=invocation,
            requestId=self._request_id,
            invocationId=self._invocation_id,
            authorizationToken=self._authorization_token,
        )
        self._execute(request, 'invocations.create')
        return self._invocation_id

    def create_default_configuration(self) -> None:
        """Creates a default configuration."""
        logging.debug('creating default configuration...')
        configuration = {
            'id': {
                'invocationId': self._invocation_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            }
        }
        request = (
            self._service.invocations()
            .configs()
            .create(
                body=configuration,
                parent=self._invocation_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.configs.create')

    def create_target(self, target_id: str | None = None) -> str:
        """Creates a target.

        If a target already exists on this client, no request is sent and the
        existing target ID is returned.

        Args:
          target_id: An optional custom target ID. A random UUID is used when
            it is omitted or empty.

        Returns:
          The target ID.
        """
        logging.debug('creating target in %s...', self._invocation_name)
        if self._target_id:
            logging.warning(
                'target %s already exists, skipping creation...',
                self._target_id,
            )
            return self._target_id
        self._target_id = target_id or str(uuid.uuid4())
        # safe='' also percent-encodes '/', so the ID stays a single path
        # segment inside resource names.
        self._encoded_target_id = urllib.parse.quote(self._target_id, safe='')
        target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
            },
            'targetAttributes': {'type': 'TEST', 'language': 'PY'},
        }
        request = (
            self._service.invocations()
            .targets()
            .create(
                body=target,
                parent=self._invocation_name,
                targetId=self._target_id,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.targets.create')
        return self._target_id

    def create_configured_target(self) -> None:
        """Creates a configured target."""
        logging.debug('creating configured target in %s...', self._target_name)
        configured_target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            },
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .create(
                body=configured_target,
                parent=self._target_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.targets.configuredTargets.create')

    def create_action(self, gcs_path: str, artifacts: list[str]) -> str:
        """Creates an action.

        Args:
          gcs_path: The directory in GCS where artifacts are stored.
          artifacts: List of paths (relative to gcs_path) to the test artifacts.

        Returns:
          The action ID.
        """
        logging.debug('creating action in %s...', self._configured_target_name)
        action_id = str(uuid.uuid4())
        # Each artifact is keyed by its relative path and points at the
        # joined location under gcs_path.
        files = [
            {'uid': path, 'uri': posixpath.join(gcs_path, path)}
            for path in artifacts
        ]
        action = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
                'actionId': action_id,
            },
            'testAction': {},
            'files': files,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .actions()
            .create(
                body=action,
                parent=self._configured_target_name,
                actionId=action_id,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(
            request, 'invocations.targets.configuredTargets.actions.create'
        )
        return action_id

    def merge_configured_target(self) -> None:
        """Merges the current status into the configured target."""
        logging.debug('merging configured target %s...',
                      self._configured_target_name)
        merge_request = {
            'configuredTarget': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .merge(
                body=merge_request,
                name=self._configured_target_name,
            )
        )
        self._execute(request, 'invocations.targets.configuredTargets.merge')

    def finalize_configured_target(self) -> None:
        """Finalizes a configured target."""
        logging.debug('finalizing configured target %s...',
                      self._configured_target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .finalize(
                body=finalize_request,
                name=self._configured_target_name,
            )
        )
        self._execute(
            request, 'invocations.targets.configuredTargets.finalize'
        )

    def merge_target(self) -> None:
        """Merges the current status into the target."""
        logging.debug('merging target %s...', self._target_name)
        merge_request = {
            'target': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .merge(
                body=merge_request,
                name=self._target_name,
            )
        )
        self._execute(request, 'invocations.targets.merge')

    def finalize_target(self) -> None:
        """Finalizes a target."""
        logging.debug('finalizing target %s...', self._target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .finalize(
                body=finalize_request,
                name=self._target_name,
            )
        )
        self._execute(request, 'invocations.targets.finalize')

    def merge_invocation(self) -> None:
        """Merges the current status into the invocation."""
        logging.debug('merging invocation %s...', self._invocation_name)
        merge_request = {
            'invocation': {'statusAttributes': {'status': self._status.value}},
            'updateMask': 'statusAttributes',
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().merge(
            body=merge_request, name=self._invocation_name
        )
        self._execute(request, 'invocations.merge')

    def finalize_invocation(self) -> None:
        """Finalizes an invocation, prints the results link, and resets IDs.

        After this call the request, invocation, token and target IDs held by
        the client are cleared. The status set through ``set_status`` is left
        untouched.
        """
        logging.debug('finalizing invocation %s...', self._invocation_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().finalize(
            body=finalize_request, name=self._invocation_name
        )
        self._execute(request, 'invocations.finalize')
        print('---------------------')
        # Make the URL show test cases regardless of status by default.
        # Use .value explicitly: str() of an IntEnum member only yields the
        # bare integer on Python 3.11+, and the member name before that.
        show_statuses = (
            'showStatuses='
            f'{",".join(str(status_code.value) for status_code in StatusCode)}'
        )
        print(
            f'See results in {_RESULTSTORE_BASE_LINK}/'
            f'{self._target_name};config={_DEFAULT_CONFIGURATION}/tests;'
            f'{show_statuses}'
        )
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        self._encoded_target_id = ''