1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import os
18import time
19import numpy as np
20import acts_contrib.test_utils.bt.bt_test_utils as bt_utils
21from acts.metrics.loggers.blackbox import BlackboxMetricLogger
22import acts_contrib.test_utils.wifi.wifi_performance_test_utils as wifi_utils
23
24from acts import asserts
25from functools import partial
26from acts_contrib.test_utils.bt.BtSarBaseTest import BtSarBaseTest
27
28
class BtSarTpcTest(BtSarBaseTest):
    """Class to define BT SAR Transmit Power Control (TPC) tests.

    This class defines tests to detect any anomalies in the
    Transmit power control mechanisms. One test case is generated for
    each scenario index in range(scenario_count); every test forces the
    matching row of the SAR table, sweeps the attenuator and checks that
    tx power changes coincide with power level changes.
    """
    def __init__(self, controllers):
        """Creates the metric logger and generates the test cases.

        Args:
            controllers: controllers forwarded to the base test class.
        """
        super().__init__(controllers)
        req_params = ['scenario_count']
        self.sar_tpc_test_result = BlackboxMetricLogger.for_test_case(
            metric_name='pass_count')
        # scenario_count must be unpacked before the test cases are generated.
        self.unpack_userparams(req_params)
        self.tests = self.generate_test_cases()

    def setup_class(self):
        """Prepares the DUT, the attenuator and the result table."""
        super().setup_class()
        self.push_table(self.dut, self.custom_sar_path)
        self.attenuator.set_atten(self.atten_min)
        self.pathloss = int(self.calibration_params['pathloss'])

        # Work on a copy of the SAR table and add the result columns,
        # initialised with placeholder values.
        self.sar_df = self.bt_sar_df.copy()
        self.sar_df['power_cap'] = -128
        self.sar_df['TPC_result'] = -1

    def setup_test(self):
        """Defines the sweep range and creates fresh figures for the test."""
        super().setup_test()

        self.tpc_sweep_range = range(self.atten_min, self.pl10_atten)
        self.log.info(self.current_test_name)

        self.tpc_plots_figure = wifi_utils.BokehFigure(
            title='{}_{}'.format(self.current_test_name, 'curve'),
            x_label='Pathloss(dBm)',
            primary_y_label='Tx Power(dBm)')

        self.tpc_plots_derivative_figure = wifi_utils.BokehFigure(
            title='{}_{}'.format(self.current_test_name, 'curve_derivative'),
            x_label='Pathloss(dBm)',
            primary_y_label='Tx Power(dB)')

    def teardown_class(self):
        """Removes the file at power_file_paths[1] from the DUT."""
        self.dut.adb.shell('rm {}'.format(self.power_file_paths[1]))
        super().teardown_class()

    def generate_test_cases(self):
        """Function to generate test cases.

        Generates one test case per scenario index in
        range(scenario_count) and binds it to this instance as
        'test_bt_sar_tpc_<scenario>'.

        Returns: list of generated test case names.
        """
        test_cases = []
        for scenario in range(int(self.scenario_count)):
            test_name = 'test_bt_sar_tpc_{}'.format(scenario)
            setattr(self, test_name, partial(self._test_bt_sar_tpc, scenario))
            test_cases.append(test_name)
        return test_cases

    def process_tpc_results(self, tx_power_list, pwlv_list):
        """Processes the results of tpc sweep.

        Processes tpc sweep to ensure that tx power changes
        as expected while sweeping.

        Args:
            tx_power_list: list of tx power measured during the TPC sweep.
            pwlv_list: list of power levels observed during the TPC sweep.

        Returns:
            A two element list [tpc_verdict, tx_power_derivative] where
            tpc_verdict is the result of the tpc sweep ('PASS'/'FAIL') and
            tx_power_derivative is the first difference of tx_power_list.
        """
        tx_power_derivative = np.diff(tx_power_list)

        # Samples with a fractional power level are treated as transition
        # points and are left out of the pass/fail check.
        steady_index = [
            idx for idx, pwlv in enumerate(pwlv_list) if pwlv % 1 == 0
        ]
        pwlv_steady_state_list = [pwlv_list[idx] for idx in steady_index]
        tx_power_steady_state_list = [
            tx_power_list[idx] for idx in steady_index
        ]
        tx_power_steady_state_derivative = np.diff(tx_power_steady_state_list)

        # Report issue if the transition period is too long, i.e. two
        # consecutive steady samples are more than 3 sweep steps apart.
        if any(gap > 3 for gap in np.diff(steady_index)):
            self.log.warning('TPC transition takes too long')
            return ['FAIL', tx_power_derivative]

        # Locating power level changes in the sweep
        pwlv_derivative_bool = list(np.diff(pwlv_steady_state_list) == 1)

        # Locating legitimate tx power changes
        lower_bound, upper_bound = self.tpc_threshold['positive'][:2]
        tx_power_derivative_bool = [
            lower_bound < step < upper_bound
            for step in tx_power_steady_state_derivative
        ]

        # Ensuring that changes in power level correspond to tx power changes
        if pwlv_derivative_bool == tx_power_derivative_bool:
            tpc_verdict = 'PASS'
        else:
            tpc_verdict = 'FAIL'
        return [tpc_verdict, tx_power_derivative]

    def _test_bt_sar_tpc(self, scenario):
        """Performs TPC sweep for the given scenario.

        Function performs and evaluates TPC sweep for a given scenario,
        saves the TPC curve and its derivative as an html plot and then
        asserts pass/fail.

        Args:
            scenario: row of the BT SAR table.
        """

        master_tx_power_list = []
        pwlv_list = []

        # Recording the DUT time before the SAR state is forced
        # NOTE(review): '%m' in a date format is the month, not a sub-second
        # field; confirm this is what get_current_power_cap expects.
        start_time = self.dut.adb.shell('date +%s.%m')
        time.sleep(1)

        # Reading BT SAR Scenario from the table and forcing the SAR state
        read_scenario = self.sar_df.loc[scenario].to_dict()
        self.set_sar_state(self.dut, read_scenario, self.country_code)

        # Reading power cap
        self.sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap(
            self.dut, start_time)

        # TPC sweep
        for atten in self.tpc_sweep_range:

            self.attenuator.set_atten(atten)
            self.log.info('Current TPC attenuation {} dB; scenario {}'.format(
                atten, scenario))

            processed_bqr = bt_utils.get_bt_metric(self.android_devices,
                                                   self.duration)
            # Recording master tx power (rssi + pathloss + attenuation)
            # and pwlv
            master_tx_power_list.append(
                processed_bqr['rssi'][self.bt_device_controller.serial] +
                self.pathloss + atten)
            pwlv_list.append(processed_bqr['pwlv'][self.dut.serial])

        # Processing tpc sweep results
        tpc_result, tx_power_derivative = self.process_tpc_results(
            master_tx_power_list, pwlv_list)
        self.sar_df.loc[scenario, 'TPC_result'] = tpc_result

        # Plot TPC curves. The tx power list holds one sample per sweep
        # step, so it is plotted against the full sweep range; only the
        # derivative is one sample shorter.
        self.tpc_plots_figure.add_line(self.tpc_sweep_range,
                                       master_tx_power_list,
                                       str(scenario),
                                       marker='circle')

        self.tpc_plots_derivative_figure.add_line(self.tpc_sweep_range[:-1],
                                                  tx_power_derivative,
                                                  str(scenario),
                                                  marker='circle')

        self.tpc_plots_figure.generate_figure()
        self.tpc_plots_derivative_figure.generate_figure()

        # Saving the TPC curves
        plot_file_path = os.path.join(self.log_path,
                                      '{}.html'.format(self.current_test_name))
        wifi_utils.BokehFigure.save_figures(
            [self.tpc_plots_figure, self.tpc_plots_derivative_figure],
            plot_file_path)

        # Asserting pass and fail
        if tpc_result == 'FAIL':
            asserts.fail('TPC sweep failed for scenario: {}'.format(scenario))

        # NOTE(review): __init__ creates self.sar_tpc_test_result but it is
        # never written; sar_test_result is not defined in this file
        # (presumably by BtSarBaseTest) - confirm which logger should carry
        # the result.
        self.sar_test_result.metric_value = 1
        asserts.explicit_pass(
            'TPC sweep passed for scenario: {}'.format(scenario))
211