1# Copyright 2016 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Script to run sensor_fusion repeated times.""" 15 16import logging 17import os 18import os.path 19import shutil 20import subprocess 21import sys 22import tempfile 23 24import numpy as np 25 26import run_all_tests # from same tools directory as run_sensor_fusion.py 27 28_CORR_DIST_THRESH_MAX = 0.005 # must match value in test_sensor_fusion.py 29_NUM_RUNS = 1 30_TEST_BED_SENSOR_FUSION = 'TEST_BED_SENSOR_FUSION' 31_TIME_SHIFT_MATCH = 'Best correlation of ' 32 33 34def find_time_shift(out_file_path): 35 """Search through a test run's test_log.DEBUG for the best time shift. 36 37 Args: 38 out_file_path: File path for stdout logs to search through 39 40 Returns: 41 Float num of best time shift, if one is found. Otherwise, None. 42 """ 43 line = find_matching_line(out_file_path, _TIME_SHIFT_MATCH) 44 if line is None: 45 return None 46 else: 47 words = line.split(' ') 48 time_shift = float(words[-1][:-3]) # strip off 'ms' 49 fit_corr = float(words[-5]) 50 return {'time_shift': time_shift, 'corr': fit_corr} 51 52 53def find_matching_line(file_path, match_string): 54 """Search each line in the file at 'file_path' for match_string. 55 56 Args: 57 file_path: File path for file being searched 58 match_string: Sting used to match against lines 59 60 Returns: 61 The first matching line. If none exists, returns None. 62 """ 63 with open(file_path) as f: 64 for line in f: 65 if match_string in line: 66 return line 67 return None 68 69 70def main(): 71 """Run the sensor_fusion test for stastical purposes. 72 73 Script should be run from the top-level CameraITS directory. 74 All parameters expect 'num_runs' are defined in config.yml. 75 num_runs is defined at run time with 'num_runs=<int>' 76 'camera_id' can be over-written at command line to allow different 77 camera_ids facing the same direction to be tested. 78 79 ie. python tools/run_all_tests.py num_runs=10 # n=10 w/ config.yml cam 80 python tools/run_all_tests.py camera=0 num_runs=10 # n=10 w/ cam[0] 81 python tools/run_all_tests.py camera=0.4 num_runs=10 # n=10 w/ cam[0.4] 82 83 Command line arguments: 84 camera_id: camera_id or list of camera_ids. 85 num_runs: integer number of runs to get statistical values 86 87 All other config values are stored in config.yml file. 88 """ 89 logging.basicConfig(level=logging.INFO) 90 # Make output directories to hold the generated files. 91 topdir = tempfile.mkdtemp(prefix='CameraITS_') 92 subprocess.call(['chmod', 'g+rx', topdir]) 93 94 camera_id_combos = [] 95 96 # Override camera with cmd line values if available 97 num_runs = _NUM_RUNS 98 get_argv_vals = lambda x: x.split('=')[1] 99 for s in list(sys.argv[1:]): 100 if 'camera=' in s: 101 camera_id_combos = str(get_argv_vals(s)).split(',') 102 elif 'num_runs=' in s: 103 num_runs = int(get_argv_vals(s)) 104 105 # Read config file and extract relevant TestBed 106 config_file_contents = run_all_tests.get_config_file_contents() 107 for i in config_file_contents['TestBeds']: 108 if i['Name'] != _TEST_BED_SENSOR_FUSION: 109 config_file_contents['TestBeds'].remove(i) 110 111 # Get test parameters from config file 112 test_params_content = run_all_tests.get_test_params(config_file_contents) 113 if not camera_id_combos: 114 camera_id_combos = test_params_content['camera'].split(',') 115 debug = test_params_content['debug_mode'] 116 fps = test_params_content['fps'] 117 img_size = test_params_content['img_size'] 118 119 # Get dut id 120 device_id = run_all_tests.get_device_serial_number( 121 'dut', config_file_contents) 122 123 # Log run info 124 logging.info('Running sensor_fusion on device: %s, camera: %s', 125 device_id, camera_id_combos) 126 logging.info('Saving output files to: %s', topdir) 127 128 for camera_id in camera_id_combos: 129 time_shifts = [] 130 # A subdir in topdir will be created for each camera_id. 131 test_params_content['camera'] = camera_id 132 test_params_content['scene'] = 'sensor_fusion' 133 config_file_contents['TestBeds'][0]['TestParams'] = test_params_content 134 os.mkdir(os.path.join(topdir, camera_id)) 135 136 # Add the MoblyParams to config.yml file store camera_id test results. 137 mobly_output_logs_path = os.path.join(topdir, camera_id) 138 mobly_scene_output_logs_path = os.path.join( 139 mobly_output_logs_path, 'sensor_fusion') 140 mobly_params_dict = { 141 'MoblyParams': { 142 'LogPath': mobly_scene_output_logs_path 143 } 144 } 145 config_file_contents.update(mobly_params_dict) 146 logging.debug('Config file contents: %s', config_file_contents) 147 tmp_yml_file_name = run_all_tests.get_updated_yml_file(config_file_contents) 148 logging.info('Using %s as temporary config yml file', tmp_yml_file_name) 149 150 # Run tests 151 logging.info('%d runs for test_sensor_fusion with camera %s', 152 num_runs, camera_id) 153 logging.info('FPS: %d, img size: %s', fps, img_size) 154 for _ in range(num_runs): 155 cmd = ['python', 156 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', 157 'sensor_fusion', 'test_sensor_fusion.py'), 158 '-c', 159 f'{tmp_yml_file_name}' 160 ] 161 # pylint: disable=subprocess-run-check 162 with open(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE, 'w') as fp: 163 output = subprocess.run(cmd, stdout=fp) 164 # pylint: enable=subprocess-run-check 165 166 with open(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE, 'r') as _: 167 if output.returncode == 0: 168 return_string = 'PASS' 169 else: 170 return_string = 'FAIL' 171 172 with open(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE, 'r') as file: 173 content = file.read() 174 lines = content.splitlines() 175 for one_line in lines: 176 if 'root_output_path:' in one_line: 177 root_output_path = one_line.split(':')[1].strip() 178 file.close() 179 180 os.remove(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE) 181 file_name = os.path.join(root_output_path, 'test_log.DEBUG') 182 time_shift = find_time_shift(file_name) 183 if time_shift is not None: 184 logging.info('%s time_shift: %.4f ms, corr: %.6f', return_string, 185 time_shift['time_shift'], time_shift['corr']) 186 if time_shift['corr'] < _CORR_DIST_THRESH_MAX: 187 time_shifts.append(time_shift) 188 else: 189 logging.info('Correlation distance too large. Not used for stats.') 190 else: 191 logging.info('time_shift not found') 192 193 # Summarize results with stats 194 times = [t['time_shift'] for t in time_shifts] 195 logging.info('runs: %d, time_shift mean: %.4f, sigma: %.4f', 196 len(times), np.mean(times), np.std(times)) 197 198 # Delete temporary yml file after run. 199 tmp_yml_file = os.path.join(run_all_tests.YAML_FILE_DIR, tmp_yml_file_name) 200 os.remove(tmp_yml_file) 201 202 # Delete temporary image files after run. 203 if debug == 'False': 204 logging.info('Removing tmp dir %s to save space.', topdir) 205 shutil.rmtree(topdir) 206 207 logging.info('Test completed.') 208if __name__ == '__main__': 209 main() 210 211