1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""A module for background python log artifacts uploading."""
17
18import argparse
19import functools
20from importlib import resources
21import logging
22import multiprocessing
23import os
24import pathlib
25import subprocess
26import sys
27from typing import Callable
28from atest import constants
29from atest.logstorage import logstorage_utils
30from atest.metrics import metrics
31from googleapiclient import errors
32from googleapiclient import http
33
34
35_ENABLE_ATEST_LOG_UPLOADING_ENV_KEY = 'ENABLE_ATEST_LOG_UPLOADING'
36
37
38class _SimpleUploadingClient:
39  """A proxy class used to interact with the logstorage_utils module."""
40
41  def __init__(self, atest_run_id: str):
42    self._atest_run_id = atest_run_id
43    self._client = None
44    self._client_legacy = None
45    self._invocation_id = None
46    self._workunit_id = None
47    self._legacy_test_result_id = None
48    self._invocation_data = None
49
50  def initialize_invocation(self):
51    """Initialize internal build clients and get invocation ID from AnTS."""
52    configuration = {}
53    creds, self._invocation_data = logstorage_utils.do_upload_flow(
54        configuration, self._atest_run_id
55    )
56
57    self._client = logstorage_utils.BuildClient(creds)
58    # Legacy test result ID is required when using AnTS' `testartifact` API
59    # to upload test artifacts due to a limitation in the API, and we need
60    # The legacy client to get the legacy ID.
61    self._client_legacy = logstorage_utils.BuildClient(
62        creds,
63        api_version=constants.STORAGE_API_VERSION_LEGACY,
64        url=constants.DISCOVERY_SERVICE_LEGACY,
65    )
66
67    self._invocation_id = configuration[constants.INVOCATION_ID]
68    self._workunit_id = configuration[constants.WORKUNIT_ID]
69
70    self._legacy_test_result_id = (
71        self._client_legacy.client.testresult()
72        .insert(
73            buildId=self._invocation_data['primaryBuild']['buildId'],
74            target=self._invocation_data['primaryBuild']['buildTarget'],
75            attemptId='latest',
76            body={
77                'status': 'completePass',
78            },
79        )
80        .execute()['id']
81    )
82
83    logging.debug(
84        'Initialized AnTS invocation: http://ab/%s', self._invocation_id
85    )
86
87  def complete_invocation(self) -> None:
88    """Set schedule state as complete to AnTS for the current invocation."""
89    self._invocation_data['schedulerState'] = 'completed'
90    self._client.update_invocation(self._invocation_data)
91    logging.debug(
92        'Finalized AnTS invocation: http://ab/%s', self._invocation_id
93    )
94
95  def upload_artifact(
96      self,
97      resource_id: str,
98      metadata: dict[str, str],
99      artifact_path: pathlib.Path,
100      num_of_retries,
101  ) -> None:
102    """Upload an artifact to AnTS with retries.
103
104    Args:
105        resource_id: The artifact's destination resource ID
106        metadata: The metadata for the artifact. Invocation ID and work unit ID
107          is not required in the input metadata dict as this method will add the
108          values to it.
109        artifact_path: The path of the artifact file
110        num_of_retries: Number of retries when the upload request failed
111
112    Raises:
113        errors.HttpError: When the upload failed.
114    """
115    metadata['invocationId'] = self._invocation_id
116    metadata['workUnitId'] = self._workunit_id
117
118    self._client.client.testartifact().update(
119        resourceId=resource_id,
120        invocationId=self._invocation_id,
121        workUnitId=self._workunit_id,
122        body=metadata,
123        legacyTestResultId=self._legacy_test_result_id,
124        media_body=http.MediaFileUpload(artifact_path),
125    ).execute(num_retries=num_of_retries)
126
127
128class _LogUploadSession:
129  """A class to handle log uploading to AnTS."""
130
131  def __init__(
132      self, atest_run_id: str, upload_client: _SimpleUploadingClient = None
133  ):
134    self._upload_client = upload_client or _SimpleUploadingClient(atest_run_id)
135    self._resource_ids = {}
136
137  def __enter__(self):
138    self._upload_client.initialize_invocation()
139    return self
140
141  def __exit__(self, exc_type, exc_val, exc_tb):
142    self._upload_client.complete_invocation()
143
144  @classmethod
145  def _get_file_paths(cls, directory: pathlib.Path) -> list[pathlib.Path]:
146    """Returns all the files under the given directory following symbolic links.
147
148    Args:
149        directory: The root directory path.
150
151    Returns:
152        A list of pathlib.Path objects representing the file paths.
153    """
154
155    file_paths = []
156    with os.scandir(directory) as scan:
157      for entry in scan:
158        if entry.is_file():
159          file_paths.append(pathlib.Path(entry.path))
160        elif entry.is_dir():
161          file_paths.extend(cls._get_file_paths(entry))
162
163    return file_paths
164
165  @staticmethod
166  def _create_artifact_metadata(artifact_path: pathlib.Path) -> dict[str, str]:
167    metadata = {
168        'name': artifact_path.name,
169    }
170    if artifact_path.suffix in ['.txt', '.log']:
171      metadata['artifactType'] = 'HOST_LOG'
172      metadata['contentType'] = 'text/plain'
173    return metadata
174
175  def upload_directory(self, artifacts_dir: pathlib.Path) -> None:
176    """Upload all artifacts under a directory."""
177    logging.debug('Uploading artifact directory %s', artifacts_dir)
178    for artifact_path in self._get_file_paths(artifacts_dir):
179      self.upload_single_file(artifact_path)
180
181  def upload_single_file(self, artifact_path: pathlib.Path) -> None:
182    """Upload an single artifact."""
183    logging.debug('Uploading artifact path %s', artifact_path)
184    file_upload_retires = 3
185    try:
186      self._upload_client.upload_artifact(
187          self._create_resource_id(artifact_path),
188          self._create_artifact_metadata(artifact_path),
189          artifact_path,
190          file_upload_retires,
191      )
192    except errors.HttpError as e:
193      # Upload error may happen due to temporary network issue. We log down
194      # an error but do stop the upload loop so that other files may gets
195      # uploaded when the network recover.
196      logging.error('Failed to upload file %s with error: %s', artifact_path, e)
197
198  def _create_resource_id(self, artifact_path: pathlib.Path) -> str:
199    """Create a unique resource id for a file.
200
201    Args:
202        artifact_path: artifact file path
203
204    Returns:
205        A unique resource ID derived from the file name. If the file name
206        has appeared before, an extra string will be inserted between the file
207        name stem and suffix to make it unique.
208    """
209    count = self._resource_ids.get(artifact_path.name, 0) + 1
210    self._resource_ids[artifact_path.name] = count
211    return (
212        artifact_path.name
213        if count == 1
214        else f'{artifact_path.stem}_{count}{artifact_path.suffix}'
215    )
216
217
218@functools.cache
219def is_uploading_logs(gcert_checker: Callable[[], bool] = None) -> bool:
220  """Determines whether log uploading is happening in the current run."""
221  if os.environ.get(_ENABLE_ATEST_LOG_UPLOADING_ENV_KEY, 'true').lower() in [
222      'false',
223      '0',
224  ]:
225    return False
226
227  if not logstorage_utils.is_credential_available():
228    return False
229
230  # Checks whether gcert is available and not about to expire.
231  if gcert_checker is None:
232    gcert_checker = (
233        lambda: subprocess.run(
234            ['which', 'gcertstatus'],
235            capture_output=True,
236            check=False,
237        ).returncode
238        == 0
239        and subprocess.run(
240            ['gcertstatus', '--check_remaining=6m'],
241            capture_output=True,
242            check=False,
243        ).returncode
244        == 0
245    )
246  return gcert_checker()
247
248
249def upload_logs_detached(logs_dir: pathlib.Path):
250  """Upload logs to AnTS in a detached process."""
251  if not is_uploading_logs():
252    return
253
254  assert logs_dir, 'artifacts_dir cannot be None.'
255  assert logs_dir.as_posix(), 'The path of artifacts_dir should not be empty.'
256
257  def _start_upload_process():
258    # We need to fock a background process instead of calling Popen with
259    # start_new_session=True because we want to make sure the atest_log_uploader
260    # resource binary is deleted after execution.
261    if os.fork() != 0:
262      return
263    with resources.as_file(
264        resources.files('atest').joinpath('atest_log_uploader')
265    ) as uploader_path:
266      # TODO: Explore whether it's possible to package the binary with
267      # executable permission.
268      os.chmod(uploader_path, 0o755)
269
270      timeout = 60 * 60 * 24  # 1 day
271      # We need to call atest_log_uploader as a binary so that the python
272      # environment can be properly loaded.
273      process = subprocess.run(
274          [uploader_path.as_posix(), logs_dir.as_posix(), metrics.get_run_id()],
275          timeout=timeout,
276          capture_output=True,
277          check=False,
278      )
279      if process.returncode:
280        logging.error('Failed to run log upload process: %s', process)
281
282  proc = multiprocessing.Process(target=_start_upload_process)
283  proc.start()
284  proc.join()
285
286
287def _configure_logging(log_dir: str) -> None:
288  """Configure the logger."""
289  log_fmat = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s'
290  date_fmt = '%Y-%m-%d %H:%M:%S'
291  log_path = os.path.join(log_dir, 'atest_log_uploader.log')
292  logging.getLogger('').handlers = []
293  logging.basicConfig(
294      filename=log_path, level=logging.DEBUG, format=log_fmat, datefmt=date_fmt
295  )
296
297
298def _redirect_stdout_stderr() -> None:
299  """Redirect stdout and stderr to logger."""
300
301  class _StreamToLogger:
302
303    def __init__(self, logger, log_level=logging.INFO):
304      self._logger = logger
305      self._log_level = log_level
306
307    def write(self, buf):
308      self._logger.log(self._log_level, buf)
309
310    def flush(self):
311      pass
312
313  logger = logging.getLogger('')
314  sys.stdout = _StreamToLogger(logger, logging.INFO)
315  sys.stderr = _StreamToLogger(logger, logging.ERROR)
316
317
318def _main() -> None:
319  """The main method to be executed when executing this module as a binary."""
320  arg_parser = argparse.ArgumentParser(
321      description='Internal tool for uploading test artifacts to AnTS.',
322      add_help=True,
323  )
324  arg_parser.add_argument(
325      'artifacts_dir', help='Root directory of the test artifacts.'
326  )
327  arg_parser.add_argument('atest_run_id', help='The Atest run ID.')
328  args = arg_parser.parse_args()
329  _configure_logging(args.artifacts_dir)
330  _redirect_stdout_stderr()
331
332  with _LogUploadSession(args.atest_run_id) as artifact_upload_session:
333    artifact_upload_session.upload_directory(pathlib.Path(args.artifacts_dir))
334
335
336if __name__ == '__main__':
337  _main()
338