1# Copyright 2014 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import glob
16import json
17import logging
18import os
19import os.path
20import re
21import subprocess
22import sys
23import tempfile
24import time
25import types
26
27import camera_properties_utils
28import capture_request_utils
29import image_processing_utils
30import its_device_utils
31import its_session_utils
32import lighting_control_utils
33import numpy as np
34import yaml
35
36
# Directory that holds config.yml. It is read from the CAMERA_ITS_TOP
# environment variable, so importing this module raises KeyError when that
# variable is not set.
YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP']
CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml')
# Substrings searched for in the lower-cased testbed name from the config file.
TEST_KEY_TABLET = 'tablet'
TEST_KEY_SENSOR_FUSION = 'sensor_fusion'
ACTIVITY_START_WAIT = 1.5  # seconds
MERGE_RESULTS_TIMEOUT = 3600  # seconds

NUM_TRIES = 2
# Allowed per-scene result values (see RESULT_VALUES) and the keys used in
# each per-scene result dictionary.
RESULT_PASS = 'PASS'
RESULT_FAIL = 'FAIL'
RESULT_NOT_EXECUTED = 'NOT_EXECUTED'
RESULT_KEY = 'result'
METRICS_KEY = 'mpc_metrics'
PERFORMANCE_KEY = 'performance_metrics'
SUMMARY_KEY = 'summary'
RESULT_VALUES = (RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED)
# Package name, broadcast action and intent extras used to report results.
CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier'
ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT'
EXTRA_VERSION = 'camera.its.extra.VERSION'
CURRENT_ITS_VERSION = '1.0'  # version number to sync with CtsVerifier
EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID'
EXTRA_RESULTS = 'camera.its.extra.RESULTS'
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'
# Controller names compared against the lower-cased 'rotator_cntl' test param.
VALID_CONTROLLERS = ('arduino', 'canakit')
_FRONT_CAMERA_ID = '1'
# Recover the '_' that was dropped from scene names read from the config file
# (e.g. '11' -> '1_1').
_INT_STR_DICT = types.MappingProxyType({'11': '1_1', '12': '1_2'})
_MAIN_TESTBED = 0
# Device properties that are_devices_similar() requires to be equal.
_PROPERTIES_TO_MATCH = (
    'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision'
)
69
# Scenes that can be automated through the tablet display.
# Notes on scene names:
#   scene*_1/2/... are the same scene, split to load-balance scene run times
#   scene*_a/b/... are similar scenes that share one or more tests
_TABLET_SCENES = (
    'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c',
    'scene2_d', 'scene2_e', 'scene2_f', 'scene3', 'scene4', 'scene6', 'scene7',
    'scene8', 'scene9',
    os.path.join('scene_extensions', 'scene_hdr'),
    os.path.join('scene_extensions', 'scene_low_light'),
    'scene_video',
)

# Scenes that use the 'sensor_fusion' test rig
_MOTION_SCENES = ('sensor_fusion', 'feature_combination',)

# Scenes that use lighting control
_FLASH_SCENES = ('scene_flash',)

# Scenes that use a checkerboard as the chart
_CHECKERBOARD_SCENES = ('sensor_fusion', 'scene_flash', 'feature_combination',)

# Scenes that have to be run manually regardless of configuration
_MANUAL_SCENES = ('scene5',)

# Scene extensions (these entries also appear in _TABLET_SCENES)
_EXTENSIONS_SCENES = (os.path.join('scene_extensions', 'scene_hdr'),
                      os.path.join('scene_extensions', 'scene_low_light'),
                      )

# All possible scenes
_ALL_SCENES = _TABLET_SCENES + _MANUAL_SCENES + _MOTION_SCENES + _FLASH_SCENES

# Scenes that are logically grouped and can be requested by the group name;
# each key maps to the individual scenes it stands for.
_GROUPED_SCENES = types.MappingProxyType({
        'scene1': ('scene1_1', 'scene1_2'),
        'scene2': ('scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e',
                   'scene2_f')
})
109
110# Scene requirements for manual testing.
111_SCENE_REQ = types.MappingProxyType({
112    'scene0': None,
113    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
114    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
115    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
116    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
117    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
118    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
119    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
120    'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png',
121    'scene3': 'The ISO12233 chart',
122    'scene4': 'A test chart of a circle covering at least the middle 50% of '
123              'the scene. See tests/scene4/scene4.png',
124    'scene5': 'Capture images with a diffuser attached to the camera. See '
125              'source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser '  # pylint: disable line-too-long
126              'for more details',
127    'scene6': 'A grid of black circles on a white background. '
128              'See tests/scene6/scene6.png',
129    'scene7': 'The picture with 4 different colors, slanted edge and'
130              '4 ArUco markers. See tests/scene7/scene7.png',
131    'scene8': 'The picture with 4 faces in 4 different colors overlay.'
132              'See tests/scene8/scene8.png',
133    'scene9': 'A scene with high entropy consisting of random size and colored '
134              'circles. See tests/scene9/scene9.png',
135    # Use os.path to avoid confusion on other platforms
136    os.path.join('scene_extensions', 'scene_hdr'): (
137        'A tablet displayed scene with a face on the left '
138        'and a low-contrast QR code on the right. '
139        'See tests/scene_extensions/scene_hdr/scene_hdr.png'
140    ),
141    os.path.join('scene_extensions', 'scene_low_light'): (
142        'A tablet displayed scene with a grid of squares of varying '
143        'brightness. See '
144        'tests/scene_extensions/scene_low_light/scene_low_light.png'
145    ),
146    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
147                     'in tests/sensor_fusion/checkerboard.pdf\n'
148                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
149                     'instructions.\nNote that this test will be skipped '
150                     'on devices not supporting REALTIME camera timestamp.',
151    'feature_combination': 'The same scene as sensor_fusion, '
152                           'separated for easier testing.',
153    'scene_flash': 'A checkerboard pattern chart with lights off.',
154    'scene_video': 'A tablet displayed scene with a series of circles moving '
155                   'at different simulated frame rates. '
156                   'See tests/scene_video/scene_video.mp4',
157})
158
# Maps a scene name to a tuple of test names for sub-camera runs; its keys
# are the candidate scenes when the camera id contains the sub-camera
# separator.
# Made mutable (a plain dict rather than a MappingProxyType) to allow for test
# augmentation based on first API level; see augment_sub_camera_tests().
SUB_CAMERA_TESTS = {
    'scene0': (
        'test_jitter',
        'test_metadata',
        'test_request_capture_match',
        'test_sensor_events',
        'test_solid_color_test_pattern',
        'test_unified_timestamps',
    ),
    'scene1_1': (
        'test_burst_capture',
        'test_burst_sameness_manual',
        'test_dng_noise_model',
        'test_exposure_x_iso',
        'test_linearity',
    ),
    'scene1_2': (
        'test_raw_exposure',
        'test_raw_sensitivity',
        'test_yuv_plus_raw',
    ),
    'scene2_a': (
        'test_num_faces',
    ),
    'scene4': (
        'test_aspect_ratio_and_crop',
    ),
    'scene_video': (
        'test_preview_frame_drop',
    ),
    'sensor_fusion': (
        'test_sensor_fusion',
    ),
}
194
# Test file names associated with lighting control.
# NOTE(review): the code that consumes this tuple is not visible in this part
# of the file; confirm how it is used before editing.
_LIGHTING_CONTROL_TESTS = (
    'test_auto_flash.py',
    'test_preview_min_frame_rate.py',
    'test_led_snapshot.py',
    'test_night_extension.py',
    'test_low_light_boost_extension.py',
    'test_hdr_extension.py',
    )

# Extension scene abbreviations accepted on the command line; main() expands
# them to 'scene_extensions/scene_<name>'.
_EXTENSION_NAMES = (
    'hdr',
    'low_light',
)

_DST_SCENE_DIR = '/sdcard/Download/'
# Number of parts expected when a camera id is split on the sub-camera
# separator (camera id + hidden physical id).
_SUB_CAMERA_LEVELS = 2
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'
212
213
def report_result(device_id, camera_id, results):
  """Sends a pass/fail result to the device, via an intent.

  Each scene entry is validated first. When an entry carries a summary, the
  summary file is pushed to the device and the entry is updated in place to
  hold the on-device path, so the caller's dictionary is modified.

  Args:
   device_id: The ID string of the device to report the results to.
   camera_id: The ID string of the camera for which to report pass/fail.
   results: a dictionary contains all ITS scenes as key and result/summary of
            current ITS run. See test_report_result unit test for an example.

  Raises:
    ValueError: if a scene entry has no result, or its result is not one of
      RESULT_VALUES.
  """
  adb = f'adb -s {device_id}'
  its_device_utils.start_its_test_activity(device_id)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_result in results.items():
    if RESULT_KEY not in scene_result:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_result[RESULT_KEY] not in RESULT_VALUES:
      # Report the offending per-scene value. Indexing the top-level dict with
      # RESULT_KEY raised KeyError here instead of the intended ValueError.
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_result[RESULT_KEY]}')
    if SUMMARY_KEY in scene_result:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      its_device_utils.run(
          f'{adb} push {scene_result[SUMMARY_KEY]} {device_summary_path}')
      # The broadcast below must reference the copy that lives on the device.
      scene_result[SUMMARY_KEY] = device_summary_path

  json_results = json.dumps(results)
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_RESULTS} \'{json_results}\'")
  its_device_utils.run(cmd)
244
245
def write_result(testbed_index, device_id, camera_id, results):
  """Dumps one testbed/camera result set to a temporary JSON file.

  The file is created in the current working directory and is named
  'testbed_<testbed_index>_camera_<camera_id>.tmp'.

  Args:
    testbed_index: the index of a finished testbed.
    device_id: the ID string of the device that created results.
    camera_id: the ID string of the camera of the device.
    results: a dictionary that contains all ITS scenes as key
             and result/summary of current ITS run.
  """
  tmp_file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp'
  with open(tmp_file_name, 'w') as tmp_file:
    json.dump({'device_id': device_id, 'results': results}, tmp_file)
260
261
def parse_testbeds(completed_testbeds):
  """Reads back the temporary result files of the completed testbeds.

  For every testbed index, each 'testbed_<index>_camera_*.tmp' file in the
  current working directory is loaded as JSON, and the camera id is taken from
  the file name.

  Args:
    completed_testbeds: an iterable of completed testbed indices.
  Yields:
    device_id: the device associated with the testbed.
    camera_id: one of the camera_ids associated with the testbed.
    results: the dictionary with scenes and result/summary of testbed's run.
  Raises:
    ValueError: if a file holds an empty device_id or empty results.
  """
  for testbed in completed_testbeds:
    for tmp_name in glob.glob(f'testbed_{testbed}_camera_*.tmp'):
      # File name layout: testbed_<index>_camera_<camera_id>.tmp
      after_camera_tag = tmp_name.split('camera_')[1]
      camera_id = after_camera_tag.split('.tmp')[0]
      with open(tmp_name, 'r') as tmp_file:
        payload = json.load(tmp_file)
      device_id, results = payload['device_id'], payload['results']
      if not (device_id and results):
        raise ValueError(f'device_id or results for {tmp_name} not found.')
      yield device_id, camera_id, results
284
285
def get_device_property(device_id, property_name):
  """Queries a system property on a device with 'adb shell getprop'.

  Args:
    device_id: the ID string of a device.
    property_name: the desired property string.
  Returns:
    The value of the property, decoded as UTF-8 with surrounding whitespace
    stripped.
  """
  getprop_output = subprocess.check_output(
      f'adb -s {device_id} shell getprop {property_name}',
      shell=True,
      stderr=subprocess.STDOUT)  # stderr is folded into the captured output
  return getprop_output.decode('utf-8').strip()
299
300
def are_devices_similar(device_id_1, device_id_2):
  """Compares the properties in _PROPERTIES_TO_MATCH across two devices.

  The comparison stops at the first property that differs, which is logged as
  an error.

  Args:
    device_id_1: the ID string of the _MAIN_TESTBED device.
    device_id_2: the ID string of another device.
  Returns:
    True if both devices share key dimensions.
  """
  for prop_name in _PROPERTIES_TO_MATCH:
    first_value = get_device_property(device_id_1, prop_name)
    second_value = get_device_property(device_id_2, prop_name)
    if first_value == second_value:
      continue
    logging.error('%s does not match %s for %s',
                  first_value, second_value, prop_name)
    return False
  return True
318
319
def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Pauses the run until the operator confirms the manual scene setup.

  In a loop: prompts the operator to position the camera, converges 3A,
  captures an image, writes it to out_path as 'test_<scene>.jpg' and asks
  whether it looks right. The loop ends once the answer is 'y' (any case).

  Args:
    device_id: id of device
    camera_id: id of camera; may be '<camera><separator><hidden physical id>'
    scene: Name of the scene to copy image files.
    out_path: output file location
  """
  separator = its_session_utils.SUB_CAMERA_SEPARATOR
  hidden_physical_id = None
  if separator in camera_id:
    id_parts = camera_id.split(separator)
    if len(id_parts) == _SUB_CAMERA_LEVELS:
      camera_id, hidden_physical_id = id_parts[0], id_parts[1]

  session = its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id,
      hidden_physical_id=hidden_physical_id)
  with session as cam:
    props = cam.override_with_hidden_physical_camera_props(
        cam.get_camera_properties())

    scene_confirmed = False
    while not scene_confirmed:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture; scene5 skips AF and passes the
      # property-derived AE/AWB lock values.
      if scene != 'scene5':
        cam.do_3a()
      else:
        cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      rgb_image = image_processing_utils.convert_capture_to_rgb_image(
          cam.do_capture(req, fmt))
      img_name = os.path.join(out_path, f'test_{scene}.jpg')
      logging.info('Please check scene setup in %s', img_name)
      image_processing_utils.write_image(rgb_image, img_name)
      answer = input(f'Is the image okay for ITS {scene}? (Y/N)')
      scene_confirmed = answer.lower() == 'y'
362
363
def get_config_file_contents():
  """Loads CONFIG_FILE with yaml.safe_load.

  Args:
    None

  Returns:
    config_file_contents: a dict read from config.yml
  """
  with open(CONFIG_FILE) as config_stream:
    return yaml.safe_load(config_stream)
376
377
def get_test_params(config_file_contents):
  """Extracts the 'TestParams' entry from the config file contents.

  Every top-level value is treated as a list of dicts. The 'TestParams' of the
  last dict visited wins; that is None when the dict has no such key.

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    dict of test parameters, or None when nothing was found
  """
  params_per_entry = [
      entry.get('TestParams')
      for entries in config_file_contents.values()
      for entry in entries
  ]
  return params_per_entry[-1] if params_per_entry else None
392
393
def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains TestBeds dictionary which contains Controllers and
  Android Device dicts. The two devices used by the test per box are listed
  here with labels dut and tablet. Parse through the nested TestBeds dict to
  get the Android device details. If a label appears more than once, the last
  occurrence wins.

  Args:
    device: String device label as specified in config file. 'tablet' selects
      the tablet; any other value selects the dut.
    config_file_contents: dict read from config.yml file

  Returns:
    The serial number of the requested device, as a string.

  Raises:
    ValueError: if the config file lists no device with the requested label.
  """
  serials = {}
  for testbeds in config_file_contents.values():
    for testbed in testbeds:
      android_devices = testbed.get('Controllers').get('AndroidDevice')
      for device_dict in android_devices:
        # A device is identified by any value equal to the label, regardless
        # of which key holds it.
        for value in device_dict.values():
          if value == 'tablet':
            serials['tablet'] = str(device_dict.get('serial'))
          if value == 'dut':
            serials['dut'] = str(device_dict.get('serial'))

  wanted_label = 'tablet' if device == 'tablet' else 'dut'
  if wanted_label not in serials:
    # Fail with a descriptive error; the original hit an unbound local here.
    raise ValueError(
        f'No device labeled {wanted_label!r} found in the config file.')
  return serials[wanted_label]
420
421
def get_updated_yml_file(yml_file_contents):
  """Writes the testbed contents to a fresh 'config_*.yml' file.

  The file is created inside YAML_FILE_DIR, after that directory's mode is
  set to 0o755. This testbed file is per box and contains all the parameters
  and device id used by the mobly tests.

  Args:
   yml_file_contents: Data to write in yml file.

  Returns:
    The base name (no directory part) of the newly created yml file.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  yml_fd, yml_path = tempfile.mkstemp(
      dir=YAML_FILE_DIR, prefix='config_', suffix='.yml')
  # Wrap the descriptor returned by mkstemp rather than reopening the path.
  with os.fdopen(yml_fd, 'w') as yml_stream:
    yaml.dump(yml_file_contents, stream=yml_stream, default_flow_style=False)
  return os.path.basename(yml_path)
442
443
def enable_external_storage(device_id):
  """Grants MANAGE_EXTERNAL_STORAGE to com.android.cts.verifier via appops.

  Args:
    device_id: Serial number of the device.

  """
  its_device_utils.run(
      f'adb -s {device_id} shell appops '
      'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow')
454
455
def get_available_cameras(device_id, camera_id):
  """Lists the camera ids that are usable in the current device state.

  Starts from every camera id reported by the ITS session and removes the
  physical cameras reported as unavailable for camera_id.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera_id

  Returns:
    List of all the available camera_ids.
  """
  session = its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id)
  with session as cam:
    # The properties are fetched but not used below.
    # NOTE(review): kept in case the calls matter to the session - confirm.
    props = cam.get_camera_properties()
    props = cam.override_with_hidden_physical_camera_props(props)
    unavailable_reply = cam.get_unavailable_physical_cameras(camera_id)
    all_camera_ids = cam.get_camera_ids()['cameraIdArray']
    # Concat camera_id, physical camera_id and sub camera separator
    for physical_id in unavailable_reply['unavailablePhysicalCamerasArray']:
      unavailable_id = f'{camera_id}.{physical_id}'
      if unavailable_id in all_camera_ids:
        all_camera_ids.remove(unavailable_id)
    logging.debug('available camera ids: %s', all_camera_ids)
  return all_camera_ids
485
486
def get_unavailable_physical_cameras(device_id, camera_id):
  """Lists the physical cameras that are unavailable in the current state.

  Each id reported by the ITS session is prefixed with '<camera_id>.'.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera device id

  Returns:
    List of all the unavailable camera_ids.
  """
  session = its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id)
  with session as cam:
    reply = cam.get_unavailable_physical_cameras(camera_id)
    unavailable_ids = []
    for physical_id in reply['unavailablePhysicalCamerasArray']:
      unavailable_ids.append(f'{camera_id}.{physical_id}')
    logging.debug('Unavailable physical camera ids: %s', unavailable_ids)
  return unavailable_ids
509
510
def is_device_folded(device_id):
  """Returns True if the foldable device is in folded state.

  The device counts as folded when the output of
  'adb shell cmd device_state state' contains the text 'CLOSE'.

  Args:
    device_id: Serial number of the foldable device.
  """
  state_output = subprocess.getoutput(
      f'adb -s {device_id} shell cmd device_state state')
  return 'CLOSE' in state_output
522
523
def augment_sub_camera_tests(first_api_level):
  """Extends SUB_CAMERA_TESTS for devices at or above the Android 15 level.

  When first_api_level is at least its_session_utils.ANDROID15_API_LEVEL, the
  'scene6' entry of the module-level SUB_CAMERA_TESTS dict is set to
  ('test_in_sensor_zoom',).

  Args:
    first_api_level: First api level of the device.
  """
  needs_augmenting = first_api_level >= its_session_utils.ANDROID15_API_LEVEL
  if needs_augmenting:
    logging.debug('Augmenting sub camera tests')
    SUB_CAMERA_TESTS.update(scene6=('test_in_sensor_zoom',))
533
534
535def main():
536  """Run all the Camera ITS automated tests.
537
538    Script should be run from the top-level CameraITS directory.
539
540    Command line arguments:
541        camera:  the camera(s) to be tested. Use comma to separate multiple
542                 camera Ids. Ex: "camera=0,1" or "camera=1"
543        scenes:  the test scene(s) to be executed. Use comma to separate
544                 multiple scenes. Ex: "scenes=scene0,scene1_1" or
545                 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
546                 where X is scene name minus 'scene')
547  """
548  logging.basicConfig(level=logging.INFO)
549  # Make output directories to hold the generated files.
550  topdir = tempfile.mkdtemp(prefix='CameraITS_')
551  try:
552    subprocess.call(['chmod', 'g+rx', topdir])
553  except OSError as e:
554    logging.info(repr(e))
555
556  scenes = []
557  camera_id_combos = []
558  testbed_index = None
559  num_testbeds = None
560  # Override camera, scenes and testbed with cmd line values if available
561  for s in list(sys.argv[1:]):
562    if 'scenes=' in s:
563      scenes = s.split('=')[1].split(',')
564    elif 'camera=' in s:
565      camera_id_combos = s.split('=')[1].split(',')
566    elif 'testbed_index=' in s:
567      testbed_index = int(s.split('=')[1])
568    elif 'num_testbeds=' in s:
569      num_testbeds = int(s.split('=')[1])
570    else:
571      raise ValueError(f'Unknown argument {s}')
572  if testbed_index is None and num_testbeds is not None:
573    raise ValueError(
574        'testbed_index must be specified if num_testbeds is specified.')
575  if (testbed_index is not None and num_testbeds is not None and
576      testbed_index >= num_testbeds):
577    raise ValueError('testbed_index must be less than num_testbeds. '
578                     'testbed_index starts at 0.')
579
580  # Prepend 'scene' if not specified at cmd line
581  for i, s in enumerate(scenes):
582    if (not s.startswith('scene') and
583        not s.startswith(('checkerboard', 'sensor_fusion',
584                          'flash', 'feature_combination', '<scene-name>'))):
585      scenes[i] = f'scene{s}'
586    if s.startswith('flash') or s.startswith('extensions'):
587      scenes[i] = f'scene_{s}'
588    # Handle scene_extensions
589    if any(s.startswith(extension) for extension in _EXTENSION_NAMES):
590      scenes[i] = f'scene_extensions/scene_{s}'
591    if (any(s.startswith('scene_' + extension)
592            for extension in _EXTENSION_NAMES)):
593      scenes[i] = f'scene_extensions/{s}'
594
595  # Read config file and extract relevant TestBed
596  config_file_contents = get_config_file_contents()
597  if testbed_index is None:
598    for i in config_file_contents['TestBeds']:
599      if scenes in (
600          ['sensor_fusion'], ['checkerboard'], ['scene_flash'],
601          ['feature_combination']
602      ):
603        if TEST_KEY_SENSOR_FUSION not in i['Name'].lower():
604          config_file_contents['TestBeds'].remove(i)
605      else:
606        if TEST_KEY_SENSOR_FUSION in i['Name'].lower():
607          config_file_contents['TestBeds'].remove(i)
608  else:
609    config_file_contents = {
610        'TestBeds': [config_file_contents['TestBeds'][testbed_index]]
611    }
612
613  # Get test parameters from config file
614  test_params_content = get_test_params(config_file_contents)
615  if not camera_id_combos:
616    camera_id_combos = str(test_params_content['camera']).split(',')
617  if not scenes:
618    scenes = str(test_params_content['scene']).split(',')
619    scenes = [_INT_STR_DICT.get(n, n) for n in scenes]  # recover '1_1' & '1_2'
620
621  device_id = get_device_serial_number('dut', config_file_contents)
622  # Enable external storage on DUT to send summary report to CtsVerifier.apk
623  enable_external_storage(device_id)
624
625  # Add to SUB_CAMERA_TESTS depending on first_api_level
626  augment_sub_camera_tests(its_session_utils.get_first_api_level(device_id))
627
628  # Verify that CTS Verifier is installed
629  its_session_utils.check_apk_installed(device_id, CTS_VERIFIER_PACKAGE_NAME)
630  # Check whether the dut is foldable or not
631  testing_foldable_device = True if test_params_content[
632      'foldable_device'] == 'True' else False
633  available_camera_ids_to_test_foldable = []
634  if testing_foldable_device:
635    logging.debug('Testing foldable device.')
636    # Check the state of foldable device. True if device is folded,
637    # false if the device is opened.
638    device_folded = is_device_folded(device_id)
639    # list of available camera_ids to be tested in device state
640    available_camera_ids_to_test_foldable = get_available_cameras(
641        device_id, _FRONT_CAMERA_ID)
642
643  config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower()
644  logging.info('Saving %s output files to: %s', config_file_test_key, topdir)
645  if TEST_KEY_TABLET in config_file_test_key:
646    tablet_id = get_device_serial_number('tablet', config_file_contents)
647    tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.product.device'
648    raw_output = subprocess.check_output(
649        tablet_name_cmd, stderr=subprocess.STDOUT, shell=True)
650    tablet_name = str(raw_output.decode('utf-8')).strip()
651    logging.debug('Tablet name: %s', tablet_name)
652    brightness = test_params_content['brightness']
653    its_session_utils.validate_tablet(tablet_name, brightness, tablet_id)
654  else:
655    tablet_id = None
656
657  testing_sensor_fusion_with_controller = False
658  if TEST_KEY_SENSOR_FUSION in config_file_test_key:
659    if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS:
660      testing_sensor_fusion_with_controller = True
661
662  testing_flash_with_controller = False
663  if (test_params_content.get('lighting_cntl', 'None').lower() == 'arduino' and
664      'manual' not in config_file_test_key):
665    testing_flash_with_controller = True
666
667  # Expand GROUPED_SCENES and remove any duplicates
668  scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes]
669  scenes = np.hstack(scenes).tolist()
670  scenes = sorted(set(scenes), key=scenes.index)
671  # List of scenes to be executed in folded state will have '_folded'
672  # prefix. This will help distinguish the test results from folded vs
673  # open device state for front camera_ids.
674  folded_device_scenes = []
675  for scene in scenes:
676    folded_device_scenes.append(f'{scene}_folded')
677
678  logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s',
679               device_id, camera_id_combos, scenes)
680
681  # Determine if manual run
682  if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES):
683    auto_scene_switch = True
684  else:
685    auto_scene_switch = False
686    logging.info('Manual, checkerboard scenes, or scene5 testing.')
687
688  folded_prompted = False
689  opened_prompted = False
690  for camera_id in camera_id_combos:
691    test_params_content['camera'] = camera_id
692    results = {}
693    unav_cameras = []
694    # Get the list of unavailable cameras in current device state.
695    # These camera_ids should not be tested in current device state.
696    if testing_foldable_device:
697      unav_cameras = get_unavailable_physical_cameras(
698          device_id, _FRONT_CAMERA_ID)
699
700    if testing_foldable_device:
701      device_state = 'folded' if device_folded else 'opened'
702
703    testing_folded_front_camera = (testing_foldable_device and
704                                   device_folded and
705                                   _FRONT_CAMERA_ID in camera_id)
706
707    # Raise an assertion error if there is any camera unavailable in
708    # current device state. Usually scenes with suffix 'folded' will
709    # be executed in folded state.
710    if (testing_foldable_device and
711        _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras):
712      raise AssertionError(
713          f'Camera {camera_id} is unavailable in device state {device_state}'
714          f' and cannot be tested with device {device_state}!')
715
716    if (testing_folded_front_camera and camera_id not in unav_cameras
717        and not folded_prompted):
718      folded_prompted = True
719      input('\nYou are testing a foldable device in folded state. '
720            'Please make sure the device is folded and press <ENTER> '
721            'after positioning properly.\n')
722
723    if (testing_foldable_device and
724        not device_folded and _FRONT_CAMERA_ID in camera_id and
725        camera_id not in unav_cameras and not opened_prompted):
726      opened_prompted = True
727      input('\nYou are testing a foldable device in opened state. '
728            'Please make sure the device is unfolded and press <ENTER> '
729            'after positioning properly.\n')
730
731    # Run through all scenes if user does not supply one and config file doesn't
732    # have specific scene name listed.
733    if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
734      possible_scenes = list(SUB_CAMERA_TESTS.keys())
735      if auto_scene_switch:
736        possible_scenes.remove('sensor_fusion')
737    else:
738      if 'checkerboard' in scenes:
739        possible_scenes = _CHECKERBOARD_SCENES
740      elif 'scene_flash' in scenes:
741        possible_scenes = _FLASH_SCENES
742      elif 'scene_extensions' in scenes:
743        possible_scenes = _EXTENSIONS_SCENES
744      else:
745        possible_scenes = _TABLET_SCENES if auto_scene_switch else _ALL_SCENES
746
747    if ('<scene-name>' in scenes or 'checkerboard' in scenes or
748        'scene_extensions' in scenes):
749      per_camera_scenes = possible_scenes
750    else:
751      # Validate user input scene names
752      per_camera_scenes = []
753      for s in scenes:
754        if s in possible_scenes:
755          per_camera_scenes.append(s)
756      if not per_camera_scenes:
757        raise ValueError('No valid scene specified for this camera.')
758
759    # Folded state scenes will have 'folded' suffix only for
760    # front cameras since rear cameras are common in both folded
761    # and unfolded state.
762    foldable_per_camera_scenes = []
763    if testing_folded_front_camera:
764      if camera_id not in available_camera_ids_to_test_foldable:
765        raise AssertionError(f'camera {camera_id} is not available.')
766      for s in per_camera_scenes:
767        foldable_per_camera_scenes.append(f'{s}_folded')
768
769    if foldable_per_camera_scenes:
770      per_camera_scenes = foldable_per_camera_scenes
771
772    logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes)
773
774    if testing_folded_front_camera:
775      all_scenes = [f'{s}_folded' for s in _ALL_SCENES]
776    else:
777      all_scenes = _ALL_SCENES
778
779    for s in all_scenes:
780      results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED}
781
782      # assert device folded testing scenes with suffix 'folded'
783      if testing_foldable_device and 'folded' in s:
784        if not device_folded:
785          raise AssertionError('Device should be folded during'
786                               ' testing scenes with suffix "folded"')
787
788    # A subdir in topdir will be created for each camera_id. All scene test
789    # output logs for each camera id will be stored in this subdir.
790    # This output log path is a mobly param : LogPath
791    camera_id_str = (
792        camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_')
793    )
794    mobly_output_logs_path = os.path.join(topdir, f'cam_id_{camera_id_str}')
795    os.mkdir(mobly_output_logs_path)
796    tot_pass = 0
797    for s in per_camera_scenes:
798      results[s]['TEST_STATUS'] = []
799      results[s][METRICS_KEY] = []
800      results[s][PERFORMANCE_KEY] = []
801
802      # unit is millisecond for execution time record in CtsVerifier
803      scene_start_time = int(round(time.time() * 1000))
804      scene_test_summary = f'Cam{camera_id} {s}' + '\n'
805      mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s)
806
807      # Since test directories do not have 'folded' in the name, we need
808      # to remove that suffix for the path of the scenes to be loaded
809      # on the tablets
810      testing_scene = s
811      if 'folded' in s:
812        testing_scene = s.split('_folded')[0]
813      test_params_content['scene'] = testing_scene
814      test_params_content['scene_with_suffix'] = s
815
816      if auto_scene_switch:
817        # Copy scene images onto the tablet
818        if 'scene0' not in testing_scene:
819          its_session_utils.copy_scenes_to_tablet(testing_scene, tablet_id)
820      else:
821        # Check manual scenes for correctness
822        if ('scene0' not in testing_scene and
823            not testing_sensor_fusion_with_controller):
824          check_manual_scenes(device_id, camera_id, testing_scene,
825                              mobly_output_logs_path)
826
827      scene_test_list = []
828      config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
829      # Add the MoblyParams to config.yml file with the path to store camera_id
830      # test results. This is a separate dict other than TestBeds.
831      mobly_params_dict = {
832          'MoblyParams': {
833              'LogPath': mobly_scene_output_logs_path
834          }
835      }
836      config_file_contents.update(mobly_params_dict)
837      logging.debug('Final config file contents: %s', config_file_contents)
838      new_yml_file_name = get_updated_yml_file(config_file_contents)
839      logging.info('Using %s as temporary config yml file', new_yml_file_name)
840      if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1:
841        scene_dir = os.listdir(
842            os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene))
843        for file_name in scene_dir:
844          if file_name.endswith('.py') and 'test' in file_name:
845            scene_test_list.append(file_name)
846      else:  # sub-camera
847        if SUB_CAMERA_TESTS.get(testing_scene):
848          scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[
849              testing_scene]]
850        else:
851          scene_test_list = []
852      scene_test_list.sort()
853
854      # Run tests for scene
855      logging.info('Running tests for %s with camera %s',
856                   testing_scene, camera_id)
857      num_pass = 0
858      num_skip = 0
859      num_not_mandated_fail = 0
860      num_fail = 0
861      for test in scene_test_list:
862        # Handle repeated test
863        if 'tests/' in test:
864          cmd = [
865              'python3',
866              os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c',
867              f'{new_yml_file_name}'
868          ]
869        else:
870          cmd = [
871              'python3',
872              os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
873                           testing_scene, test),
874              '-c',
875              f'{new_yml_file_name}'
876          ]
877        return_string = ''
878        for num_try in range(NUM_TRIES):
879          # Handle manual lighting control redirected stdout in test
880          if (test in _LIGHTING_CONTROL_TESTS and
881              not testing_flash_with_controller):
882            print('Turn lights OFF in rig and press <ENTER> to continue.')
883
884          # pylint: disable=subprocess-run-check
885          with open(
886              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'w') as fp:
887            output = subprocess.run(cmd, stdout=fp)
888          # pylint: enable=subprocess-run-check
889
890          # Parse mobly logs to determine PASS/FAIL(*)/SKIP & socket FAILs
891          with open(
892              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file:
893            test_code = output.returncode
894            test_skipped = False
895            test_not_yet_mandated = False
896            test_mpc_req = ''
897            perf_test_metrics = ''
898            hdr_mpc_req = ''
899            content = file.read()
900
901            # Find media performance class logging
902            lines = content.splitlines()
903            for one_line in lines:
904              # regular expression pattern must match
905              # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in
906              # ItsTestActivity.java.
907              mpc_string_match = re.search(
908                  '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)',
909                  one_line)
910              if mpc_string_match:
911                test_mpc_req = one_line
912                break
913
914            for one_line in lines:
915              # regular expression pattern must match in ItsTestActivity.java.
916              gainmap_string_match = re.search('^has_gainmap:', one_line)
917              if gainmap_string_match:
918                hdr_mpc_req = one_line
919                break
920
921            for one_line in lines:
922              # regular expression pattern must match in ItsTestActivity.java.
923              perf_metrics_string_match = re.search(
924                  '^test.*:',
925                  one_line)
926              if perf_metrics_string_match:
927                perf_test_metrics = one_line
928                # each test can add multiple metrics
929                results[s][PERFORMANCE_KEY].append(perf_test_metrics)
930
931            if 'Test skipped' in content:
932              return_string = 'SKIP '
933              num_skip += 1
934              test_skipped = True
935              break
936
937            if its_session_utils.NOT_YET_MANDATED_MESSAGE in content:
938              return_string = 'FAIL*'
939              num_not_mandated_fail += 1
940              test_not_yet_mandated = True
941              break
942
943            if test_code == 0 and not test_skipped:
944              return_string = 'PASS '
945              num_pass += 1
946              break
947
948            if test_code == 1 and not test_not_yet_mandated:
949              return_string = 'FAIL '
950              if 'Problem with socket' in content and num_try != NUM_TRIES-1:
951                logging.info('Retry %s/%s', s, test)
952              else:
953                num_fail += 1
954                break
955            os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE))
956        status_prefix = ''
957        if testbed_index is not None:
958          status_prefix = config_file_test_key + ':'
959        logging.info('%s%s %s/%s', status_prefix, return_string, s, test)
960        test_name = test.split('/')[-1].split('.')[0]
961        results[s]['TEST_STATUS'].append({
962            'test': test_name,
963            'status': return_string.strip()})
964        if test_mpc_req:
965          results[s][METRICS_KEY].append(test_mpc_req)
966        if hdr_mpc_req:
967          results[s][METRICS_KEY].append(hdr_mpc_req)
968        msg_short = f'{return_string} {test}'
969        scene_test_summary += msg_short + '\n'
970        if (test in _LIGHTING_CONTROL_TESTS and
971            not testing_flash_with_controller):
972          print('Turn lights ON in rig and press <ENTER> to continue.')
973
974      # unit is millisecond for execution time record in CtsVerifier
975      scene_end_time = int(round(time.time() * 1000))
976      skip_string = ''
977      tot_tests = len(scene_test_list)
978      tot_tests_run = tot_tests - num_skip
979      if tot_tests_run != 0:
980        tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run
981      else:
982        tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0
983      tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%'
984      if num_skip > 0:
985        skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped"
986      test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} '
987                     f'tests passed ({tests_passed_ratio_format}){skip_string}')
988      logging.info(test_result)
989      if num_not_mandated_fail > 0:
990        logging.info('(*) %s not_yet_mandated tests failed',
991                     num_not_mandated_fail)
992
993      tot_pass += num_pass
994      logging.info('scene tests: %s, Total tests passed: %s', tot_tests,
995                   tot_pass)
996      if tot_tests > 0:
997        logging.info('%s compatibility score: %.f/100\n',
998                     s, 100 * num_pass / tot_tests)
999        scene_test_summary_path = os.path.join(mobly_scene_output_logs_path,
1000                                               'scene_test_summary.txt')
1001        with open(scene_test_summary_path, 'w') as f:
1002          f.write(scene_test_summary)
1003        results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL)
1004        results[s][SUMMARY_KEY] = scene_test_summary_path
1005        results[s][TIME_KEY_START] = scene_start_time
1006        results[s][TIME_KEY_END] = scene_end_time
1007      else:
1008        logging.info('%s compatibility score: 0/100\n')
1009
1010      # Delete temporary yml file after scene run.
1011      new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name)
1012      os.remove(new_yaml_file_path)
1013
1014    # Log results per camera
1015    if num_testbeds is None or testbed_index == _MAIN_TESTBED:
1016      logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id)
1017      logging.info('ITS results to CtsVerifier: %s', results)
1018      report_result(device_id, camera_id, results)
1019    else:
1020      write_result(testbed_index, device_id, camera_id, results)
1021
1022  logging.info('Test execution completed.')
1023
1024  # Power down tablet
1025  if tablet_id:
1026    cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER'
1027    subprocess.Popen(cmd.split())
1028
1029  # establish connection with lighting controller
1030  lighting_cntl = test_params_content.get('lighting_cntl', 'None')
1031  lighting_ch = test_params_content.get('lighting_ch', 'None')
1032  arduino_serial_port = lighting_control_utils.lighting_control(
1033      lighting_cntl, lighting_ch)
1034
1035  # turn OFF lights
1036  lighting_control_utils.set_lighting_state(
1037      arduino_serial_port, lighting_ch, 'OFF')
1038
1039  if num_testbeds is not None:
1040    if testbed_index == _MAIN_TESTBED:
1041      logging.info('Waiting for all testbeds to finish.')
1042      start = time.time()
1043      completed_testbeds = set()
1044      while time.time() < start + MERGE_RESULTS_TIMEOUT:
1045        for i in range(num_testbeds):
1046          if os.path.isfile(f'testbed_{i}_completed.tmp'):
1047            start = time.time()
1048            completed_testbeds.add(i)
1049        # Already reported _MAIN_TESTBED's results.
1050        if len(completed_testbeds) == num_testbeds - 1:
1051          logging.info('All testbeds completed, merging results.')
1052          for parsed_id, parsed_camera, parsed_results in (
1053              parse_testbeds(completed_testbeds)):
1054            logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s',
1055                          parsed_id, parsed_camera, parsed_results)
1056            if not are_devices_similar(device_id, parsed_id):
1057              logging.error('Device %s and device %s are not the same '
1058                            'model/type/build/revision.',
1059                            device_id, parsed_id)
1060              return
1061            report_result(device_id, parsed_camera, parsed_results)
1062          for temp_file in glob.glob('testbed_*.tmp'):
1063            os.remove(temp_file)
1064          break
1065      else:
1066        logging.error('No testbeds finished in the last %d seconds, '
1067                      'but still expected data. '
1068                      'Completed testbed indices: %s, '
1069                      'expected number of testbeds: %d',
1070                      MERGE_RESULTS_TIMEOUT, list(completed_testbeds),
1071                      num_testbeds)
1072    else:
1073      with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _:
1074        pass
1075
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
1076if __name__ == '__main__':
1077  main()
1078