#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Stream music through connected device from phone across different
attenuations."""

from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference
from multiprocessing import Process, Queue
from acts.signals import TestPass

DEFAULT_THDN_THRESHOLD = 0.9
MAX_ATTENUATION = 95
TIME_OVERHEAD = 2


class BtInterferenceStaticTest(BtInterferenceBaseTest):
    """BT range tests run under static WiFi interference.

    One test case is generated per (interference level, channel set)
    combination read from self.static_wifi_interference. Each generated test
    sweeps the BT attenuator over self.bt_attenuation_range while iperf
    traffic runs on every WiFi interference pair.
    """

    def __init__(self, configs):
        """Set up the attenuation sweep, the iperf duration and metric loggers.

        Args:
            configs: test configuration, forwarded unchanged to the base class.
        """
        super().__init__(configs)
        # 'stop' is inclusive, hence the + 1.
        self.bt_attenuation_range = range(self.attenuation_vector['start'],
                                          self.attenuation_vector['stop'] + 1,
                                          self.attenuation_vector['step'])
        # iperf runs TIME_OVERHEAD seconds longer than the audio duration.
        self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
        test_metrics = [
            'wifi_chan1_rssi', 'wifi_chan6_rssi', 'wifi_chan11_rssi', 'bt_range'
        ]
        # Exposes each logger as self.<metric>_metric.
        for metric in test_metrics:
            setattr(self, '{}_metric'.format(metric),
                    BlackboxMetricLogger.for_test_case(metric_name=metric))

    def setup_generated_tests(self):
        """Generate one test per interference level and channel combination."""
        levels = self.static_wifi_interference['interference_level'].values()
        for level in levels:
            for channels in self.static_wifi_interference['channels']:
                arg_set = [(level, channels)]
                self.generate_tests(
                    test_logic=self.bt_range_with_static_wifi_interference,
                    name_func=self.create_test_name,
                    arg_sets=arg_set)

    def create_test_name(self, level, channels):
        """Build the generated test case name.

        Args:
            level: wifi interference level, embedded in the name as-is.
            channels: iterable of wifi channels; each is rendered with str()
                and followed by an underscore.

        Returns:
            The test case name, e.g. for level 'low' and channels [1, 6]:
            'test_bt_range_with_static_interference_level_low_channel_1_6_'.
        """
        # Every channel, including the last, keeps its trailing underscore so
        # the generated names stay identical to the previously generated ones.
        str_channel_test = ''.join('{}_'.format(i) for i in channels)
        test_case_name = ('test_bt_range_with_static_interference_level_{}_'
                          'channel_{}'.format(level, str_channel_test))
        return test_case_name

    def bt_range_with_static_wifi_interference(self, level, channels):
        """Test function to measure bt range under interference.

        Injects the static WiFi interference, records the interference RSSI
        metrics, then steps through self.bt_attenuation_range. At each step
        it reads the BT link metrics, runs iperf on every WiFi interference
        pair while audio is played and recorded, and runs THDN analysis on
        the captured audio.

        Args:
            level: wifi interference level
            channels: wifi interference channels

        Raises:
            TestPass: at the first attenuation where any THDN value is greater
                than or equal to self.audio_params['thdn_threshold'].
        """
        # NOTE(review): the 'bt_range' metric logger created in __init__ is
        # never assigned in this method -- confirm whether it should record
        # the attenuation at which the test stops.

        #setup wifi interference by setting the correct attenuator
        inject_static_wifi_interference(self.wifi_int_pairs, level, channels)
        # Read interference RSSI
        self.get_interference_rssi()
        self.wifi_chan1_rssi_metric.metric_value = self.interference_rssi[0][
            'rssi']
        self.wifi_chan6_rssi_metric.metric_value = self.interference_rssi[1][
            'rssi']
        self.wifi_chan11_rssi_metric.metric_value = self.interference_rssi[2][
            'rssi']
        for atten in self.bt_attenuation_range:
            # Set attenuation for BT link
            self.attenuator.set_atten(atten)
            # Only the RSSI and power-level entries are used below; the full
            # unpacking is kept so the shape of the result is still checked.
            [
                rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
                txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
            ], [rssi_slave] = self._get_bt_link_metrics()
            # -127 is the fallback when a device serial has no entry.
            rssi_primary = rssi_master.get(self.dut.serial, -127)
            pwl_primary = pwl_master.get(self.dut.serial, -127)
            rssi_secondary = rssi_slave.get(self.bt_device_controller.serial,
                                            -127)
            tag = 'attenuation_{}dB'.format(atten)
            self.log.info(
                'BT attenuation set to {} dB and start A2DP streaming'.format(
                    atten))
            procs_iperf = []
            for obj in self.wifi_int_pairs:
                obj.iperf_server.start()
                self.log.info('Started IPERF server at port {}'.format(
                    obj.iperf_server.port))
                iperf_args = '-i 1 -t {} -p {} -J -R'.format(
                    self.iperf_duration, obj.iperf_server.port)
                # Separate name: reusing 'tag' here used to clobber the
                # attenuation tag that the THDN analysis below relies on.
                iperf_tag = 'chan_{}'.format(obj.channel)
                proc_iperf = Process(target=obj.iperf_client.start,
                                     args=(obj.server_address, iperf_args,
                                           iperf_tag))
                procs_iperf.append(proc_iperf)

            #play a2dp streaming and run thdn analysis
            queue = Queue()
            proc_bt = Process(target=self.play_and_record_audio,
                              args=(self.audio_params['duration'], queue))
            for proc in procs_iperf:
                proc.start()
            proc_bt.start()
            proc_bt.join()
            for proc in procs_iperf:
                proc.join()
            for obj in self.wifi_int_pairs:
                iperf_throughput = get_iperf_results(obj.iperf_server)
                self.log.info(
                    'Throughput for channel {} interference is {} Mbps'.format(
                        obj.channel, iperf_throughput))
                obj.iperf_server.stop()
                self.log.info('Stopped IPERF server at port {}'.format(
                    obj.iperf_server.port))
            audio_captured = queue.get()
            thdns = self.run_thdn_analysis(audio_captured, tag)
            self.log.info('THDN results are {} at {} dB attenuation'.format(
                thdns, atten))
            self.log.info('DUT rssi {} dBm, master tx power level {}, '
                          'RemoteDevice rssi {} dBm'.format(
                              rssi_primary, pwl_primary, rssi_secondary))
            for thdn in thdns:
                if thdn >= self.audio_params['thdn_threshold']:
                    self.log.info('Under the WiFi interference condition: '
                                  'channel 1 RSSI: {} dBm, '
                                  'channel 6 RSSI: {} dBm, '
                                  'channel 11 RSSI: {} dBm'.format(
                                      self.interference_rssi[0]['rssi'],
                                      self.interference_rssi[1]['rssi'],
                                      self.interference_rssi[2]['rssi']))
                    raise TestPass(
                        'Max range for this test is {}, with BT master RSSI at'
                        ' {} dBm'.format(atten, rssi_primary))