1#!/usr/bin/env python3 2# Copyright 2019, The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Run at preupload hook to perform necessary checks and formatting.""" 17 18import argparse 19import concurrent.futures 20import multiprocessing 21import pathlib 22import shlex 23import subprocess 24import sys 25 26ASUITE_HOME = pathlib.Path(__file__).resolve().parent 27 28 29def _filter_python_files(files: list[pathlib.Path]) -> list[pathlib.Path]: 30 """Filter a list of files and return a new list of python files only.""" 31 return [file for file in files if file.suffix == '.py'] 32 33 34def _check_run_shell_command(cmd: str, cwd: str = None) -> None: 35 """Run a shell command and raise error if failed.""" 36 if subprocess.run(shlex.split(cmd), cwd=cwd, check=False).returncode: 37 print('Preupload files did not pass Asuite preupload hook script.') 38 sys.exit(1) 39 40 41def _run_python_lint(lint_bin: str, files: list[pathlib.Path]) -> None: 42 """Run python lint binary on python files.""" 43 run_lint_on_file = lambda file: subprocess.run( 44 shlex.split(f'{lint_bin} {file.as_posix()}'), 45 check=False, 46 capture_output=True, 47 ) 48 49 cpu_count = multiprocessing.cpu_count() 50 with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor: 51 completed_processes = executor.map( 52 run_lint_on_file, _filter_python_files(files) 53 ) 54 55 has_format_issue = False 56 for process in completed_processes: 57 if not process.returncode: 58 continue 59 print(process.stdout.decode()) 60 has_format_issue = True 61 62 if has_format_issue: 63 sys.exit(1) 64 65 66def _run_pylint(files: list[pathlib.Path]) -> None: 67 """Run pylint on python files.""" 68 _run_python_lint('pylint', files) 69 70 71def _run_gpylint(files: list[pathlib.Path]) -> None: 72 """Run gpylint on python files if gpylint is available.""" 73 if subprocess.run( 74 shlex.split('which gpylint'), 75 check=False, 76 ).returncode: 77 print('gpylint not available. Will use pylint instead.') 78 _run_pylint(files) 79 return 80 81 _run_python_lint('gpylint', files) 82 83 84def _run_pyformat(files: list[pathlib.Path]) -> None: 85 """Run pyformat on certain projects.""" 86 if subprocess.run( 87 shlex.split('which pyformat'), 88 check=False, 89 ).returncode: 90 print('pyformat not available. Will skip auto formatting.') 91 return 92 93 def _run_pyformat_on_file(file): 94 completed_process = subprocess.run( 95 shlex.split('pyformat --force_quote_type single ' + file.as_posix()), 96 capture_output=True, 97 check=False, 98 ) 99 100 if completed_process.stdout: 101 subprocess.run( 102 shlex.split( 103 'pyformat -i --force_quote_type single ' + file.as_posix() 104 ), 105 check=False, 106 ) 107 return True 108 return False 109 110 cpu_count = multiprocessing.cpu_count() 111 with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor: 112 need_reformat = executor.map( 113 _run_pyformat_on_file, _filter_python_files(files) 114 ) 115 116 if any(need_reformat): 117 print( 118 'Reformatting completed. Please add the modified files to git and rerun' 119 ' the repo preupload hook.' 120 ) 121 sys.exit(1) 122 123 124def _run_legacy_unittests() -> None: 125 """Run unittests for asuite_plugin.""" 126 asuite_plugin_path = ASUITE_HOME.joinpath('asuite_plugin').as_posix() 127 _check_run_shell_command( 128 f'{asuite_plugin_path}/gradlew test', asuite_plugin_path 129 ) 130 131 132def _filter_files_for_projects( 133 files: list[pathlib.Path], projects: list[str], root_files: bool 134) -> tuple[list[pathlib.Path], list[pathlib.Path]]: 135 """Filter a list of files according to project names. 136 137 Args: 138 files: list of files to filter. 139 projects: list of project names to match, e.g. ['atest']. 140 root_files: whether to treat files under the asuite root directory as 141 matched files. 142 143 Returns: 144 A tuple of a list of files matching the projects and a list of files not 145 matching the projects. 146 """ 147 matched_files = [] 148 not_matched_files = [] 149 project_paths = [ 150 ASUITE_HOME.joinpath(project).resolve().as_posix() for project in projects 151 ] 152 for file in files: 153 if file.as_posix().startswith(tuple(project_paths)): 154 matched_files.append(file) 155 elif root_files and file.parent == ASUITE_HOME: 156 matched_files.append(file) 157 else: 158 not_matched_files.append(file) 159 160 return matched_files, not_matched_files 161 162 163def get_preupload_files() -> list[pathlib.Path]: 164 """Get the list of files to be uploaded.""" 165 parser = argparse.ArgumentParser() 166 parser.add_argument('preupload_files', nargs='*', help='Files to upload.') 167 args = parser.parse_args() 168 files_to_upload = args.preupload_files 169 if not files_to_upload: 170 # When running by users directly, only consider: 171 # added(A), renamed(R) and modified(M) files 172 # and store them in files_to_upload. 173 cmd = "git status --short | egrep [ARM] | awk '{print $NF}'" 174 files_to_upload = subprocess.check_output( 175 cmd, shell=True, encoding='utf-8' 176 ).splitlines() 177 if files_to_upload: 178 print('Modified files: %s' % files_to_upload) 179 file_paths_to_upload = [ 180 pathlib.Path(file).resolve() for file in files_to_upload 181 ] 182 return [file for file in file_paths_to_upload if file.exists()] 183 184 185if __name__ == '__main__': 186 preupload_files = get_preupload_files() 187 188 gpylint_project_files, other_files = _filter_files_for_projects( 189 preupload_files, ['atest'], root_files=True 190 ) 191 _run_pylint(other_files) 192 _run_pyformat(gpylint_project_files) 193 _run_gpylint(gpylint_project_files) 194 195 asuite_plugin_files, _ = _filter_files_for_projects( 196 preupload_files, ['asuite_plugin'], root_files=False 197 ) 198 if asuite_plugin_files: 199 _run_legacy_unittests() 200