1#!/usr/bin/env python3.4
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import numpy
21import json
22import re
23import os
24from acts import context
25from acts import base_test
26from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
27from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
28from acts_contrib.test_utils.cellular.performance.CellularThroughputBaseTest import CellularThroughputBaseTest
29from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
30from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
31from functools import partial
32
33
34class CellularLteSensitivityTest(CellularThroughputBaseTest):
35    """Class to test single cell LTE sensitivity"""
36
37    def __init__(self, controllers):
38        base_test.BaseTestClass.__init__(self, controllers)
39        self.testcase_metric_logger = (
40            BlackboxMappedMetricLogger.for_test_case())
41        self.testclass_metric_logger = (
42            BlackboxMappedMetricLogger.for_test_class())
43        self.publish_testcase_metrics = True
44        self.testclass_params = self.user_params['lte_sensitivity_test_params']
45        self.tests = self.generate_test_cases(dl_mcs_list=list(
46            numpy.arange(27, -1, -1)),
47                                              lte_dl_mcs_table='QAM256',
48                                              lte_ul_mcs_table='QAM256',
49                                              lte_ul_mcs=4,
50                                              transform_precoding=0)
51
52    def process_testclass_results(self):
53        # Plot individual test id results raw data and compile metrics
54        plots = collections.OrderedDict()
55        compiled_data = collections.OrderedDict()
56        for testcase_name, testcase_data in self.testclass_results.items():
57            cell_config = testcase_data['testcase_params'][
58                'endc_combo_config']['cell_list'][0]
59            test_id = tuple(('band', cell_config['band']))
60            if test_id not in plots:
61                # Initialize test id data when not present
62                compiled_data[test_id] = {
63                    'mcs': [],
64                    'average_throughput': [],
65                    'theoretical_throughput': [],
66                    'cell_power': [],
67                }
68                plots[test_id] = BokehFigure(
69                    title='Band {} ({}) - BLER Curves'.format(
70                        cell_config['band'],
71                        testcase_data['testcase_params']['lte_dl_mcs_table']),
72                    x_label='Cell Power (dBm)',
73                    primary_y_label='BLER (Mbps)')
74                test_id_rvr = test_id + tuple('RvR')
75                plots[test_id_rvr] = BokehFigure(
76                    title='Band {} ({}) - RvR'.format(
77                        cell_config['band'],
78                        testcase_data['testcase_params']['lte_dl_mcs_table']),
79                    x_label='Cell Power (dBm)',
80                    primary_y_label='PHY Rate (Mbps)')
81            # Compile test id data and metrics
82            compiled_data[test_id]['average_throughput'].append(
83                testcase_data['average_throughput_list'])
84            compiled_data[test_id]['cell_power'].append(
85                testcase_data['cell_power_list'])
86            compiled_data[test_id]['mcs'].append(
87                testcase_data['testcase_params']['lte_dl_mcs'])
88            # Add test id to plots
89            plots[test_id].add_line(
90                testcase_data['cell_power_list'],
91                testcase_data['bler_list'],
92                'MCS {}'.format(
93                    testcase_data['testcase_params']['lte_dl_mcs']),
94                width=1)
95            plots[test_id_rvr].add_line(
96                testcase_data['cell_power_list'],
97                testcase_data['average_throughput_list'],
98                'MCS {}'.format(
99                    testcase_data['testcase_params']['lte_dl_mcs']),
100                width=1,
101                style='dashed')
102
103        for test_id, test_data in compiled_data.items():
104            test_id_rvr = test_id + tuple('RvR')
105            cell_power_interp = sorted(set(sum(test_data['cell_power'], [])))
106            average_throughput_interp = []
107            for mcs, cell_power, throughput in zip(
108                    test_data['mcs'], test_data['cell_power'],
109                    test_data['average_throughput']):
110                throughput_interp = numpy.interp(cell_power_interp,
111                                                 cell_power[::-1],
112                                                 throughput[::-1])
113                average_throughput_interp.append(throughput_interp)
114            rvr = numpy.max(average_throughput_interp, 0)
115            plots[test_id_rvr].add_line(cell_power_interp, rvr,
116                                        'Rate vs. Range')
117
118        figure_list = []
119        for plot_id, plot in plots.items():
120            plot.generate_figure()
121            figure_list.append(plot)
122        output_file_path = os.path.join(self.log_path, 'results.html')
123        BokehFigure.save_figures(figure_list, output_file_path)
124
125        """Saves CSV with all test results to enable comparison."""
126        results_file_path = os.path.join(
127            context.get_current_context().get_full_output_path(),
128            'results.csv')
129        with open(results_file_path, 'w', newline='') as csvfile:
130            field_names = [
131                'Test Name', 'Sensitivity'
132            ]
133            writer = csv.DictWriter(csvfile, fieldnames=field_names)
134            writer.writeheader()
135
136            for testcase_name, testcase_results in self.testclass_results.items(
137            ):
138                row_dict = {
139                    'Test Name': testcase_name,
140                    'Sensitivity': testcase_results['sensitivity']
141                }
142                writer.writerow(row_dict)
143
144    def process_testcase_results(self):
145        if self.current_test_name not in self.testclass_results:
146            return
147        testcase_data = self.testclass_results[self.current_test_name]
148
149        bler_list = []
150        average_throughput_list = []
151        theoretical_throughput_list = []
152        cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][
153            0]
154        for result in testcase_data['results']:
155            bler_list.append(result['throughput_measurements']
156                             ['lte_bler_result']['total']['DL']['nack_ratio'])
157            average_throughput_list.append(
158                result['throughput_measurements']['lte_tput_result']['total']
159                ['DL']['average_tput'])
160            theoretical_throughput_list.append(
161                result['throughput_measurements']['lte_tput_result']['total']
162                ['DL']['theoretical_tput'])
163        padding_len = len(cell_power_list) - len(average_throughput_list)
164        average_throughput_list.extend([0] * padding_len)
165        theoretical_throughput_list.extend([0] * padding_len)
166
167        bler_above_threshold = [
168            bler > self.testclass_params['bler_threshold']
169            for bler in bler_list
170        ]
171        for idx in range(len(bler_above_threshold)):
172            if all(bler_above_threshold[idx:]):
173                sensitivity_idx = max(idx, 1) - 1
174                break
175        else:
176            sensitivity_idx = -1
177        sensitivity = cell_power_list[sensitivity_idx]
178        self.log.info('LTE Band {} Table {} MCS {} Sensitivity = {}dBm'.format(
179            testcase_data['testcase_params']['endc_combo_config']['cell_list']
180            [0]['band'], testcase_data['testcase_params']['lte_dl_mcs_table'],
181            testcase_data['testcase_params']['lte_dl_mcs'], sensitivity))
182
183        testcase_data['bler_list'] = bler_list
184        testcase_data['average_throughput_list'] = average_throughput_list
185        testcase_data[
186            'theoretical_throughput_list'] = theoretical_throughput_list
187        testcase_data['cell_power_list'] = cell_power_list
188        testcase_data['sensitivity'] = sensitivity
189
190        results_file_path = os.path.join(
191            context.get_current_context().get_full_output_path(),
192            '{}.json'.format(self.current_test_name))
193        with open(results_file_path, 'w') as results_file:
194            json.dump(wputils.serialize_dict(testcase_data),
195                      results_file,
196                      indent=4)
197
198    def get_per_cell_power_sweeps(self, testcase_params):
199        # get reference test
200        current_band = testcase_params['endc_combo_config']['cell_list'][0][
201            'band']
202        reference_test = None
203        reference_sensitivity = None
204        for testcase_name, testcase_data in self.testclass_results.items():
205            if testcase_data['testcase_params']['endc_combo_config'][
206                    'cell_list'][0]['band'] == current_band:
207                reference_test = testcase_name
208                reference_sensitivity = testcase_data['sensitivity']
209        if reference_test and reference_sensitivity and not self.retry_flag:
210            start_atten = reference_sensitivity + self.testclass_params[
211                'adjacent_mcs_gap']
212            self.log.info(
213                "Reference test {} found. Sensitivity {} dBm. Starting at {} dBm"
214                .format(reference_test, reference_sensitivity, start_atten))
215        else:
216            start_atten = self.testclass_params['lte_cell_power_start']
217            self.log.info(
218                "Reference test not found. Starting at {} dBm".format(
219                    start_atten))
220        # get current cell power start
221        cell_power_sweeps = [
222            list(
223                numpy.arange(start_atten,
224                             self.testclass_params['lte_cell_power_stop'],
225                             self.testclass_params['lte_cell_power_step']))
226        ]
227        return cell_power_sweeps
228
229    def generate_test_cases(self, dl_mcs_list, lte_dl_mcs_table,
230                            lte_ul_mcs_table, lte_ul_mcs, **kwargs):
231        test_cases = []
232        with open(self.testclass_params['lte_single_cell_configs'],
233                  'r') as csvfile:
234            test_configs = csv.DictReader(csvfile)
235            for test_config, lte_dl_mcs in itertools.product(
236                    test_configs, dl_mcs_list):
237                if int(test_config['skip_test']):
238                    continue
239                endc_combo_config = cputils.generate_endc_combo_config_from_csv_row(
240                    test_config)
241                test_name = 'test_lte_B{}_dl_{}_mcs{}'.format(
242                    test_config['lte_band'], lte_dl_mcs_table, lte_dl_mcs)
243                test_params = collections.OrderedDict(
244                    endc_combo_config=endc_combo_config,
245                    lte_dl_mcs_table=lte_dl_mcs_table,
246                    lte_dl_mcs=lte_dl_mcs,
247                    lte_ul_mcs_table=lte_ul_mcs_table,
248                    lte_ul_mcs=lte_ul_mcs,
249                    **kwargs)
250                setattr(self, test_name,
251                        partial(self._test_throughput_bler, test_params))
252                test_cases.append(test_name)
253        return test_cases
254
255
256class CellularLteSensitivity_SampleMCS_Test(CellularLteSensitivityTest):
257    """Class to test single cell LTE sensitivity"""
258
259    def __init__(self, controllers):
260        base_test.BaseTestClass.__init__(self, controllers)
261        self.testcase_metric_logger = (
262            BlackboxMappedMetricLogger.for_test_case())
263        self.testclass_metric_logger = (
264            BlackboxMappedMetricLogger.for_test_class())
265        self.publish_testcase_metrics = True
266        self.testclass_params = self.user_params['lte_sensitivity_test_params']
267        self.tests = self.generate_test_cases(dl_mcs_list=[27,25,16,9],
268                                              lte_dl_mcs_table='QAM256',
269                                              lte_ul_mcs_table='QAM256',
270                                              lte_ul_mcs=4,
271                                              transform_precoding=0)