1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import json
17import logging
18import math
19import os
20import re
21import time
22
23import acts.controllers.power_monitor as power_monitor_lib
24import acts.controllers.monsoon as monsoon_controller
25import acts.controllers.iperf_server as ipf
26from acts import asserts
27from acts import base_test
28from acts import utils
29from acts.metrics.loggers.blackbox import BlackboxMetricLogger
30from acts.controllers.adb_lib.error import AdbError
31from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
32from acts_contrib.test_utils.power import plot_utils
33
# adb shell commands run on the DUT.
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
GET_PROPERTY_HARDWARE_PLATFORM = 'getprop ro.boot.hardware.platform'
POWER_STATS_DUMPSYS_CMD = (
    'dumpsys android.hardware.power.stats.IPowerStats/default delta')

# Path on the DUT that process_iperf_results() pulls the iperf client output
# from.
TEMP_FILE = '/sdcard/Download/tmp.log'

# Defaults for the user parameters unpacked in setup_class().
THRESHOLD_TOLERANCE_DEFAULT = 0.2  # default for pass_fail_tolerance
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2  # default for mon_voltage (logged in V)
DEFAULT_MONSOON_FREQUENCY = 500  # default for mon_freq (passed on as hz)

# Monsoon current limit -- presumably in Amperes; confirm against the Monsoon
# controller.
MONSOON_MAX_CURRENT = 8.0

# Not referenced by the code visible in this file; presumably consumed by
# modules that import them -- verify before changing.
IPERF_TIMEOUT = 180
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
47
48
class ObjNew(object):
    """Simple attribute container built from arbitrary keyword arguments.

    Each keyword passed to the constructor becomes an instance attribute with
    the same name and value.
    """
    def __init__(self, **kwargs):
        for attr_name, attr_value in kwargs.items():
            self.__dict__[attr_name] = attr_value

    def __contains__(self, item):
        """Support `name in obj` as an attribute-existence check.

        Args:
            item: name of the attribute to look for
        Return:
            True if the object has an attribute called `item`, False otherwise
        """
        try:
            getattr(self, item)
        except AttributeError:
            return False
        return True
65
66
67class PowerBaseTest(base_test.BaseTestClass):
68    """Base class for all wireless power related tests.
69
70    """
71    def __init__(self, controllers):
72
73        super().__init__(controllers)
74        self.power_result = BlackboxMetricLogger.for_test_case(
75            metric_name='avg_power')
76        self.start_meas_time = 0
77        self.rockbottom_script = None
78        self.img_name = ''
79        self.dut = None
80        self.power_logger = PowerMetricLogger.for_test_case()
81        self.power_monitor = None
82        self.odpm_folder = None
83
84    @property
85    def final_test(self):
86        return len(
87            self.results.requested
88        ) > 0 and self.current_test_name == self.results.requested[-1]
89
90    @property
91    def display_name_test_suite(self):
92        return getattr(self, '_display_name_test_suite',
93                       self.__class__.__name__)
94
95    @display_name_test_suite.setter
96    def display_name_test_suite(self, name):
97        self._display_name_test_suite = name
98
99    @property
100    def display_name_test_case(self):
101        default_test_name = getattr(self, 'test_name', None)
102        return getattr(self, '_display_name_test_case', default_test_name)
103
104    @display_name_test_case.setter
105    def display_name_test_case(self, name):
106        self._display_name_test_case = name
107
108    def initialize_power_monitor(self):
109        """ Initializes the power monitor object.
110
111        Raises an exception if there are no controllers available.
112        """
113        if hasattr(self, 'bitses'):
114            if hasattr(self, 'monsoons'):
115                self.log.info('Destroying monsoon controller.')
116                monsoon_controller.destroy(self.monsoons)
117                time.sleep(2)
118            self.power_monitor = self.bitses[0]
119            self.power_monitor.setup(registry=self.user_params)
120        elif hasattr(self, 'monsoons'):
121            self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
122                self.monsoons[0])
123            self.monsoons[0].set_max_current(8.0)
124            self.monsoons[0].set_voltage(self.mon_voltage)
125        else:
126            raise RuntimeError('No power monitors available.')
127
128    def setup_class(self):
129
130        super().setup_class()
131
132        self.log = logging.getLogger()
133        self.tests = self.get_existing_test_names()
134
135        # Obtain test parameters from user_params
136        TEST_PARAMS = self.TAG + '_params'
137        self.test_params = self.user_params.get(TEST_PARAMS, {})
138        if not self.test_params:
139            self.log.warning(TEST_PARAMS + ' was not found in the user '
140                             'parameters defined in the config file.')
141
142        # Override user_param values with test parameters
143        self.user_params.update(self.test_params)
144
145        # Unpack user_params with default values. All the usages of user_params
146        # as self attributes need to be included either as a required parameter
147        # or as a parameter with a default value.
148        req_params = ['custom_files', 'mon_duration']
149        self.unpack_userparams(req_params,
150                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
151                               mon_offset=0,
152                               bug_report=False,
153                               extra_wait=None,
154                               iperf_duration=None,
155                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
156                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT,
157                               ap_dtim_period=None,
158                               bits_root_rail_csv_export=False,
159                               is_odpm_supported=False)
160
161        # Setup the must have controllers, phone and monsoon
162        self.dut = self.android_devices[0]
163        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
164        os.makedirs(self.mon_data_path, exist_ok=True)
165
166        # Make odpm path for P21 or later
167        platform = self.dut.adb.shell(GET_PROPERTY_HARDWARE_PLATFORM)
168        self.log.info('The hardware platform is {}'.format(platform))
169        if (
170            platform.startswith('gs')
171            or platform.startswith('z')
172            or self.is_odpm_supported
173        ):
174            self.odpm_folder = os.path.join(self.log_path, 'odpm')
175            os.makedirs(self.odpm_folder, exist_ok=True)
176            self.log.info('For P21 or later, create odpm folder {}'.format(
177                self.odpm_folder))
178
179        # Initialize the power monitor object that will be used to measure
180        self.initialize_power_monitor()
181
182        # Unpack the thresholds file or fail class setup if it can't be found
183        for file in self.custom_files:
184            if 'pass_fail_threshold_' + self.dut.model in file:
185                self.threshold_file = file
186                break
187        else:
188            raise RuntimeError('Required test pass/fail threshold file is '
189                               'missing')
190
191        # Unpack the rockbottom script or fail class setup if it can't be found
192        for file in self.custom_files:
193            if 'rockbottom_' + self.dut.model in file:
194                self.rockbottom_script = file
195                break
196        else:
197            raise RuntimeError('Required rockbottom script is missing.')
198
199        # Unpack optional custom files
200        for file in self.custom_files:
201            if 'attenuator_setting' in file:
202                self.attenuation_file = file
203            elif 'network_config' in file:
204                self.network_file = file
205
206        if hasattr(self, 'attenuators'):
207            self.num_atten = self.attenuators[0].instrument.num_atten
208            self.atten_level = self.unpack_custom_file(self.attenuation_file)
209        self.threshold = self.unpack_custom_file(self.threshold_file)
210        self.mon_info = self.create_monsoon_info()
211
212        # Sync device time, timezone and country code
213        utils.require_sl4a((self.dut, ))
214        utils.sync_device_time(self.dut)
215
216        screen_on_img = self.user_params.get('screen_on_img', [])
217        if screen_on_img:
218            img_src = screen_on_img[0]
219            img_dest = '/sdcard/Pictures/'
220            success = self.dut.push_system_file(img_src, img_dest)
221            if success:
222                self.img_name = os.path.basename(img_src)
223
224    def setup_test(self):
225        """Set up test specific parameters or configs.
226
227        """
228        super().setup_test()
229
230        # Reset result variables
231        self.avg_current = 0
232        self.samples = []
233        self.power_result.metric_value = 0
234
235        # Set the device into rockbottom state
236        self.dut_rockbottom()
237
238        # Wait for extra time if needed for the first test
239        if self.extra_wait:
240            self.more_wait_first_test()
241
242    def teardown_test(self):
243        """Tear down necessary objects after test case is finished.
244
245        """
246        self.log.info('Tearing down the test case')
247        self.power_monitor.connect_usb()
248        self.power_logger.set_avg_power(self.power_result.metric_value)
249        self.power_logger.set_avg_current(self.avg_current)
250        self.power_logger.set_voltage(self.mon_voltage)
251        self.power_logger.set_testbed(self.testbed_name)
252
253        # If a threshold was provided, log it in the power proto
254        if self.threshold and self.test_name in self.threshold:
255            avg_current_threshold = self.threshold[self.test_name]
256            self.power_logger.set_avg_current_threshold(avg_current_threshold)
257
258        build_id = self.dut.build_info.get('build_id', '')
259        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
260        branch = self.user_params.get('branch', '')
261        target = self.dut.device_info.get('flavor', '')
262
263        self.power_logger.set_branch(branch)
264        self.power_logger.set_build_id(build_id)
265        self.power_logger.set_incremental_build_id(incr_build_id)
266        self.power_logger.set_target(target)
267
268        # Log the display name of the test suite and test case
269        if self.display_name_test_suite:
270            name = self.display_name_test_suite
271            self.power_logger.set_test_suite_display_name(name)
272
273        if self.display_name_test_case:
274            name = self.display_name_test_case
275            self.power_logger.set_test_case_display_name(name)
276
277        # Take Bugreport
278        if self.bug_report:
279            begin_time = utils.get_current_epoch_time()
280            self.dut.take_bug_report(self.test_name, begin_time)
281
282        # Allow the device to cooldown before executing the next test
283        cooldown = self.test_params.get('cooldown', None)
284        if cooldown and not self.final_test:
285            time.sleep(cooldown)
286
287    def teardown_class(self):
288        """Clean up the test class after tests finish running
289
290        """
291        self.log.info('Tearing down the test class')
292        if self.power_monitor:
293            self.power_monitor.connect_usb()
294
295    def on_fail(self, test_name, begin_time):
296        self.power_logger.set_pass_fail_status('FAIL')
297
298    def on_pass(self, test_name, begin_time):
299        self.power_logger.set_pass_fail_status('PASS')
300
301    def dut_save_odpm(self, tag):
302        """Dumpsys ODPM data and save it to self.odpm_folder.
303
304        Args:
305            tag: the moment of save ODPM data
306        """
307        odpm_file_name = '{}.{}.dumpsys_odpm_{}.txt'.format(
308            self.__class__.__name__,
309            self.current_test_name,
310            tag)
311        odpm_file_path = os.path.join(self.odpm_folder, odpm_file_name)
312
313        try:
314            stats = self.dut.adb.shell(POWER_STATS_DUMPSYS_CMD)
315            with open(odpm_file_path, 'w') as f:
316                f.write(stats)
317        except AdbError as e:
318            self.log.warning('Odpm data with tag {} did not save due to adb '
319                             'error {}'.format(e))
320
321    def dut_rockbottom(self):
322        """Set the dut to rockbottom state
323
324        """
325        # The rockbottom script might include a device reboot, so it is
326        # necessary to stop SL4A during its execution.
327        self.dut.stop_services()
328        self.log.info('Executing rockbottom script for ' + self.dut.model)
329        os.chmod(self.rockbottom_script, 0o777)
330        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
331                                    self.img_name))
332        # Make sure the DUT is in root mode after coming back
333        self.dut.root_adb()
334        # Restart SL4A
335        self.dut.start_services()
336
337    def unpack_custom_file(self, file, test_specific=True):
338        """Unpack the pass_fail_thresholds from a common file.
339
340        Args:
341            file: the common file containing pass fail threshold.
342            test_specific: if True, returns the JSON element within the file
343                that starts with the test class name.
344        """
345        with open(file, 'r') as f:
346            params = json.load(f)
347        if test_specific:
348            try:
349                return params[self.TAG]
350            except KeyError:
351                pass
352        else:
353            return params
354
355    def decode_test_configs(self, attrs, indices):
356        """Decode the test config/params from test name.
357
358        Remove redundant function calls when tests are similar.
359        Args:
360            attrs: a list of the attrs of the test config obj
361            indices: a list of the location indices of keyword in the test name.
362        """
363        # Decode test parameters for the current test
364        test_params = self.current_test_name.split('_')
365        values = [test_params[x] for x in indices]
366        config_dict = dict(zip(attrs, values))
367        self.test_configs = ObjNew(**config_dict)
368
369    def more_wait_first_test(self):
370        # For the first test, increase the offset for longer wait time
371        if self.current_test_name == self.tests[0]:
372            self.mon_info.offset = self.mon_offset + self.extra_wait
373        else:
374            self.mon_info.offset = self.mon_offset
375
376    def set_attenuation(self, atten_list):
377        """Function to set the attenuator to desired attenuations.
378
379        Args:
380            atten_list: list containing the attenuation for each attenuator.
381        """
382        if len(atten_list) != self.num_atten:
383            raise Exception('List given does not have the correct length')
384        for i in range(self.num_atten):
385            self.attenuators[i].set_atten(atten_list[i])
386
387    def measure_power_and_validate(self):
388        """The actual test flow and result processing and validate.
389
390        """
391        self.collect_power_data()
392        self.pass_fail_check(self.avg_current)
393
394    def collect_power_data(self):
395        """Measure power, plot and take log if needed.
396
397        Returns:
398            A MonsoonResult object.
399        """
400        # Collecting current measurement data and plot
401        samples = self.power_monitor_data_collect_save()
402
403        current = [sample[1] for sample in samples]
404        average_current = sum(current) * 1000 / len(current)
405
406        self.power_result.metric_value = (average_current * self.mon_voltage)
407        self.avg_current = average_current
408
409        plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
410                                       self.dut.build_info['build_id'])
411        plot_utils.current_waveform_plot(samples, self.mon_voltage,
412                                         self.mon_info.data_path, plot_title)
413
414        return samples
415
416    def pass_fail_check(self, average_current=None):
417        """Check the test result and decide if it passed or failed.
418
419        The threshold is provided in the config file. In this class, result is
420        current in mA.
421        """
422
423        if not self.threshold or self.test_name not in self.threshold:
424            self.log.error("No threshold is provided for the test '{}' in "
425                           "the configuration file.".format(self.test_name))
426            return
427
428        current_threshold = self.threshold[self.test_name]
429        if average_current:
430            asserts.assert_true(
431                abs(average_current - current_threshold) / current_threshold <
432                self.pass_fail_tolerance,
433                'Measured average current in [{}]: {:.2f}mA, which is '
434                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
435                    self.test_name, average_current, current_threshold,
436                    self.pass_fail_tolerance * current_threshold))
437            asserts.explicit_pass(
438                'Measurement finished for [{}]: {:.2f}mA, which is '
439                'within the acceptable range {:.2f}±{:.2f}'.format(
440                    self.test_name, average_current, current_threshold,
441                    self.pass_fail_tolerance * current_threshold))
442        else:
443            asserts.fail(
444                'Something happened, measurement is not complete, test failed')
445
446    def create_monsoon_info(self):
447        """Creates the config dictionary for monsoon
448
449        Returns:
450            mon_info: Dictionary with the monsoon packet config
451        """
452        mon_info = ObjNew(freq=self.mon_freq,
453                          duration=self.mon_duration,
454                          offset=self.mon_offset,
455                          data_path=self.mon_data_path)
456        return mon_info
457
458    def power_monitor_data_collect_save(self):
459        """Current measurement and save the log file.
460
461        Collect current data using Monsoon box and return the path of the
462        log file. Take bug report if requested.
463
464        Returns:
465            A list of tuples in which the first element is a timestamp and the
466            second element is the sampled current in Amperes at that time.
467        """
468
469        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
470                                self.dut.build_info['build_id'])
471
472        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
473
474        # If the specified Monsoon data file already exists (e.g., multiple
475        # measurements in a single test), write the results to a new file with
476        # the postfix "_#".
477        if os.path.exists(data_path):
478            highest_value = 1
479            for filename in os.listdir(os.path.dirname(data_path)):
480                match = re.match(r'{}_(\d+).txt'.format(tag), filename)
481                if match:
482                    highest_value = max(highest_value, int(match.group(1)))
483
484            data_path = os.path.join(self.mon_info.data_path,
485                                     '%s_%s.txt' % (tag, highest_value + 1))
486
487        # Resets the battery status right before the test starts.
488        self.dut.adb.shell(RESET_BATTERY_STATS)
489        self.log.info('Starting power measurement. Duration: {}s. Offset: '
490                      '{}s. Voltage: {} V.'.format(self.mon_info.duration,
491                                                   self.mon_info.offset,
492                                                   self.mon_voltage))
493
494        # TODO(b/155426729): Create an accurate host-to-device time difference
495        # measurement.
496        device_time_cmd = 'echo $EPOCHREALTIME'
497        device_time = self.dut.adb.shell(device_time_cmd)
498        host_time = time.time()
499        self.log.debug('device start time %s, host start time %s', device_time,
500                       host_time)
501        device_to_host_offset = float(device_time) - host_time
502
503        # Start the power measurement using monsoon.
504        self.dut.stop_services()
505        time.sleep(1)
506
507        # P21 or later device, save the odpm data before power measurement
508        if self.odpm_folder:
509            self.dut_save_odpm('before')
510
511        self.power_monitor.disconnect_usb()
512        measurement_args = dict(duration=self.mon_info.duration,
513                                measure_after_seconds=self.mon_info.offset,
514                                hz=self.mon_info.freq)
515        self.power_monitor.measure(measurement_args=measurement_args,
516                                   measurement_name=self.test_name,
517                                   start_time=device_to_host_offset,
518                                   monsoon_output_path=data_path)
519        self.power_monitor.release_resources()
520        self.collect_raw_data_samples()
521        self.power_monitor.connect_usb()
522        self.dut.wait_for_boot_completion()
523        time.sleep(10)
524
525        # For P21 or later device, save the odpm data after power measurement
526        if self.odpm_folder:
527            self.dut_save_odpm('after')
528
529        self.dut.start_services()
530
531        return self.power_monitor.get_waveform(file_path=data_path)
532
533    def process_iperf_results(self):
534        """Get the iperf results and process.
535
536        Returns:
537             throughput: the average throughput during tests.
538        """
539        # Get IPERF results and add this to the plot title
540        RESULTS_DESTINATION = os.path.join(
541            self.iperf_server.log_path,
542            'iperf_client_output_{}.log'.format(self.current_test_name))
543        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
544        # Calculate the average throughput
545        if self.use_client_output:
546            iperf_file = RESULTS_DESTINATION
547        else:
548            iperf_file = self.iperf_server.log_files[-1]
549        try:
550            iperf_result = ipf.IPerfResult(iperf_file)
551
552            # Compute the throughput in Mbit/s
553            throughput = (math.fsum(
554                iperf_result.instantaneous_rates[self.start_meas_time:-1]
555            ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
556                          ) * 8 * (1.024**2)
557
558            self.log.info('The average throughput is {}'.format(throughput))
559        except ValueError:
560            self.log.warning('Cannot get iperf result. Setting to 0')
561            throughput = 0
562        return throughput
563
564    def collect_raw_data_samples(self):
565        if hasattr(self, 'bitses') and self.bits_root_rail_csv_export:
566            path = os.path.join(os.path.dirname(self.mon_info.data_path),
567                                'Kibble')
568            self.power_monitor.get_bits_root_rail_csv_export(path, self.test_name)
569