1#!/usr/bin/env python3
2
3#  Copyright (C) 2024 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16
17"""Resultstore client for Mobly tests."""
18
19import datetime
20import enum
21import logging
22import posixpath
23import urllib.parse
24import uuid
25
26from google.auth import credentials
27import google_auth_httplib2
28from googleapiclient import discovery
29import httplib2
30
# ID of the only configuration this client creates; it is also the
# configuration ID used for configured targets and actions.
_DEFAULT_CONFIGURATION = 'default'
# Base URL of the results link printed by
# ResultstoreClient.finalize_invocation().
_RESULTSTORE_BASE_LINK = 'https://btx.cloud.google.com'
33
34
class Status(enum.Enum):
    """Aggregate status of the Resultstore invocation and target.

    The member value is the string that ResultstoreClient writes to the
    `statusAttributes.status` field of its merge requests.
    """
    PASSED = 'PASSED'
    FAILED = 'FAILED'
    SKIPPED = 'SKIPPED'
    # Status a ResultstoreClient starts with until set_status() is called.
    UNKNOWN = 'UNKNOWN'
41
42
class StatusCode(enum.IntEnum):
    """Test case statuses and their associated code in Resultstore.

    Used to toggle the visibility of test cases with a particular status.
    ResultstoreClient.finalize_invocation() lists every member, in
    definition order, in the `showStatuses` parameter of the results link.
    """
    ERRORED = 1
    TIMED_OUT = 2
    FAILED = 3
    FLAKY = 4
    PASSED = 5
53
54
class ResultstoreClient:
    """Resultstore client for Mobly tests.

    The client creates one invocation with one target and remembers the IDs
    of the resources it created, so that later calls can address them by
    resource name. finalize_invocation() clears the stored IDs again.
    """

    def __init__(
            self,
            service: discovery.Resource,
            creds: credentials.Credentials,
            project_id: str,
    ):
        """Creates a ResultstoreClient.

        Args:
          service: discovery.Resource object for interacting with the API.
          creds: credentials to add to HTTP request.
          project_id: GCP project ID for Resultstore.
        """
        self._service = service
        # Every request is executed over this authorized client, which uses
        # a 30 second HTTP timeout.
        self._http = google_auth_httplib2.AuthorizedHttp(
            creds, http=httplib2.Http(timeout=30)
        )
        self._project_id = project_id

        # IDs of the resources created so far; empty means "not created".
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        # Percent-encoded form of _target_id, used inside resource names.
        self._encoded_target_id = ''

        self._status = Status.UNKNOWN

    @property
    def _invocation_name(self) -> str:
        """The resource name for the invocation, or '' if none exists."""
        if not self._invocation_id:
            return ''
        return f'invocations/{self._invocation_id}'

    @property
    def _target_name(self) -> str:
        """The resource name for the target, or '' if none exists."""
        # Both parts are required; a name built from only one of them would
        # be malformed (e.g. 'invocations/<id>/targets/').
        if not (self._invocation_name and self._encoded_target_id):
            return ''
        return f'{self._invocation_name}/targets/{self._encoded_target_id}'

    @property
    def _configured_target_name(self) -> str:
        """The resource name for the configured target, or '' if none."""
        if not self._target_name:
            return ''
        return f'{self._target_name}/configuredTargets/{_DEFAULT_CONFIGURATION}'

    def _execute(self, request, api_method: str):
        """Executes an API request over the authorized HTTP client.

        Args:
          request: The request object built from the discovery service.
          api_method: Dotted name of the API method, used as the log prefix.

        Returns:
          The response returned by executing the request.
        """
        res = request.execute(http=self._http)
        logging.debug('%s: %s', api_method, res)
        return res

    def set_status(self, status: Status) -> None:
        """Sets the overall test run status.

        The status is sent by the merge_* methods; it is not cleared by
        finalize_invocation().
        """
        self._status = status

    def create_invocation(self) -> str:
        """Creates an invocation.

        If this client already holds an invocation, no request is sent and
        the existing invocation ID is returned.

        Returns:
          The invocation ID.
        """
        logging.debug('creating invocation...')
        if self._invocation_id:
            logging.warning(
                'invocation %s already exists, skipping creation...',
                self._invocation_id,
            )
            return self._invocation_id
        # utcnow() is deprecated; take an aware UTC timestamp and drop the
        # tzinfo so isoformat() keeps the '<timestamp>Z' form.
        start_time = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None)
        invocation = {
            'timing': {'startTime': start_time.isoformat() + 'Z'},
            'invocationAttributes': {'projectId': self._project_id},
        }
        self._request_id = str(uuid.uuid4())
        self._invocation_id = str(uuid.uuid4())
        # The same token is sent with every later request for this
        # invocation.
        self._authorization_token = str(uuid.uuid4())
        request = self._service.invocations().create(
            body=invocation,
            requestId=self._request_id,
            invocationId=self._invocation_id,
            authorizationToken=self._authorization_token,
        )
        self._execute(request, 'invocations.create')
        return self._invocation_id

    def create_default_configuration(self) -> None:
        """Creates the default configuration under the current invocation."""
        logging.debug('creating default configuration...')
        configuration = {
            'id': {
                'invocationId': self._invocation_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            }
        }
        request = (
            self._service.invocations()
            .configs()
            .create(
                body=configuration,
                parent=self._invocation_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.configs.create')

    def create_target(self, target_id: str | None = None) -> str:
        """Creates a target.

        If this client already holds a target, no request is sent and the
        existing target ID is returned.

        Args:
          target_id: An optional custom target ID. A random UUID is used
            when it is omitted or empty.

        Returns:
          The target ID.
        """
        logging.debug('creating target in %s...', self._invocation_name)
        if self._target_id:
            logging.warning(
                'target %s already exists, skipping creation...',
                self._target_id,
            )
            return self._target_id
        self._target_id = target_id or str(uuid.uuid4())
        # safe='' also escapes '/', so the ID stays a single path segment
        # when embedded in a resource name.
        self._encoded_target_id = urllib.parse.quote(self._target_id, safe='')
        target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
            },
            'targetAttributes': {'type': 'TEST', 'language': 'PY'},
        }
        request = (
            self._service.invocations()
            .targets()
            .create(
                body=target,
                parent=self._invocation_name,
                targetId=self._target_id,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.targets.create')
        return self._target_id

    def create_configured_target(self) -> None:
        """Creates a configured target for the default configuration."""
        logging.debug('creating configured target in %s...', self._target_name)
        configured_target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            },
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .create(
                body=configured_target,
                parent=self._target_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(request, 'invocations.targets.configuredTargets.create')

    def create_action(self, gcs_path: str, artifacts: list[str]) -> str:
        """Creates an action.

        Args:
          gcs_path: The directory in GCS where artifacts are stored.
          artifacts: List of paths (relative to gcs_path) to the test artifacts.

        Returns:
          The action ID.
        """
        logging.debug('creating action in %s...', self._configured_target_name)
        action_id = str(uuid.uuid4())
        # Each artifact is identified by its relative path and located by
        # that path joined onto gcs_path.
        files = [
            {'uid': path, 'uri': posixpath.join(gcs_path, path)}
            for path in artifacts
        ]
        action = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
                'actionId': action_id,
            },
            'testAction': {},
            'files': files,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .actions()
            .create(
                body=action,
                parent=self._configured_target_name,
                actionId=action_id,
                authorizationToken=self._authorization_token,
            )
        )
        self._execute(
            request, 'invocations.targets.configuredTargets.actions.create'
        )
        return action_id

    def merge_configured_target(self) -> None:
        """Merges the current status into the configured target."""
        logging.debug('merging configured target %s...',
                      self._configured_target_name)
        merge_request = {
            'configuredTarget': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .merge(
                body=merge_request,
                name=self._configured_target_name,
            )
        )
        self._execute(request, 'invocations.targets.configuredTargets.merge')

    def finalize_configured_target(self) -> None:
        """Finalizes a configured target."""
        logging.debug('finalizing configured target %s...',
                      self._configured_target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .finalize(
                body=finalize_request,
                name=self._configured_target_name,
            )
        )
        self._execute(request, 'invocations.targets.configuredTargets.finalize')

    def merge_target(self) -> None:
        """Merges the current status into the target."""
        logging.debug('merging target %s...', self._target_name)
        merge_request = {
            'target': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .merge(
                body=merge_request,
                name=self._target_name,
            )
        )
        self._execute(request, 'invocations.targets.merge')

    def finalize_target(self) -> None:
        """Finalizes a target."""
        logging.debug('finalizing target %s...', self._target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .finalize(
                body=finalize_request,
                name=self._target_name,
            )
        )
        self._execute(request, 'invocations.targets.finalize')

    def merge_invocation(self) -> None:
        """Merges the current status into the invocation."""
        logging.debug('merging invocation %s...', self._invocation_name)
        merge_request = {
            'invocation': {'statusAttributes': {'status': self._status.value}},
            'updateMask': 'statusAttributes',
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().merge(body=merge_request,
                                                    name=self._invocation_name)
        self._execute(request, 'invocations.merge')

    def finalize_invocation(self) -> None:
        """Finalizes an invocation.

        After the request succeeds, prints a link to the results and clears
        the stored invocation and target IDs.
        """
        logging.debug('finalizing invocation %s...', self._invocation_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().finalize(
            body=finalize_request, name=self._invocation_name
        )
        self._execute(request, 'invocations.finalize')
        print('---------------------')
        # Make the URL show test cases regardless of status by default.
        # Use .value: str() of an IntEnum member is 'StatusCode.NAME' on
        # Python versions before 3.11, which would corrupt the parameter.
        show_statuses = (
            'showStatuses='
            f'{",".join(str(status_code.value) for status_code in StatusCode)}'
        )
        print(
            f'See results in {_RESULTSTORE_BASE_LINK}/'
            f'{self._target_name};config={_DEFAULT_CONFIGURATION}/tests;'
            f'{show_statuses}'
        )
        # Forget the finalized resources.
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        self._encoded_target_id = ''
390