1# Copyright 2016 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Script to run sensor_fusion repeated times."""
15
16import logging
17import os
18import os.path
19import shutil
20import subprocess
21import sys
22import tempfile
23
24import numpy as np
25
26import run_all_tests  # from same tools directory as run_sensor_fusion.py
27
# Upper bound (exclusive) on a run's correlation value for its time shift to
# be included in the statistics.
_CORR_DIST_THRESH_MAX = 0.005  # must match value in test_sensor_fusion.py
# Number of runs per camera when 'num_runs=' is not given on the command line.
_NUM_RUNS = 1
# Name of the TestBed entry in the config file that this script uses.
_TEST_BED_SENSOR_FUSION = 'TEST_BED_SENSOR_FUSION'
# Substring identifying the log line that reports the best correlation/shift.
_TIME_SHIFT_MATCH = 'Best correlation of '
32
33
def find_time_shift(out_file_path):
  """Search a test run's log for the best time shift and its correlation.

  The first line containing _TIME_SHIFT_MATCH is parsed by position: the last
  whitespace-separated word is the time shift with an 'ms' suffix, and the
  fifth word from the end is the correlation value.
  NOTE(review): this layout is inferred from the indices used here; confirm it
  against the message logged by test_sensor_fusion.py.

  Args:
    out_file_path: File path for the log (test_log.DEBUG) to search through

  Returns:
    Dict with float values under the keys 'time_shift' (in ms) and 'corr' if
    a matching line is found. Otherwise, None.

  Raises:
    IndexError: the matching line has fewer than five words.
    ValueError: the expected words are not parseable as floats.
  """
  line = find_matching_line(out_file_path, _TIME_SHIFT_MATCH)
  if line is None:
    return None

  # split() with no argument discards the line ending, so parsing does not
  # depend on whether the line ends with '\n', '\r\n' or nothing at all.
  words = line.split()
  shift_word = words[-1]
  if shift_word.endswith('ms'):
    shift_word = shift_word[:-2]  # strip off 'ms'
  time_shift = float(shift_word)
  fit_corr = float(words[-5])
  return {'time_shift': time_shift, 'corr': fit_corr}
51
52
def find_matching_line(file_path, match_string):
  """Return the first line of a file that contains a given substring.

  Args:
      file_path: Path of the file to scan
      match_string: Substring looked for in each line

  Returns:
      The first line containing match_string, line ending included. None if
      no line in the file contains it.
  """
  with open(file_path) as searched_file:
    # next() stops reading at the first hit; the default covers 'no match'.
    return next(
        (text for text in searched_file if match_string in text), None)
68
69
def _parse_cmd_line_overrides(argv):
  """Extract the 'camera=' and 'num_runs=' overrides from command line args.

  Args:
    argv: list of str; command line arguments without the script name.

  Returns:
    Tuple (camera_id_combos, num_runs). camera_id_combos is a list of str,
    empty when no 'camera=' argument is present. num_runs is an int, equal to
    _NUM_RUNS when no 'num_runs=' argument is present.
  """
  camera_id_combos = []
  num_runs = _NUM_RUNS
  for arg in argv:
    if 'camera=' in arg:
      camera_id_combos = arg.split('=')[1].split(',')
    elif 'num_runs=' in arg:
      num_runs = int(arg.split('=')[1])
  return camera_id_combos, num_runs


def _find_root_output_path(summary_file_path):
  """Return the path reported on the last 'root_output_path:' line, or None."""
  marker = 'root_output_path:'
  root_output_path = None
  with open(summary_file_path, 'r') as summary_file:
    for one_line in summary_file.read().splitlines():
      if marker in one_line:
        # Split on the marker itself so a ':' inside the path is preserved.
        root_output_path = one_line.split(marker, 1)[1].strip()
  return root_output_path


def main():
  """Run the sensor_fusion test repeatedly for statistical purposes.

    Script should be run from the top-level CameraITS directory.
    All parameters except 'num_runs' are defined in config.yml.
    num_runs is defined at run time with 'num_runs=<int>'
    'camera' can be over-written at command line to allow different
    camera_ids facing the same direction to be tested.

    ie. python tools/run_sensor_fusion.py num_runs=10  # n=10 w/ config.yml cam
        python tools/run_sensor_fusion.py camera=0 num_runs=10  # n=10 w/ cam[0]
        python tools/run_sensor_fusion.py camera=0.4 num_runs=10 # n=10 w/ cam[0.4]

    Command line arguments:
        camera: camera_id or comma-separated list of camera_ids.
        num_runs: integer number of runs to get statistical values

    All other config values are stored in config.yml file.

    For each camera_id the test is run num_runs times. The time shift of each
    run whose correlation value is below _CORR_DIST_THRESH_MAX is collected,
    and the mean and standard deviation of those time shifts are logged.

    Raises:
        ValueError: the config file has no TestBed named
            _TEST_BED_SENSOR_FUSION.
  """
  logging.basicConfig(level=logging.INFO)
  # Make output directories to hold the generated files.
  topdir = tempfile.mkdtemp(prefix='CameraITS_')
  subprocess.call(['chmod', 'g+rx', topdir])

  # Override camera and num_runs with cmd line values if available
  camera_id_combos, num_runs = _parse_cmd_line_overrides(sys.argv[1:])

  # Read config file and keep only the relevant TestBed. A new list is built
  # because removing entries from a list while iterating over it skips items.
  config_file_contents = run_all_tests.get_config_file_contents()
  config_file_contents['TestBeds'] = [
      testbed for testbed in config_file_contents['TestBeds']
      if testbed['Name'] == _TEST_BED_SENSOR_FUSION
  ]
  if not config_file_contents['TestBeds']:
    raise ValueError(
        f'No TestBed named {_TEST_BED_SENSOR_FUSION} found in config file.')

  # Get test parameters from config file
  test_params_content = run_all_tests.get_test_params(config_file_contents)
  if not camera_id_combos:
    camera_id_combos = test_params_content['camera'].split(',')
  debug = test_params_content['debug_mode']
  fps = test_params_content['fps']
  img_size = test_params_content['img_size']

  # Get dut id
  device_id = run_all_tests.get_device_serial_number(
      'dut', config_file_contents)

  # Log run info
  logging.info('Running sensor_fusion on device: %s, camera: %s',
               device_id, camera_id_combos)
  logging.info('Saving output files to: %s', topdir)

  test_script = os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
                             'sensor_fusion', 'test_sensor_fusion.py')
  summary_file = run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE

  for camera_id in camera_id_combos:
    time_shifts = []
    test_params_content['camera'] = camera_id
    test_params_content['scene'] = 'sensor_fusion'
    config_file_contents['TestBeds'][0]['TestParams'] = test_params_content

    # A subdir in topdir is created for each camera_id.
    mobly_output_logs_path = os.path.join(topdir, camera_id)
    os.mkdir(mobly_output_logs_path)

    # Add the MoblyParams to config.yml file to store camera_id test results.
    mobly_scene_output_logs_path = os.path.join(
        mobly_output_logs_path, 'sensor_fusion')
    config_file_contents.update(
        {'MoblyParams': {'LogPath': mobly_scene_output_logs_path}})
    logging.debug('Config file contents: %s', config_file_contents)
    tmp_yml_file_name = run_all_tests.get_updated_yml_file(config_file_contents)
    logging.info('Using %s as temporary config yml file', tmp_yml_file_name)

    try:
      # Run tests
      logging.info('%d runs for test_sensor_fusion with camera %s',
                   num_runs, camera_id)
      logging.info('FPS: %d, img size: %s', fps, img_size)
      for _ in range(num_runs):
        cmd = ['python', test_script, '-c', f'{tmp_yml_file_name}']
        # A failing test is reported as 'FAIL' below rather than raised.
        # pylint: disable=subprocess-run-check
        with open(summary_file, 'w') as fp:
          output = subprocess.run(cmd, stdout=fp)
        # pylint: enable=subprocess-run-check
        return_string = 'PASS' if output.returncode == 0 else 'FAIL'

        # Looked up afresh for every run so that a run which did not report
        # its output path cannot reuse the path of a previous run.
        root_output_path = _find_root_output_path(summary_file)
        os.remove(summary_file)
        if root_output_path is None:
          logging.info('%s root_output_path not found in test output',
                       return_string)
          continue

        file_name = os.path.join(root_output_path, 'test_log.DEBUG')
        time_shift = find_time_shift(file_name)
        if time_shift is None:
          logging.info('time_shift not found')
          continue
        logging.info('%s time_shift: %.4f ms, corr: %.6f', return_string,
                     time_shift['time_shift'], time_shift['corr'])
        if time_shift['corr'] < _CORR_DIST_THRESH_MAX:
          time_shifts.append(time_shift)
        else:
          logging.info('Correlation distance too large. Not used for stats.')

      # Summarize results with stats
      times = [t['time_shift'] for t in time_shifts]
      if times:
        logging.info('runs: %d, time_shift mean: %.4f, sigma: %.4f',
                     len(times), np.mean(times), np.std(times))
      else:
        # np.mean/np.std of an empty list only produce nan and a warning.
        logging.info('runs: 0, no valid time_shift values for stats')
    finally:
      # Delete temporary yml file after run, even if a run raised.
      tmp_yml_file = os.path.join(
          run_all_tests.YAML_FILE_DIR, tmp_yml_file_name)
      os.remove(tmp_yml_file)

  # Delete temporary image files after run.
  # NOTE(review): debug_mode is compared against the string 'False';
  # presumably config.yml stores it quoted -- confirm against config.yml.
  if debug == 'False':
    logging.info('Removing tmp dir %s to save space.', topdir)
    shutil.rmtree(topdir)

  logging.info('Test completed.')


if __name__ == '__main__':
  main()
210
211