1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Utility functions for logstorage."""
17from __future__ import print_function
18
19import logging
20import time
21import uuid
22
23from atest import atest_utils
24from atest import constants
25from atest.logstorage import atest_gcp_utils
26from atest.metrics import metrics_base
27from googleapiclient.discovery import build
28import httplib2
29from oauth2client import client as oauth2_client
30
# Name of the marker file, kept in the atest config folder, whose presence
# records that result uploading has been requested for this and future runs.
UPLOAD_REQUESTED_FILE_NAME = 'UPLOAD_REQUESTED'
32
33
def is_credential_available() -> bool:
  """Checks whether the credential needed for log upload is available.

  Returns:
      True if both constants.CREDENTIAL_FILE_NAME and
      constants.TOKEN_FILE_PATH are set to truthy values, False otherwise.
  """
  # Coerce explicitly: a bare `and` expression evaluates to one of its
  # operands (e.g. a string or None), not to the bool this function promises.
  return bool(constants.CREDENTIAL_FILE_NAME and constants.TOKEN_FILE_PATH)
37
38
def is_upload_enabled(args: dict[str, str]) -> bool:
  """Decides whether result upload is enabled for the current run.

  The opt-in state is persisted as a marker file (UPLOAD_REQUESTED_FILE_NAME)
  inside the atest config folder. The marker is created the first time upload
  is requested and removed, together with the stored credential file, when
  upload is disabled.

  Args:
      args: Argument mapping; the entries keyed by
        constants.REQUEST_UPLOAD_RESULT and constants.DISABLE_UPLOAD_RESULT
        are consulted.

  Returns:
      True if results should be uploaded, False otherwise.
  """
  if not (is_credential_available() and constants.GTF_TARGETS):
    return False

  config_dir = atest_utils.get_config_folder()
  config_dir.mkdir(parents=True, exist_ok=True)
  marker_file = config_dir.joinpath(UPLOAD_REQUESTED_FILE_NAME)

  # Note: the request and disable flags come from mutually exclusive args, so
  # they won't be True simultaneously.
  wants_upload = args.get(constants.REQUEST_UPLOAD_RESULT)
  wants_no_upload = args.get(constants.DISABLE_UPLOAD_RESULT)
  already_opted_in = marker_file.exists()

  if already_opted_in:
    if wants_no_upload:
      # First time disable: drop the marker and the stored credential.
      atest_utils.colorful_print(
          'AnTS result uploading is switched off and will apply to the current'
          ' and future TradeFed test runs. To re-enable it, run a test with the'
          ' --request-upload-result flag.',
          constants.GREEN,
      )
      marker_file.unlink()
      credential_file = config_dir.joinpath(constants.CREDENTIAL_FILE_NAME)
      credential_file.unlink(missing_ok=True)
      return False

    # Previously enabled and not being disabled now.
    atest_utils.colorful_print(
        'AnTS result uploading is enabled. (To disable, use'
        ' --disable-upload-result flag)',
        constants.GREEN,
    )
    return True

  if wants_upload:
    # First time enable: remember the choice for future runs.
    atest_utils.colorful_print(
        'AnTS result uploading is switched on and will apply to the current and'
        ' future TradeFed test runs. To disable it, run a test with the'
        ' --disable-upload-result flag.',
        constants.GREEN,
    )
    marker_file.touch()
    return True

  return False
88
89
def do_upload_flow(
    extra_args: dict[str, str], atest_run_id: str = None
) -> tuple:
  """Runs the upload flow.

  Delegates to atest_gcp_utils.do_upload_flow, handing it a factory that
  wraps a credential in a BuildClient.

  Args:
      extra_args: Dict of extra args to add to test run.
      atest_run_id: The atest run ID to write into the invocation.

  Returns:
      The result of atest_gcp_utils.do_upload_flow: a tuple of credential
      object and invocation information dict.
  """

  def _make_build_client(cred):
    # Resolved at call time, so BuildClient may be defined later in the module.
    return BuildClient(cred)

  return atest_gcp_utils.do_upload_flow(
      extra_args, _make_build_client, atest_run_id
  )
107
108
class BuildClient:
  """Build api helper class.

  Wraps the service client created by googleapiclient's `build` and exposes
  helpers for the branch, target, build, build attempt, invocation and
  workunit resources.
  """

  # Value passed as maxResults when listing branches and targets.
  _LIST_MAX_RESULTS = 10000
  # How many times update_invocation polls for the invocation, and how long it
  # waits between two consecutive polls.
  _REVISION_FETCH_ATTEMPTS = 5
  _REVISION_FETCH_INTERVAL_SECS = 0.5

  def __init__(
      self,
      creds,
      api_version=constants.STORAGE_API_VERSION,
      url=constants.DISCOVERY_SERVICE,
  ):
    """Init BuildClient class.

    Args:
        creds: An oauth2client.OAuth2Credentials instance.
        api_version: Version of the storage service API to build a client for.
        url: Discovery service URL passed to the client builder.
    """
    http_auth = creds.authorize(httplib2.Http())
    self.client = build(
        serviceName=constants.STORAGE_SERVICE_NAME,
        version=api_version,
        cache_discovery=False,
        http=http_auth,
        discoveryServiceUrl=url,
    )

  def list_branch(self):
    """List all branch.

    Returns:
        The executed response of the branch list request.
    """
    return (
        self.client.branch().list(maxResults=self._LIST_MAX_RESULTS).execute()
    )

  def list_target(self, branch):
    """List all target in the branch.

    Args:
        branch: A string of branch name whose targets are listed.

    Returns:
        The executed response of the target list request.
    """
    return (
        self.client.target()
        .list(branch=branch, maxResults=self._LIST_MAX_RESULTS)
        .execute()
    )

  def get_branch(self, branch):
    """Get BuildInfo for specific branch.

    Args:
        branch: A string of branch name to query.

    Returns:
        The executed response of the branch get request, or an empty string if
        the request raised any exception.
    """
    try:
      return self.client.branch().get(resourceId=branch).execute()
    # pylint: disable=broad-except
    except Exception as err:
      # Best effort: callers get '' on failure, but leave a trace of the cause
      # instead of dropping it silently.
      logging.debug('Failed to get branch %s: %s', branch, err)
      return ''

  def insert_local_build(self, external_id, target, branch):
    """Insert a build record.

    Args:
        external_id: unique id of build record.
        target: build target.
        branch: build branch.

    Returns:
        A build record object.
    """
    body = {
        'buildId': '',
        'externalId': external_id,
        'branch': branch,
        'target': {'name': target, 'target': target},
        'buildAttemptStatus': 'complete',
    }
    return self.client.build().insert(buildType='local', body=body).execute()

  def insert_build_attempts(self, build_record):
    """Insert a build attempt record.

    Args:
        build_record: build record; its 'buildId' and target name are used to
          address the attempt.

    Returns:
        A build attempt object.
    """
    build_attempt = {'id': 0, 'status': 'complete', 'successful': True}
    return (
        self.client.buildattempt()
        .insert(
            buildId=build_record['buildId'],
            target=build_record['target']['name'],
            body=build_attempt,
        )
        .execute()
    )

  def insert_invocation(self, build_record, atest_run_id: str):
    """Insert a build invocation record.

    Args:
        build_record: build record.
        atest_run_id: The atest run ID to write into the invocation.

    Returns:
        A build invocation object.
    """
    # A fresh random id is generated per invocation and reused in test_uri.
    sponge_invocation_id = str(uuid.uuid4())
    user_email = metrics_base.get_user_email()
    invocation = {
        'primaryBuild': {
            'buildId': build_record['buildId'],
            'buildTarget': build_record['target']['name'],
            'branch': build_record['branch'],
        },
        'schedulerState': 'running',
        'runner': 'atest',
        'scheduler': 'atest',
        'users': [user_email],
        'properties': [
            {
                'name': 'sponge_invocation_id',
                'value': sponge_invocation_id,
            },
            {
                'name': 'test_uri',
                'value': f'{constants.STORAGE2_TEST_URI}{sponge_invocation_id}',
            },
            {'name': 'atest_run_id', 'value': atest_run_id},
        ],
    }
    return self.client.invocation().insert(body=invocation).execute()

  def update_invocation(self, invocation):
    """Update a build invocation record.

    Before updating, the invocation is looked up (retrying a few times while
    the lookup comes back empty) so that the latest revision can be written
    into `invocation`. Note that `invocation` is modified in place when a
    revision is found.

    Args:
        invocation: invocation record; must contain 'invocationId'.

    Returns:
        A invocation object.
    """
    # Because invocation revision will be update by TF, we need to fetch
    # latest invocation revision to update status correctly.
    invocations = []
    for attempt in range(self._REVISION_FETCH_ATTEMPTS):
      if attempt:
        # Wait only between attempts; sleeping after the final one would just
        # delay the update without another lookup.
        time.sleep(self._REVISION_FETCH_INTERVAL_SECS)
      invocations = (
          self.client.invocation()
          .list(invocationId=invocation['invocationId'], maxResults=10)
          .execute()
          .get('invocations', [])
      )
      if invocations:
        break

    if invocations:
      # The revision is taken from the last listed entry.
      latest_revision = invocations[-1].get('revision', '')
      if latest_revision:
        logging.debug(
            'Get latest_revision:%s from invocations:%s',
            latest_revision,
            invocations,
        )
        invocation['revision'] = latest_revision
    return (
        self.client.invocation()
        .update(resourceId=invocation['invocationId'], body=invocation)
        .execute()
    )

  def insert_work_unit(self, invocation_record):
    """Insert a workunit record.

    Args:
        invocation_record: invocation record; its 'invocationId' is attached
          to the new workunit.

    Returns:
        the workunit object.
    """
    workunit = {'invocationId': invocation_record['invocationId']}
    return self.client.workunit().insert(body=workunit).execute()
280