1#!/usr/bin/env python3
2# Copyright 2019, The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Run at preupload hook to perform necessary checks and formatting."""
17
import argparse
import concurrent.futures
import multiprocessing
import pathlib
import shlex
import shutil
import subprocess
import sys
25
26ASUITE_HOME = pathlib.Path(__file__).resolve().parent
27
28
def _filter_python_files(files: list[pathlib.Path]) -> list[pathlib.Path]:
  """Return the entries of `files` whose suffix is exactly '.py'.

  The input list is not modified and the relative order is preserved.
  """
  python_files = []
  for path in files:
    if path.suffix == '.py':
      python_files.append(path)
  return python_files
32
33
def _check_run_shell_command(cmd: str, cwd: str = None) -> None:
  """Execute a command and terminate the script if it fails.

  Args:
      cmd: the command line; it is tokenized with shlex.split and run without
        a shell.
      cwd: working directory for the command, or None to use the current one.

  Raises:
      SystemExit: with exit code 1 if the command returns a non-zero exit code.
  """
  completed = subprocess.run(shlex.split(cmd), cwd=cwd, check=False)
  if completed.returncode == 0:
    return
  print('Preupload files did not pass Asuite preupload hook script.')
  sys.exit(1)
39
40
def _run_python_lint(lint_bin: str, files: list[pathlib.Path]) -> None:
  """Run a python lint binary on every python file and exit on any failure.

  Each python file in `files` is linted in its own subprocess, and the
  subprocesses are fanned out over a thread pool sized to the CPU count. The
  captured output of every failing run is printed once all runs are done.

  Args:
      lint_bin: the lint command, e.g. 'pylint'. It is tokenized with
        shlex.split, so it may carry extra arguments.
      files: candidate files; only those with a '.py' suffix are linted.

  Raises:
      SystemExit: with exit code 1 if the linter returned a non-zero exit code
        for at least one file.
  """
  lint_cmd = shlex.split(lint_bin)

  def _lint_file(file: pathlib.Path) -> subprocess.CompletedProcess:
    # The path is passed as its own argv entry rather than being interpolated
    # into a string and re-split, so paths containing whitespace or quote
    # characters reach the linter intact.
    return subprocess.run(
        [*lint_cmd, file.as_posix()],
        check=False,
        capture_output=True,
    )

  cpu_count = multiprocessing.cpu_count()
  with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor:
    completed_processes = list(
        executor.map(_lint_file, _filter_python_files(files))
    )

  has_lint_issue = False
  for process in completed_processes:
    if not process.returncode:
      continue
    # errors='replace' keeps one undecodable byte in the linter output from
    # aborting the report for all remaining files.
    print(process.stdout.decode(errors='replace'))
    if process.stderr:
      # stderr was captured as well; surface it so linter crashes and
      # configuration errors are not silently dropped.
      print(process.stderr.decode(errors='replace'), file=sys.stderr)
    has_lint_issue = True

  if has_lint_issue:
    sys.exit(1)
64
65
def _run_pylint(files: list[pathlib.Path]) -> None:
  """Lint the python files in `files` with the `pylint` binary."""
  _run_python_lint(lint_bin='pylint', files=files)
69
70
def _run_gpylint(files: list[pathlib.Path]) -> None:
  """Run gpylint on python files, falling back to pylint.

  Args:
      files: candidate files to lint. They are handed to the gpylint run when
        gpylint is found on PATH, otherwise to the pylint run.
  """
  # shutil.which performs the PATH lookup in-process. Unlike spawning the
  # external `which` command, it does not echo the resolved path into the hook
  # output and does not depend on `which` being installed.
  if shutil.which('gpylint') is None:
    print('gpylint not available. Will use pylint instead.')
    _run_pylint(files)
    return

  _run_python_lint('gpylint', files)
82
83
def _run_pyformat(files: list[pathlib.Path]) -> None:
  """Auto-format python files with pyformat when it is installed.

  Every python file in `files` is first passed to pyformat without `-i`. If
  that run writes anything to stdout, the file is then rewritten in place
  with `pyformat -i`. Files are processed concurrently on a thread pool sized
  to the CPU count. When pyformat cannot be found on PATH, a notice is
  printed and nothing is formatted.

  Args:
      files: candidate files; only those with a '.py' suffix are formatted.

  Raises:
      SystemExit: with exit code 1 if at least one file was rewritten, so the
        modified files can be added to git before the hook is rerun.
  """
  # In-process PATH lookup; avoids spawning `which` and leaking its output.
  if shutil.which('pyformat') is None:
    print('pyformat not available. Will skip auto formatting.')
    return

  def _run_pyformat_on_file(file: pathlib.Path) -> bool:
    """Reformat `file` in place if needed and report whether it was."""
    # Arguments are passed as a list so that a path containing whitespace or
    # quote characters stays a single argument.
    dry_run = subprocess.run(
        ['pyformat', '--force_quote_type', 'single', file.as_posix()],
        capture_output=True,
        check=False,
    )
    # Any stdout from the run without -i is taken as "needs reformatting".
    if not dry_run.stdout:
      return False

    subprocess.run(
        ['pyformat', '-i', '--force_quote_type', 'single', file.as_posix()],
        check=False,
    )
    return True

  cpu_count = multiprocessing.cpu_count()
  with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor:
    need_reformat = list(
        executor.map(_run_pyformat_on_file, _filter_python_files(files))
    )

  if any(need_reformat):
    print(
        'Reformatting completed. Please add the modified files to git and rerun'
        ' the repo preupload hook.'
    )
    sys.exit(1)
122
123
def _run_legacy_unittests() -> None:
  """Invoke `gradlew test` for asuite_plugin from within its directory."""
  plugin_dir = (ASUITE_HOME / 'asuite_plugin').as_posix()
  gradle_test_cmd = plugin_dir + '/gradlew test'
  _check_run_shell_command(gradle_test_cmd, cwd=plugin_dir)
130
131
def _filter_files_for_projects(
    files: list[pathlib.Path], projects: list[str], root_files: bool
) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
  """Partition files by whether they belong to the given projects.

  A file belongs to a project when it is the project directory itself or is
  located anywhere beneath it. The comparison is made on whole path
  components, so a sibling such as 'atest_extra/foo.py' is not attributed to
  the 'atest' project merely because the names share a prefix.

  Args:
      files: list of files to filter. Project directories are resolved to
        absolute paths, so these are expected to be absolute as well.
      projects: list of project names to match, e.g. ['atest']. Each name is
        resolved relative to the asuite root directory.
      root_files: whether to treat files directly under the asuite root
        directory as matched files.

  Returns:
      A tuple of a list of files matching the projects and a list of files not
      matching the projects. Both lists keep the order of `files`.
  """
  project_paths = [
      ASUITE_HOME.joinpath(project).resolve() for project in projects
  ]

  def _belongs_to_project(file: pathlib.Path) -> bool:
    # is_relative_to compares path components, unlike a raw string prefix
    # test, which would also accept '<root>/atest_foo' for project 'atest'.
    return any(file.is_relative_to(path) for path in project_paths)

  matched_files = []
  not_matched_files = []
  for file in files:
    if _belongs_to_project(file):
      matched_files.append(file)
    elif root_files and file.parent == ASUITE_HOME:
      matched_files.append(file)
    else:
      not_matched_files.append(file)

  return matched_files, not_matched_files
161
162
def _changed_files_from_git_status(status_output: str) -> list[str]:
  """Extract added, renamed and modified paths from `git status --short`.

  Each line is read as two status characters, one separator character and
  then the path. A line is kept only when one of its two status characters is
  A, R or M, so letters that merely occur in a file name never select a line.

  Args:
      status_output: the raw text printed by `git status --short`.

  Returns:
      The selected paths, in the order git reported them.
  """
  changed_files = []
  for line in status_output.splitlines():
    status, path = line[:2], line[3:]
    if not path or not set(status) & set('ARM'):
      continue
    if 'R' in status:
      # Renames are listed as "old -> new"; keep the new path only.
      path = path.rpartition(' -> ')[2]
    if len(path) >= 2 and path.startswith('"') and path.endswith('"'):
      # git wraps paths with whitespace or special characters in double
      # quotes. Only the quotes are removed here; backslash escapes inside
      # them are not decoded.
      path = path[1:-1]
    changed_files.append(path)
  return changed_files


def get_preupload_files() -> list[pathlib.Path]:
  """Get the list of files to be uploaded.

  File names are taken from the command line arguments. When none are given
  (e.g. the script is run by a user directly), the added, renamed and
  modified files reported by `git status --short` are used instead and
  printed.

  Returns:
      The files resolved to absolute paths; entries that do not exist on disk
      are dropped.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('preupload_files', nargs='*', help='Files to upload.')
  args = parser.parse_args()
  files_to_upload = args.preupload_files
  if not files_to_upload:
    # git is invoked with an argument list rather than through a shell
    # pipeline: no pattern is exposed to shell globbing, and the status
    # columns are parsed in Python instead of by grep/awk on the whole line.
    # check=False keeps a git failure from raising; it just yields no files.
    status = subprocess.run(
        ['git', 'status', '--short'],
        stdout=subprocess.PIPE,
        encoding='utf-8',
        check=False,
    )
    files_to_upload = _changed_files_from_git_status(status.stdout)
    if files_to_upload:
      print(f'Modified files: {files_to_upload}')
  file_paths_to_upload = [
      pathlib.Path(file).resolve() for file in files_to_upload
  ]
  return [file for file in file_paths_to_upload if file.exists()]
183
184
if __name__ == '__main__':
  # Script entry point. Each step below may terminate the process with a
  # non-zero exit code, so the order of the calls is significant.
  changed_files = get_preupload_files()

  # Files under 'atest' or directly in the asuite root get pyformat followed
  # by gpylint; every other file is only checked with pylint.
  atest_files, remaining_files = _filter_files_for_projects(
      files=changed_files, projects=['atest'], root_files=True
  )
  _run_pylint(remaining_files)
  _run_pyformat(atest_files)
  _run_gpylint(atest_files)

  # The asuite_plugin unit tests only run when that project was touched.
  plugin_files = _filter_files_for_projects(
      files=changed_files, projects=['asuite_plugin'], root_files=False
  )[0]
  if plugin_files:
    _run_legacy_unittests()
200