#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base test class and helpers shared by the ACTS power tests."""
import json
import logging
import math
import os
import re
import time

import acts.controllers.power_monitor as power_monitor_lib
import acts.controllers.monsoon as monsoon_controller
import acts.controllers.iperf_server as ipf
from acts import asserts
from acts import base_test
from acts import utils
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts.controllers.adb_lib.error import AdbError
from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
from acts_contrib.test_utils.power import plot_utils

RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
IPERF_TIMEOUT = 180
THRESHOLD_TOLERANCE_DEFAULT = 0.2
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
GET_PROPERTY_HARDWARE_PLATFORM = 'getprop ro.boot.hardware.platform'
POWER_STATS_DUMPSYS_CMD = 'dumpsys android.hardware.power.stats.IPowerStats/default delta'
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
MONSOON_MAX_CURRENT = 8.0
DEFAULT_MONSOON_FREQUENCY = 500
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
TEMP_FILE = '/sdcard/Download/tmp.log'


class ObjNew(object):
    """Simple attribute bag built from arbitrary keyword arguments.

    Every keyword passed to the constructor becomes an instance attribute,
    and ``name in obj`` reports whether the attribute exists.
    """
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __contains__(self, item):
        """Function to check if one attribute is contained in the object.

        Args:
            item: the attribute name to check
        Return:
            True/False
        """
        return hasattr(self, item)


class PowerBaseTest(base_test.BaseTestClass):
    """Base class for all wireless power related tests.

    """
    def __init__(self, controllers):
        """Initializes the result loggers and the per-class state.

        Args:
            controllers: forwarded unchanged to the parent constructor.
        """
        super().__init__(controllers)
        self.power_result = BlackboxMetricLogger.for_test_case(
            metric_name='avg_power')
        self.start_meas_time = 0
        self.rockbottom_script = None
        self.img_name = ''
        self.dut = None
        self.power_logger = PowerMetricLogger.for_test_case()
        self.power_monitor = None
        self.odpm_folder = None

    @property
    def final_test(self):
        """True when the current test is the last requested test."""
        return len(
            self.results.requested
        ) > 0 and self.current_test_name == self.results.requested[-1]

    @property
    def display_name_test_suite(self):
        """Display name of the suite; defaults to the class name."""
        return getattr(self, '_display_name_test_suite',
                       self.__class__.__name__)

    @display_name_test_suite.setter
    def display_name_test_suite(self, name):
        self._display_name_test_suite = name

    @property
    def display_name_test_case(self):
        """Display name of the test case.

        Defaults to ``self.test_name``, or None if that is not set.
        """
        default_test_name = getattr(self, 'test_name', None)
        return getattr(self, '_display_name_test_case', default_test_name)

    @display_name_test_case.setter
    def display_name_test_case(self, name):
        self._display_name_test_case = name

    def initialize_power_monitor(self):
        """ Initializes the power monitor object.

        A 'bitses' controller takes precedence over 'monsoons'; when both
        are present the monsoon controller is destroyed first.

        Raises:
            RuntimeError: if there are no controllers available.
        """
        if hasattr(self, 'bitses'):
            if hasattr(self, 'monsoons'):
                self.log.info('Destroying monsoon controller.')
                monsoon_controller.destroy(self.monsoons)
                time.sleep(2)
            self.power_monitor = self.bitses[0]
            self.power_monitor.setup(registry=self.user_params)
        elif hasattr(self, 'monsoons'):
            self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
                self.monsoons[0])
            # Use the module constant instead of repeating the literal.
            self.monsoons[0].set_max_current(MONSOON_MAX_CURRENT)
            self.monsoons[0].set_voltage(self.mon_voltage)
        else:
            raise RuntimeError('No power monitors available.')

    def setup_class(self):
        """Unpacks the user parameters and prepares the DUT and monitor.

        Raises:
            RuntimeError: if the pass/fail threshold file or the rockbottom
                script for the DUT model is not among the custom files, or
                if no power monitor is available.
        """
        super().setup_class()

        self.log = logging.getLogger()
        self.tests = self.get_existing_test_names()

        # Obtain test parameters from user_params
        TEST_PARAMS = self.TAG + '_params'
        self.test_params = self.user_params.get(TEST_PARAMS, {})
        if not self.test_params:
            self.log.warning(TEST_PARAMS + ' was not found in the user '
                             'parameters defined in the config file.')

        # Override user_param values with test parameters
        self.user_params.update(self.test_params)

        # Unpack user_params with default values. All the usages of user_params
        # as self attributes need to be included either as a required parameter
        # or as a parameter with a default value.
        req_params = ['custom_files', 'mon_duration']
        self.unpack_userparams(req_params,
                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
                               mon_offset=0,
                               bug_report=False,
                               extra_wait=None,
                               iperf_duration=None,
                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT,
                               ap_dtim_period=None,
                               bits_root_rail_csv_export=False,
                               is_odpm_supported=False)

        # Setup the must have controllers, phone and monsoon
        self.dut = self.android_devices[0]
        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
        os.makedirs(self.mon_data_path, exist_ok=True)

        # Make odpm path for P21 or later
        platform = self.dut.adb.shell(GET_PROPERTY_HARDWARE_PLATFORM)
        self.log.info('The hardware platform is {}'.format(platform))
        if platform.startswith(('gs', 'z')) or self.is_odpm_supported:
            self.odpm_folder = os.path.join(self.log_path, 'odpm')
            os.makedirs(self.odpm_folder, exist_ok=True)
            self.log.info('For P21 or later, create odpm folder {}'.format(
                self.odpm_folder))

        # Initialize the power monitor object that will be used to measure
        self.initialize_power_monitor()

        # Unpack the thresholds file or fail class setup if it can't be found
        for file in self.custom_files:
            if 'pass_fail_threshold_' + self.dut.model in file:
                self.threshold_file = file
                break
        else:
            raise RuntimeError('Required test pass/fail threshold file is '
                               'missing')

        # Unpack the rockbottom script or fail class setup if it can't be found
        for file in self.custom_files:
            if 'rockbottom_' + self.dut.model in file:
                self.rockbottom_script = file
                break
        else:
            raise RuntimeError('Required rockbottom script is missing.')

        # Unpack optional custom files
        for file in self.custom_files:
            if 'attenuator_setting' in file:
                self.attenuation_file = file
            elif 'network_config' in file:
                self.network_file = file

        if hasattr(self, 'attenuators'):
            self.num_atten = self.attenuators[0].instrument.num_atten
            # NOTE(review): this needs an 'attenuator_setting' custom file
            # whenever attenuators are configured; otherwise
            # self.attenuation_file is unset here.
            self.atten_level = self.unpack_custom_file(self.attenuation_file)
        self.threshold = self.unpack_custom_file(self.threshold_file)
        self.mon_info = self.create_monsoon_info()

        # Sync device time, timezone and country code
        utils.require_sl4a((self.dut, ))
        utils.sync_device_time(self.dut)

        screen_on_img = self.user_params.get('screen_on_img', [])
        if screen_on_img:
            img_src = screen_on_img[0]
            img_dest = '/sdcard/Pictures/'
            success = self.dut.push_system_file(img_src, img_dest)
            if success:
                self.img_name = os.path.basename(img_src)

    def setup_test(self):
        """Set up test specific parameters or configs.

        """
        super().setup_test()

        # Reset result variables
        self.avg_current = 0
        self.samples = []
        self.power_result.metric_value = 0

        # Set the device into rockbottom state
        self.dut_rockbottom()

        # Wait for extra time if needed for the first test
        if self.extra_wait:
            self.more_wait_first_test()

    def teardown_test(self):
        """Tear down necessary objects after test case is finished.

        Reconnects USB, records the measurement and build information in
        the power logger, optionally takes a bug report, and sleeps for the
        configured cooldown unless this was the final test.
        """
        self.log.info('Tearing down the test case')
        self.power_monitor.connect_usb()
        self.power_logger.set_avg_power(self.power_result.metric_value)
        self.power_logger.set_avg_current(self.avg_current)
        self.power_logger.set_voltage(self.mon_voltage)
        self.power_logger.set_testbed(self.testbed_name)

        # If a threshold was provided, log it in the power proto
        if self.threshold and self.test_name in self.threshold:
            avg_current_threshold = self.threshold[self.test_name]
            self.power_logger.set_avg_current_threshold(avg_current_threshold)

        build_id = self.dut.build_info.get('build_id', '')
        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
        branch = self.user_params.get('branch', '')
        target = self.dut.device_info.get('flavor', '')

        self.power_logger.set_branch(branch)
        self.power_logger.set_build_id(build_id)
        self.power_logger.set_incremental_build_id(incr_build_id)
        self.power_logger.set_target(target)

        # Log the display name of the test suite and test case
        if self.display_name_test_suite:
            name = self.display_name_test_suite
            self.power_logger.set_test_suite_display_name(name)

        if self.display_name_test_case:
            name = self.display_name_test_case
            self.power_logger.set_test_case_display_name(name)

        # Take Bugreport
        if self.bug_report:
            begin_time = utils.get_current_epoch_time()
            self.dut.take_bug_report(self.test_name, begin_time)

        # Allow the device to cooldown before executing the next test
        cooldown = self.test_params.get('cooldown', None)
        if cooldown and not self.final_test:
            time.sleep(cooldown)

    def teardown_class(self):
        """Clean up the test class after tests finish running

        """
        self.log.info('Tearing down the test class')
        if self.power_monitor:
            self.power_monitor.connect_usb()

    def on_fail(self, test_name, begin_time):
        """Records a FAIL status in the power logger."""
        self.power_logger.set_pass_fail_status('FAIL')

    def on_pass(self, test_name, begin_time):
        """Records a PASS status in the power logger."""
        self.power_logger.set_pass_fail_status('PASS')

    def dut_save_odpm(self, tag):
        """Dumpsys ODPM data and save it to self.odpm_folder.

        An adb failure is logged as a warning and otherwise ignored.

        Args:
            tag: the moment of save ODPM data
        """
        odpm_file_name = '{}.{}.dumpsys_odpm_{}.txt'.format(
            self.__class__.__name__,
            self.current_test_name,
            tag)
        odpm_file_path = os.path.join(self.odpm_folder, odpm_file_name)

        try:
            stats = self.dut.adb.shell(POWER_STATS_DUMPSYS_CMD)
            with open(odpm_file_path, 'w') as f:
                f.write(stats)
        except AdbError as e:
            # Both placeholders need a value; passing only the error made
            # this handler raise IndexError.
            self.log.warning('Odpm data with tag {} did not save due to adb '
                             'error {}'.format(tag, e))

    def dut_rockbottom(self):
        """Set the dut to rockbottom state

        """
        # The rockbottom script might include a device reboot, so it is
        # necessary to stop SL4A during its execution.
        self.dut.stop_services()
        self.log.info('Executing rockbottom script for ' + self.dut.model)
        os.chmod(self.rockbottom_script, 0o777)
        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
                                    self.img_name))
        # Make sure the DUT is in root mode after coming back
        self.dut.root_adb()
        # Restart SL4A
        self.dut.start_services()

    def unpack_custom_file(self, file, test_specific=True):
        """Load a JSON custom file, optionally narrowed to this test class.

        Args:
            file: path of the JSON file to load.
            test_specific: if True, returns the JSON element within the file
                           keyed by self.TAG.

        Returns:
            The whole parsed JSON when test_specific is False. Otherwise the
            element keyed by self.TAG, or None when that key is absent.
        """
        with open(file, 'r') as f:
            params = json.load(f)
        if not test_specific:
            return params
        try:
            return params[self.TAG]
        except KeyError:
            # A file without an entry for this class is not an error.
            return None

    def decode_test_configs(self, attrs, indices):
        """Decode the test config/params from test name.

        Remove redundant function calls when tests are similar. The result
        is stored in self.test_configs.

        Args:
            attrs: a list of the attrs of the test config obj
            indices: a list of the location indices of keyword in the test name.
        """
        # Decode test parameters for the current test
        test_params = self.current_test_name.split('_')
        values = [test_params[x] for x in indices]
        config_dict = dict(zip(attrs, values))
        self.test_configs = ObjNew(**config_dict)

    def more_wait_first_test(self):
        """Adds extra_wait to the measurement offset for the first test."""
        # For the first test, increase the offset for longer wait time
        if self.current_test_name == self.tests[0]:
            self.mon_info.offset = self.mon_offset + self.extra_wait
        else:
            self.mon_info.offset = self.mon_offset

    def set_attenuation(self, atten_list):
        """Function to set the attenuator to desired attenuations.

        Args:
            atten_list: list containing the attenuation for each attenuator.

        Raises:
            ValueError: if atten_list does not have self.num_atten entries.
        """
        if len(atten_list) != self.num_atten:
            raise ValueError('List given does not have the correct length')
        for i, atten in enumerate(atten_list):
            self.attenuators[i].set_atten(atten)

    def measure_power_and_validate(self):
        """The actual test flow and result processing and validate.

        """
        self.collect_power_data()
        self.pass_fail_check(self.avg_current)

    def collect_power_data(self):
        """Measure power, plot and take log if needed.

        Stores the average current (sample values scaled by 1000) in
        self.avg_current and its product with the monitor voltage in
        self.power_result.metric_value.

        Returns:
            The list of samples returned by
            power_monitor_data_collect_save().
        """
        # Collecting current measurement data and plot
        samples = self.power_monitor_data_collect_save()

        current = [sample[1] for sample in samples]
        average_current = sum(current) * 1000 / len(current)

        self.power_result.metric_value = (average_current * self.mon_voltage)
        self.avg_current = average_current

        plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
                                       self.dut.build_info['build_id'])
        plot_utils.current_waveform_plot(samples, self.mon_voltage,
                                         self.mon_info.data_path, plot_title)

        return samples

    def pass_fail_check(self, average_current=None):
        """Check the test result and decide if it passed or failed.

        The threshold is provided in the config file. In this class, result is
        current in mA. When no threshold exists for the current test an
        error is logged and no verdict is asserted.

        Args:
            average_current: measured average current in mA. A falsy value
                (None or 0) fails the test as an incomplete measurement.
        """

        if not self.threshold or self.test_name not in self.threshold:
            self.log.error("No threshold is provided for the test '{}' in "
                           "the configuration file.".format(self.test_name))
            return

        current_threshold = self.threshold[self.test_name]
        if average_current:
            asserts.assert_true(
                abs(average_current - current_threshold) / current_threshold <
                self.pass_fail_tolerance,
                'Measured average current in [{}]: {:.2f}mA, which is '
                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
                    self.test_name, average_current, current_threshold,
                    self.pass_fail_tolerance * current_threshold))
            asserts.explicit_pass(
                'Measurement finished for [{}]: {:.2f}mA, which is '
                'within the acceptable range {:.2f}±{:.2f}'.format(
                    self.test_name, average_current, current_threshold,
                    self.pass_fail_tolerance * current_threshold))
        else:
            asserts.fail(
                'Something happened, measurement is not complete, test failed')

    def create_monsoon_info(self):
        """Creates the config object for monsoon

        Returns:
            mon_info: ObjNew holding freq, duration, offset and data_path
        """
        mon_info = ObjNew(freq=self.mon_freq,
                          duration=self.mon_duration,
                          offset=self.mon_offset,
                          data_path=self.mon_data_path)
        return mon_info

    def power_monitor_data_collect_save(self):
        """Current measurement and save the log file.

        Collect current data using the power monitor and save it to a log
        file under self.mon_info.data_path.

        Returns:
            A list of tuples in which the first element is a timestamp and the
            second element is the sampled current in Amperes at that time.
        """

        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
                                self.dut.build_info['build_id'])

        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))

        # If the specified Monsoon data file already exists (e.g., multiple
        # measurements in a single test), write the results to a new file with
        # the postfix "_#".
        if os.path.exists(data_path):
            highest_value = 1
            # Escape the tag so regex metacharacters in it are taken
            # literally, and match the whole file name.
            postfix_pattern = re.compile(r'{}_(\d+)\.txt'.format(
                re.escape(tag)))
            for filename in os.listdir(os.path.dirname(data_path)):
                match = postfix_pattern.fullmatch(filename)
                if match:
                    highest_value = max(highest_value, int(match.group(1)))

            data_path = os.path.join(self.mon_info.data_path,
                                     '%s_%s.txt' % (tag, highest_value + 1))

        # Resets the battery status right before the test starts.
        self.dut.adb.shell(RESET_BATTERY_STATS)
        self.log.info('Starting power measurement. Duration: {}s. Offset: '
                      '{}s. Voltage: {} V.'.format(self.mon_info.duration,
                                                   self.mon_info.offset,
                                                   self.mon_voltage))

        # TODO(b/155426729): Create an accurate host-to-device time difference
        # measurement.
        device_time_cmd = 'echo $EPOCHREALTIME'
        device_time = self.dut.adb.shell(device_time_cmd)
        host_time = time.time()
        self.log.debug('device start time %s, host start time %s', device_time,
                       host_time)
        device_to_host_offset = float(device_time) - host_time

        # Start the power measurement using monsoon.
        self.dut.stop_services()
        time.sleep(1)

        # P21 or later device, save the odpm data before power measurement
        if self.odpm_folder:
            self.dut_save_odpm('before')

        self.power_monitor.disconnect_usb()
        measurement_args = dict(duration=self.mon_info.duration,
                                measure_after_seconds=self.mon_info.offset,
                                hz=self.mon_info.freq)
        self.power_monitor.measure(measurement_args=measurement_args,
                                   measurement_name=self.test_name,
                                   start_time=device_to_host_offset,
                                   monsoon_output_path=data_path)
        self.power_monitor.release_resources()
        self.collect_raw_data_samples()
        self.power_monitor.connect_usb()
        self.dut.wait_for_boot_completion()
        time.sleep(10)

        # For P21 or later device, save the odpm data after power measurement
        if self.odpm_folder:
            self.dut_save_odpm('after')

        self.dut.start_services()

        return self.power_monitor.get_waveform(file_path=data_path)

    def process_iperf_results(self):
        """Get the iperf results and process.

        Returns:
            throughput: the average throughput during tests, or 0 when the
                iperf result cannot be parsed or has no usable samples.
        """
        # Get IPERF results and add this to the plot title
        RESULTS_DESTINATION = os.path.join(
            self.iperf_server.log_path,
            'iperf_client_output_{}.log'.format(self.current_test_name))
        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
        # Calculate the average throughput
        if self.use_client_output:
            iperf_file = RESULTS_DESTINATION
        else:
            iperf_file = self.iperf_server.log_files[-1]
        try:
            iperf_result = ipf.IPerfResult(iperf_file)

            # Samples from start_meas_time up to, but excluding, the last.
            rates = iperf_result.instantaneous_rates[self.start_meas_time:-1]

            # Compute the throughput in Mbit/s
            throughput = (math.fsum(rates) / len(rates)) * 8 * (1.024**2)

            self.log.info('The average throughput is {}'.format(throughput))
        except (ValueError, ZeroDivisionError):
            # ZeroDivisionError: the slice above was empty.
            self.log.warning('Cannot get iperf result. Setting to 0')
            throughput = 0
        return throughput

    def collect_raw_data_samples(self):
        """Exports the bits root rail CSV when that option is enabled.

        Does nothing unless a 'bitses' controller is present and
        bits_root_rail_csv_export is set.
        """
        if hasattr(self, 'bitses') and self.bits_root_rail_csv_export:
            path = os.path.join(os.path.dirname(self.mon_info.data_path),
                                'Kibble')
            self.power_monitor.get_bits_root_rail_csv_export(path, self.test_name)