1#!/usr/bin/env python3.4
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import re
22
23import numpy
24import os
25import time
26from acts import asserts
27from acts import context
28from acts import base_test
29from acts import utils
30from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
31from acts.controllers.utils_lib import ssh
32from acts.controllers import iperf_server as ipf
33from acts.controllers import power_monitor as power_monitor_lib
34from acts_contrib.test_utils.cellular.keysight_5g_testapp import Keysight5GTestApp
35from acts_contrib.test_utils.cellular.keysight_chamber import KeysightChamber
36from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
37from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
38from acts_contrib.test_utils.power import plot_utils as power_plot_utils
39
40LONG_SLEEP = 10
41MEDIUM_SLEEP = 2
42IPERF_TIMEOUT = 10
43SHORT_SLEEP = 1
44VERY_SHORT_SLEEP = 0.1
45SUBFRAME_LENGTH = 0.001
46STOP_COUNTER_LIMIT = 3
47RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
48DEFAULT_MONSOON_FREQUENCY = 500
49PHONE_BATTERY_VOLTAGE_DEFAULT = 4
50
51from functools import wraps
52import logging
53
54
55def suspend_logging(func):
56
57    @wraps(func)
58    def inner(*args, **kwargs):
59        logging.disable(logging.FATAL)
60        try:
61            return func(*args, **kwargs)
62        finally:
63            logging.disable(logging.NOTSET)
64
65    return inner
66
67
68class CellularThroughputBaseTest(base_test.BaseTestClass):
69    """Base class for Cellular Throughput Testing
70
71    This base class enables cellular throughput tests on a lab/callbox setup
72    with PHY layer or iperf traffic.
73    """
74
75    def __init__(self, controllers):
76        base_test.BaseTestClass.__init__(self, controllers)
77        self.testcase_metric_logger = (
78            BlackboxMappedMetricLogger.for_test_case())
79        self.testclass_metric_logger = (
80            BlackboxMappedMetricLogger.for_test_class())
81        self.publish_testcase_metrics = True
82        self.testclass_params = {}
83
84    def setup_class(self):
85        """Initializes common test hardware and parameters.
86
87        This function initializes hardwares and compiles parameters that are
88        common to all tests in this class.
89        """
90        # Setup controllers
91        self.dut = self.android_devices[-1]
92        self.dut_utils = cputils.DeviceUtils(self.dut, self.log)
93        self.keysight_test_app = Keysight5GTestApp(
94            self.user_params['Keysight5GTestApp'])
95        if 'KeysightChamber' in self.user_params:
96            self.keysight_chamber = KeysightChamber(
97                self.user_params['KeysightChamber'])
98        self.remote_server = ssh.connection.SshConnection(
99            ssh.settings.from_config(
100                self.user_params['RemoteServer']['ssh_config']))
101
102        self.unpack_userparams(MonsoonParams=None,
103                               bits_root_rail_csv_export=False)
104        self.power_monitor = self.initialize_power_monitor()
105
106        # Configure Tester
107        if self.testclass_params.get('reload_scpi', 0):
108            self.keysight_test_app.import_scpi_file(
109                self.testclass_params['scpi_file'])
110
111        # Declare testclass variables
112        self.testclass_results = collections.OrderedDict()
113
114        # Configure test retries
115        self.user_params['retry_tests'] = [self.__class__.__name__]
116
117        # Turn Airplane mode on
118        self.dut_utils.toggle_airplane_mode(True, False)
119
120    def teardown_class(self):
121        if self.power_monitor:
122            self.power_monitor.connect_usb()
123            self.dut.wait_for_boot_completion()
124        self.log.info('Turning airplane mode on')
125        try:
126            self.dut_utils.toggle_airplane_mode(True, False)
127        except:
128            self.log.warning('Cannot perform teardown operations on DUT.')
129        try:
130            self.keysight_test_app.turn_all_cells_off()
131            self.keysight_test_app.destroy()
132        except:
133            self.log.warning('Cannot perform teardown operations on tester.')
134        self.process_testclass_results()
135
136    def setup_test(self):
137        self.retry_flag = False
138        if self.testclass_params.get('enable_pixel_logs', 0):
139            self.dut_utils.start_pixel_logger()
140
141    def teardown_test(self):
142        if self.power_monitor:
143            self.power_monitor.connect_usb()
144        self.retry_flag = False
145        self.log.info('Turing airplane mode on')
146        self.dut_utils.toggle_airplane_mode(True, False)
147        self.log.info('Turning all cells off.')
148        self.keysight_test_app.turn_all_cells_off()
149        log_path = os.path.join(
150            context.get_current_context().get_full_output_path(), 'pixel_logs')
151        os.makedirs(self.log_path, exist_ok=True)
152        if self.testclass_params.get('enable_pixel_logs', 0):
153            self.dut_utils.stop_pixel_logger(log_path)
154        self.process_testcase_results()
155        self.pass_fail_check()
156
157    def on_retry(self):
158        """Function to control test logic on retried tests.
159
160        This function is automatically executed on tests that are being
161        retried. In this case the function resets wifi, toggles it off and on
162        and sets a retry_flag to enable further tweaking the test logic on
163        second attempts.
164        """
165        self.dut_utils.toggle_airplane_mode(True, False)
166        if self.keysight_test_app.get_cell_state('LTE', 'CELL1'):
167            self.log.info('Turning LTE off.')
168            self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0)
169        self.retry_flag = True
170
171    def initialize_power_monitor(self):
172        """ Initializes the power monitor object.
173
174        Raises an exception if there are no controllers available.
175        """
176
177        device_time = self.dut.adb.shell('echo $EPOCHREALTIME')
178        host_time = time.time()
179        self.log.debug('device start time %s, host start time %s', device_time,
180                       host_time)
181        self.device_to_host_offset = float(device_time) - host_time
182        if hasattr(self, 'bitses'):
183            power_monitor = self.bitses[0]
184            power_monitor.setup(registry=self.user_params)
185        elif hasattr(self, 'monsoons'):
186            power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
187                self.monsoons[0])
188            self.monsoons[0].set_max_current(self.MonsoonParams['current'])
189            self.monsoons[0].set_voltage(self.MonsoonParams['voltage'])
190        else:
191            power_monitor = None
192        return power_monitor
193
194    def pass_fail_check(self):
195        pass
196
197    def process_testcase_results(self):
198        pass
199
200    def process_testclass_results(self):
201        pass
202
203    def get_per_cell_power_sweeps(self, testcase_params):
204        raise NotImplementedError(
205            'get_per_cell_power_sweeps must be implemented.')
206
207    def compile_test_params(self, testcase_params):
208        """Function that completes all test params based on the test name.
209
210        Args:
211            testcase_params: dict containing test-specific parameters
212        """
213        # Measurement Duration
214        testcase_params['bler_measurement_length'] = int(
215            self.testclass_params['traffic_duration'] / SUBFRAME_LENGTH)
216        # Cell power sweep
217        # TODO: Make this a function to support single power and sweep modes for each cell
218        testcase_params['cell_power_sweep'] = self.get_per_cell_power_sweeps(
219            testcase_params)
220        # Traffic & iperf params
221        if self.testclass_params['traffic_type'] == 'PHY':
222            return testcase_params
223        if self.testclass_params['traffic_type'] == 'TCP':
224            testcase_params['iperf_socket_size'] = self.testclass_params.get(
225                'tcp_socket_size', None)
226            testcase_params['iperf_processes'] = self.testclass_params.get(
227                'tcp_processes', 1)
228        elif self.testclass_params['traffic_type'] == 'UDP':
229            testcase_params['iperf_socket_size'] = self.testclass_params.get(
230                'udp_socket_size', None)
231            testcase_params['iperf_processes'] = self.testclass_params.get(
232                'udp_processes', 1)
233        adb_iperf_server = isinstance(self.iperf_servers[0],
234                                      ipf.IPerfServerOverAdb)
235        if testcase_params['traffic_direction'] == 'DL':
236            reverse_direction = 0 if adb_iperf_server else 1
237            testcase_params[
238                'use_client_output'] = False if adb_iperf_server else True
239        elif testcase_params['traffic_direction'] == 'UL':
240            reverse_direction = 1 if adb_iperf_server else 0
241            testcase_params[
242                'use_client_output'] = True if adb_iperf_server else False
243        testcase_params['iperf_args'] = wputils.get_iperf_arg_string(
244            duration=self.testclass_params['traffic_duration'],
245            reverse_direction=reverse_direction,
246            traffic_type=self.testclass_params['traffic_type'],
247            socket_size=testcase_params['iperf_socket_size'],
248            num_processes=testcase_params['iperf_processes'],
249            udp_throughput=self.testclass_params['UDP_rates'].get(
250                testcase_params['num_dl_cells'],
251                self.testclass_params['UDP_rates']["default"]),
252            udp_length=1440)
253        return testcase_params
254
255    def run_iperf_traffic(self, testcase_params):
256        self.iperf_servers[0].start(tag=0)
257        dut_ip = self.dut.droid.connectivityGetIPv4Addresses('rmnet0')[0]
258        if 'iperf_server_address' in self.testclass_params:
259            iperf_server_address = self.testclass_params[
260                'iperf_server_address']
261        elif isinstance(self.iperf_servers[0], ipf.IPerfServerOverAdb):
262            iperf_server_address = dut_ip
263        else:
264            iperf_server_address = wputils.get_server_address(
265                self.remote_server, dut_ip, '255.255.255.0')
266        client_output_path = self.iperf_clients[0].start(
267            iperf_server_address, testcase_params['iperf_args'], 0,
268            self.testclass_params['traffic_duration'] + IPERF_TIMEOUT)
269        server_output_path = self.iperf_servers[0].stop()
270        # Parse and log result
271        if testcase_params['use_client_output']:
272            iperf_file = client_output_path
273        else:
274            iperf_file = server_output_path
275        try:
276            iperf_result = ipf.IPerfResult(iperf_file)
277            current_throughput = numpy.mean(iperf_result.instantaneous_rates[
278                self.testclass_params['iperf_ignored_interval']:-1]) * 8 * (
279                    1.024**2)
280        except:
281            self.log.warning(
282                'ValueError: Cannot get iperf result. Setting to 0')
283            current_throughput = 0
284        return current_throughput
285
286    def start_single_throughput_measurement(self, testcase_params):
287        self.log.info('Starting BLER & throughput tests.')
288        if testcase_params['endc_combo_config']['nr_cell_count']:
289            self.keysight_test_app.start_bler_measurement(
290                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
291                testcase_params['bler_measurement_length'])
292        if testcase_params['endc_combo_config']['lte_cell_count']:
293            self.keysight_test_app.start_bler_measurement(
294                'LTE',
295                testcase_params['endc_combo_config']['lte_dl_carriers'][0],
296                testcase_params['bler_measurement_length'])
297        if self.testclass_params['traffic_type'] != 'PHY':
298            #TODO: get iperf to run in non-blocking mode
299            self.log.warning(
300                'iperf traffic not currently supported with power measurement')
301
302    def stop_single_throughput_measurement(self, testcase_params):
303        result = collections.OrderedDict()
304        if testcase_params['endc_combo_config']['nr_cell_count']:
305            result['nr_bler_result'] = self.keysight_test_app.get_bler_result(
306                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
307                testcase_params['endc_combo_config']['nr_ul_carriers'],
308                testcase_params['bler_measurement_length'])
309            result['nr_tput_result'] = self.keysight_test_app.get_throughput(
310                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
311                testcase_params['endc_combo_config']['nr_ul_carriers'])
312        if testcase_params['endc_combo_config']['lte_cell_count']:
313            result['lte_bler_result'] = self.keysight_test_app.get_bler_result(
314                cell_type='LTE',
315                dl_cells=testcase_params['endc_combo_config']
316                ['lte_dl_carriers'],
317                ul_cells=testcase_params['endc_combo_config']
318                ['lte_ul_carriers'],
319                length=testcase_params['bler_measurement_length'])
320            result['lte_tput_result'] = self.keysight_test_app.get_throughput(
321                'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'],
322                testcase_params['endc_combo_config']['lte_ul_carriers'])
323        return result
324
325    def run_single_throughput_measurement(self, testcase_params):
326        result = collections.OrderedDict()
327        self.log.info('Starting BLER & throughput tests.')
328        if testcase_params['endc_combo_config']['nr_cell_count']:
329            self.keysight_test_app.start_bler_measurement(
330                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
331                testcase_params['bler_measurement_length'])
332        if testcase_params['endc_combo_config']['lte_cell_count']:
333            self.keysight_test_app.start_bler_measurement(
334                'LTE',
335                testcase_params['endc_combo_config']['lte_dl_carriers'][0],
336                testcase_params['bler_measurement_length'])
337
338        if self.testclass_params['traffic_type'] != 'PHY':
339            result['iperf_throughput'] = self.run_iperf_traffic(
340                testcase_params)
341
342        if testcase_params['endc_combo_config']['nr_cell_count']:
343            result['nr_bler_result'] = self.keysight_test_app.get_bler_result(
344                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
345                testcase_params['endc_combo_config']['nr_ul_carriers'],
346                testcase_params['bler_measurement_length'])
347            result['nr_tput_result'] = self.keysight_test_app.get_throughput(
348                'NR5G', testcase_params['endc_combo_config']['nr_dl_carriers'],
349                testcase_params['endc_combo_config']['nr_ul_carriers'])
350        if testcase_params['endc_combo_config']['lte_cell_count']:
351            result['lte_bler_result'] = self.keysight_test_app.get_bler_result(
352                cell_type='LTE',
353                dl_cells=testcase_params['endc_combo_config']
354                ['lte_dl_carriers'],
355                ul_cells=testcase_params['endc_combo_config']
356                ['lte_ul_carriers'],
357                length=testcase_params['bler_measurement_length'])
358            result['lte_tput_result'] = self.keysight_test_app.get_throughput(
359                'LTE', testcase_params['endc_combo_config']['lte_dl_carriers'],
360                testcase_params['endc_combo_config']['lte_ul_carriers'])
361        return result
362
363    @suspend_logging
364    def meausre_power_silently(self, measurement_time, measurement_wait,
365                               data_path):
366        measurement_args = dict(duration=measurement_time,
367                                measure_after_seconds=measurement_wait,
368                                hz=self.MonsoonParams['frequency'])
369
370        self.power_monitor.measure(measurement_args=measurement_args,
371                                   measurement_name=self.test_name,
372                                   start_time=self.device_to_host_offset,
373                                   monsoon_output_path=data_path)
374
375    def collect_power_data(self,
376                           measurement_time,
377                           measurement_wait,
378                           reconnect_usb=0,
379                           measurement_tag=0):
380        """Measure power, plot and take log if needed.
381
382        Returns:
383            A MonsoonResult object.
384            measurement_time: length of power measurement
385            measurement_wait: wait before measurement(within monsoon controller)
386            measurement_tag: tag to append to file names
387        """
388        if self.dut.is_connected():
389            self.dut_utils.stop_services()
390            time.sleep(SHORT_SLEEP)
391            self.dut_utils.log_odpm(
392                os.path.join(
393                    context.get_current_context().get_full_output_path(),
394                    '{}.txt'.format('before')))
395            self.power_monitor.disconnect_usb()
396        else:
397            self.log.info('DUT already disconnected. Skipping USB operations.')
398
399        self.log.info('Starting power measurement. Duration: {}s. Offset: '
400                      '{}s. Voltage: {} V.'.format(
401                          measurement_time, measurement_wait,
402                          self.MonsoonParams['voltage']))
403        # Collecting current measurement data and plot
404        tag = '{}_{}'.format(self.test_name, measurement_tag)
405        data_path = os.path.join(
406            context.get_current_context().get_full_output_path(),
407            '{}.txt'.format(tag))
408        self.meausre_power_silently(measurement_time, measurement_wait,
409                                    data_path)
410        self.power_monitor.release_resources()
411        if hasattr(self, 'bitses') and self.bits_root_rail_csv_export:
412            path = os.path.join(
413                context.get_current_context().get_full_output_path(), 'Kibble')
414            self.power_monitor.get_bits_root_rail_csv_export(
415                path, self.test_name)
416
417        if reconnect_usb:
418            self.log.info('Reconnecting USB.')
419            self.power_monitor.connect_usb()
420            self.dut.wait_for_boot_completion()
421            # Save ODPM if applicable
422            self.dut_utils.log_odpm(
423                os.path.join(
424                    context.get_current_context().get_full_output_path(),
425                    '{}.txt'.format('after')))
426            # Restart Sl4a and other services
427            self.dut_utils.start_services()
428
429        samples = self.power_monitor.get_waveform(file_path=data_path)
430
431        current = [sample[1] for sample in samples]
432        average_current = sum(current) * 1000 / len(current)
433        self.log.info('Average current computed: {}'.format(average_current))
434        plot_title = '{}_{}'.format(self.test_name, measurement_tag)
435        power_plot_utils.current_waveform_plot(
436            samples, self.MonsoonParams['voltage'],
437            context.get_current_context().get_full_output_path(), plot_title)
438
439        return average_current
440
441    def print_throughput_result(self, result):
442        # Print Test Summary
443        if 'nr_tput_result' in result:
444
445            self.log.info(
446                "NR5G DL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
447                .format(
448                    result['nr_tput_result']['total']['DL']['min_tput'],
449                    result['nr_tput_result']['total']['DL']['average_tput'],
450                    result['nr_tput_result']['total']['DL']['max_tput'],
451                    result['nr_tput_result']['total']['DL']
452                    ['theoretical_tput'],
453                    result['nr_bler_result']['total']['DL']['nack_ratio'] *
454                    100))
455            self.log.info(
456                "NR5G UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
457                .format(
458                    result['nr_tput_result']['total']['UL']['min_tput'],
459                    result['nr_tput_result']['total']['UL']['average_tput'],
460                    result['nr_tput_result']['total']['UL']['max_tput'],
461                    result['nr_tput_result']['total']['UL']
462                    ['theoretical_tput'],
463                    result['nr_bler_result']['total']['UL']['nack_ratio'] *
464                    100))
465        if 'lte_tput_result' in result:
466            self.log.info(
467                "LTE DL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
468                .format(
469                    result['lte_tput_result']['total']['DL']['min_tput'],
470                    result['lte_tput_result']['total']['DL']['average_tput'],
471                    result['lte_tput_result']['total']['DL']['max_tput'],
472                    result['lte_tput_result']['total']['DL']
473                    ['theoretical_tput'],
474                    result['lte_bler_result']['total']['DL']['nack_ratio'] *
475                    100))
476            if self.testclass_params['lte_ul_mac_padding']:
477                self.log.info(
478                    "LTE UL PHY Tput (Mbps) (Min/Avg/Max/Th): {:.2f} / {:.2f} / {:.2f} / {:.2f}\tBLER: {:.2f}"
479                    .format(
480                        result['lte_tput_result']['total']['UL']['min_tput'],
481                        result['lte_tput_result']['total']['UL']
482                        ['average_tput'],
483                        result['lte_tput_result']['total']['UL']['max_tput'],
484                        result['lte_tput_result']['total']['UL']
485                        ['theoretical_tput'],
486                        result['lte_bler_result']['total']['UL']['nack_ratio']
487                        * 100))
488            if self.testclass_params['traffic_type'] != 'PHY':
489                self.log.info("{} Tput: {:.2f} Mbps".format(
490                    self.testclass_params['traffic_type'],
491                    result['iperf_throughput']))
492
493    def setup_tester(self, testcase_params):
494        # Configure all cells
495        self.keysight_test_app.toggle_contiguous_nr_channels(0)
496        for cell_idx, cell in enumerate(
497                testcase_params['endc_combo_config']['cell_list']):
498            if cell['cell_type'] == 'NR5G':
499                self.keysight_test_app.set_nr_cell_type(
500                    cell['cell_type'], cell['cell_number'],
501                    cell['nr_cell_type'])
502            self.keysight_test_app.set_cell_duplex_mode(
503                cell['cell_type'], cell['cell_number'], cell['duplex_mode'])
504            self.keysight_test_app.set_cell_band(cell['cell_type'],
505                                                 cell['cell_number'],
506                                                 cell['band'])
507            self.keysight_test_app.set_cell_dl_power(
508                cell['cell_type'], cell['cell_number'],
509                testcase_params['cell_power_sweep'][cell_idx][0], 1)
510            self.keysight_test_app.set_cell_input_power(
511                cell['cell_type'], cell['cell_number'],
512                self.testclass_params['input_power'][cell['cell_type']])
513            if cell['cell_type'] == 'LTE' and cell['pcc'] == 0:
514                pass
515            else:
516                self.keysight_test_app.set_cell_ul_power_control(
517                    cell['cell_type'], cell['cell_number'],
518                    self.testclass_params['ul_power_control_mode'],
519                    self.testclass_params.get('ul_power_control_target', 0))
520            if cell['cell_type'] == 'NR5G':
521                self.keysight_test_app.set_nr_subcarrier_spacing(
522                    cell['cell_number'], cell['subcarrier_spacing'])
523            if 'channel' in cell and cell['channel'] is not None:
524                self.keysight_test_app.set_cell_channel(
525                    cell['cell_type'], cell['cell_number'], cell['channel'])
526            self.keysight_test_app.set_cell_bandwidth(cell['cell_type'],
527                                                      cell['cell_number'],
528                                                      cell['dl_bandwidth'])
529            self.keysight_test_app.set_cell_mimo_config(
530                cell['cell_type'], cell['cell_number'], 'DL',
531                cell['dl_mimo_config'])
532            if cell['cell_type'] == 'LTE':
533                self.keysight_test_app.set_lte_cell_transmission_mode(
534                    cell['cell_number'], cell['transmission_mode'])
535                self.keysight_test_app.set_lte_cell_num_codewords(
536                    cell['cell_number'], cell['num_codewords'])
537                self.keysight_test_app.set_lte_cell_num_layers(
538                    cell['cell_number'], cell['num_layers'])
539                self.keysight_test_app.set_lte_cell_dl_subframe_allocation(
540                    cell['cell_number'], cell['dl_subframe_allocation'])
541                self.keysight_test_app.set_lte_control_region_size(
542                    cell['cell_number'], 1)
543            if cell['ul_enabled'] and cell['cell_type'] == 'NR5G':
544                self.keysight_test_app.set_cell_mimo_config(
545                    cell['cell_type'], cell['cell_number'], 'UL',
546                    cell['ul_mimo_config'])
547            if 'fading_scenario' in self.testclass_params:
548                self.keysight_test_app.configure_channel_emulator(
549                    cell['cell_type'], cell['cell_number'],
550                    self.testclass_params['fading_scenario'][
551                        cell['cell_type']])
552
553        if testcase_params.get('force_contiguous_nr_channel', False):
554            self.keysight_test_app.toggle_contiguous_nr_channels(1)
555
556        if testcase_params['endc_combo_config']['lte_cell_count']:
557            self.keysight_test_app.set_lte_cell_mcs(
558                'CELL1', testcase_params['lte_dl_mcs_table'],
559                testcase_params['lte_dl_mcs'],
560                testcase_params['lte_ul_mcs_table'],
561                testcase_params['lte_ul_mcs'])
562            self.keysight_test_app.set_lte_ul_mac_padding(
563                self.testclass_params['lte_ul_mac_padding'])
564
565        if testcase_params['endc_combo_config']['nr_cell_count']:
566            if 'schedule_scenario' in testcase_params:
567                self.keysight_test_app.set_nr_cell_schedule_scenario(
568                    'CELL1', testcase_params['schedule_scenario'])
569                if testcase_params['schedule_scenario'] == 'FULL_TPUT':
570                    self.keysight_test_app.set_nr_schedule_slot_ratio(
571                        'CELL1', testcase_params['schedule_slot_ratio'])
572                    self.keysight_test_app.set_nr_schedule_tdd_pattern(
573                        'CELL1', testcase_params.get('tdd_pattern', 0))
574            self.keysight_test_app.set_nr_ul_dft_precoding(
575                'CELL1', testcase_params['transform_precoding'])
576            self.keysight_test_app.set_nr_cell_mcs(
577                'CELL1', testcase_params['nr_dl_mcs'],
578                testcase_params['nr_ul_mcs'])
579            self.keysight_test_app.set_dl_carriers(
580                testcase_params['endc_combo_config']['nr_dl_carriers'])
581            self.keysight_test_app.set_ul_carriers(
582                testcase_params['endc_combo_config']['nr_ul_carriers'])
583
584        if testcase_params['endc_combo_config']['lte_cell_count']:
585            # Connect flow for LTE and LTE+FR1 ENDC
586            # Turn on LTE cells
587            for cell in testcase_params['endc_combo_config']['cell_list']:
588                if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state(
589                        cell['cell_type'], cell['cell_number']):
590                    self.log.info('Turning LTE Cell {} on.'.format(
591                        cell['cell_number']))
592                    self.keysight_test_app.set_cell_state(
593                        cell['cell_type'], cell['cell_number'], 1)
594            self.log.info('Waiting for LTE connections')
595            # Turn airplane mode off
596            num_apm_toggles = 10
597            for idx in range(num_apm_toggles):
598                self.log.info('Turning off airplane mode')
599                self.dut_utils.toggle_airplane_mode(False, False, idx)
600                if self.keysight_test_app.wait_for_cell_status(
601                        'LTE', 'CELL1', 'CONN', 10 * (idx + 1)):
602                    self.log.info('Connected! Waiting for {} seconds.'.format(
603                        LONG_SLEEP))
604                    time.sleep(LONG_SLEEP)
605                    break
606                elif idx < num_apm_toggles - 1:
607                    self.log.info('Turning on airplane mode')
608                    self.dut_utils.toggle_airplane_mode(True, False, idx)
609                    time.sleep(MEDIUM_SLEEP)
610                else:
611                    asserts.fail('DUT did not connect to LTE.')
612            # Activate LTE aggregation if applicable
613            if testcase_params['endc_combo_config']['lte_scc_list']:
614                self.keysight_test_app.apply_lte_carrier_agg(
615                    testcase_params['endc_combo_config']['lte_scc_list'])
616
617            if testcase_params['endc_combo_config']['nr_cell_count']:
618                self.keysight_test_app.apply_carrier_agg()
619                self.log.info('Waiting for 5G connection')
620                connected = self.keysight_test_app.wait_for_cell_status(
621                    'NR5G',
622                    testcase_params['endc_combo_config']['nr_cell_count'],
623                    ['ACT', 'CONN'], 60)
624                if not connected:
625                    asserts.fail('DUT did not connect to NR.')
626            time.sleep(SHORT_SLEEP)
627        elif testcase_params['endc_combo_config']['nr_cell_count']:
628            # Connect flow for NR FR1 Standalone
629            # Turn on NR cells
630            for cell in testcase_params['endc_combo_config']['cell_list']:
631                if cell['cell_type'] == 'NR5G' and not self.keysight_test_app.get_cell_state(
632                        cell['cell_type'], cell['cell_number']):
633                    self.log.info('Turning NR Cell {} on.'.format(
634                        cell['cell_number']))
635                    self.keysight_test_app.set_cell_state(
636                        cell['cell_type'], cell['cell_number'], 1)
637            num_apm_toggles = 10
638            for idx in range(num_apm_toggles):
639                self.log.info('Turning off airplane mode now.')
640                self.dut_utils.toggle_airplane_mode(False, False, idx)
641                if self.keysight_test_app.wait_for_cell_status(
642                        'NR5G', 'CELL1', 'CONN', 10 * (idx + 1)):
643                    self.log.info('Connected! Waiting for {} seconds.'.format(
644                        LONG_SLEEP))
645                    time.sleep(LONG_SLEEP)
646                    break
647                elif idx < num_apm_toggles - 1:
648                    self.log.info('Turning on airplane mode now.')
649                    self.dut_utils.toggle_airplane_mode(True, False, idx)
650                    time.sleep(MEDIUM_SLEEP)
651                else:
652                    asserts.fail('DUT did not connect to NR.')
653
654        if 'fading_scenario' in self.testclass_params and self.testclass_params[
655                'fading_scenario']['enable']:
656            self.log.info('Enabling fading.')
657            self.keysight_test_app.set_channel_emulator_state(
658                self.testclass_params['fading_scenario']['enable'])
659        else:
660            self.keysight_test_app.set_channel_emulator_state(0)
661
662    def _test_throughput_bler(self, testcase_params):
663        """Test function to run cellular throughput and BLER measurements.
664
665        The function runs BLER/throughput measurement after configuring the
666        callbox and DUT. The test supports running PHY or TCP/UDP layer traffic
667        in a variety of band/carrier/mcs/etc configurations.
668
669        Args:
670            testcase_params: dict containing test-specific parameters
671        Returns:
672            result: dict containing throughput results and meta data
673        """
674        # Prepare results dicts
675        testcase_params = self.compile_test_params(testcase_params)
676        testcase_results = collections.OrderedDict()
677        testcase_results['testcase_params'] = testcase_params
678        testcase_results['results'] = []
679
680        # Setup ota chamber if needed
681        if hasattr(self,
682                   'keysight_chamber') and 'orientation' in testcase_params:
683            self.keysight_chamber.move_theta_phi_abs(
684                self.keysight_chamber.preset_orientations[
685                    testcase_params['orientation']]['theta'],
686                self.keysight_chamber.preset_orientations[
687                    testcase_params['orientation']]['phi'])
688
689        # Setup tester and wait for DUT to connect
690        self.setup_tester(testcase_params)
691
692        # Run throughput test loop
693        stop_counter = 0
694        if testcase_params['endc_combo_config']['nr_cell_count']:
695            self.keysight_test_app.select_display_tab('NR5G', 1, 'BTHR',
696                                                      'OTAGRAPH')
697        else:
698            self.keysight_test_app.select_display_tab('LTE', 1, 'BTHR',
699                                                      'OTAGRAPH')
700        for power_idx in range(len(testcase_params['cell_power_sweep'][0])):
701            result = collections.OrderedDict()
702            # Check that cells are still connected
703            connected = 1
704            for cell in testcase_params['endc_combo_config']['cell_list']:
705                if not self.keysight_test_app.wait_for_cell_status(
706                        cell['cell_type'], cell['cell_number'],
707                    ['ACT', 'CONN'], VERY_SHORT_SLEEP, VERY_SHORT_SLEEP):
708                    connected = 0
709            if not connected:
710                self.log.info('DUT lost connection to cells. Ending test.')
711                break
712            # Set DL cell power
713            for cell_idx, cell in enumerate(
714                    testcase_params['endc_combo_config']['cell_list']):
715                cell_power_array = []
716                current_cell_power = testcase_params['cell_power_sweep'][
717                    cell_idx][power_idx]
718                cell_power_array.append(current_cell_power)
719                self.keysight_test_app.set_cell_dl_power(
720                    cell['cell_type'], cell['cell_number'], current_cell_power,
721                    1)
722            result['cell_power'] = cell_power_array
723            # Start BLER and throughput measurements
724            self.log.info('Cell powers: {}'.format(cell_power_array))
725            self.start_single_throughput_measurement(testcase_params)
726            if self.power_monitor:
727                measurement_wait = LONG_SLEEP if (power_idx == 0) else 0
728                current_average_current = self.collect_power_data(
729                    self.testclass_params['traffic_duration'],
730                    measurement_wait,
731                    reconnect_usb=0,
732                    measurement_tag=power_idx)
733            current_throughput = self.stop_single_throughput_measurement(
734                testcase_params)
735            lte_rx_meas = self.dut_utils.get_rx_measurements('LTE')
736            nr_rx_meas = self.dut_utils.get_rx_measurements('NR5G')
737            result['throughput_measurements'] = current_throughput
738            self.print_throughput_result(current_throughput)
739
740            if self.testclass_params.get('log_rsrp_metrics', 1):
741                result['lte_rx_measurements'] = lte_rx_meas
742                result['nr_rx_measurements'] = nr_rx_meas
743                self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
744                self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
745
746            testcase_results['results'].append(result)
747            if (('lte_bler_result' in result['throughput_measurements']
748                 and result['throughput_measurements']['lte_bler_result']
749                 ['total']['DL']['nack_ratio'] * 100 > 99)
750                    or ('nr_bler_result' in result['throughput_measurements']
751                        and result['throughput_measurements']['nr_bler_result']
752                        ['total']['DL']['nack_ratio'] * 100 > 99)):
753                stop_counter = stop_counter + 1
754            else:
755                stop_counter = 0
756            if stop_counter == STOP_COUNTER_LIMIT:
757                break
758
759        # Save results
760        self.testclass_results[self.current_test_name] = testcase_results
761
762    def test_measure_power(self):
763        self.log.info('Turing screen off')
764        self.dut_utils.set_screen_state(0)
765        time.sleep(10)
766        self.log.info('Measuring power now.')
767        self.collect_power_data(60, 0)
768