#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Interactive tests that inject WiFi interference through attenuators."""
import logging
import random
import time

from acts.base_test import BaseTestClass
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import unpack_custom_file
from acts_contrib.test_utils.power.PowerBaseTest import ObjNew
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

MAX_ATTENUATION = 95
INIT_ATTEN = 0
SCAN = 'wpa_cli scan'
SCAN_RESULTS = 'wpa_cli scan_results'

# Number of attenuator channels driven by set_atten_all_channel().
_NUM_ATTEN_CHANNELS = 4


def _format_interference_rssi(interference_rssi):
    """Build the summary log line for a get_interference_rssi() result."""
    readings = ', '.join(
        'channel {} RSSI: {} dBm'.format(item['chan'], item['rssi'])
        for item in interference_rssi)
    return 'Under the WiFi interference condition: ' + readings


class InjectWifiInterferenceTest(BaseTestClass):
    """Manually driven WiFi interference injection tests.

    Both tests prompt the operator on stdin for attenuation values, apply
    them to the configured attenuators and log the interference RSSI
    measured by the DUT.
    """

    def __init__(self, configs):
        """Unpack user params and load the interference description files.

        Args:
            configs: test configuration forwarded to BaseTestClass.
        """
        super().__init__(configs)
        req_params = ['custom_files', 'wifi_networks']
        self.unpack_userparams(req_params)
        # The file path decides which interference profile it describes.
        for custom_file in self.custom_files:
            if 'static_interference' in custom_file:
                self.static_wifi_interference = unpack_custom_file(
                    custom_file)
            elif 'dynamic_interference' in custom_file:
                self.dynamic_wifi_interference = unpack_custom_file(
                    custom_file)

    def setup_class(self):
        """Pick the DUT and pair each interference attenuator with a network.

        attenuators[0] is kept as ``self.attenuator``; every following
        attenuator is paired, in order, with an entry of ``wifi_networks``.
        All attenuators are set to INIT_ATTEN.
        """
        self.dut = self.android_devices[0]
        # Tolerate a configuration without attenuators instead of raising
        # AttributeError; wifi_int_pairs is then simply empty.
        attenuators = getattr(self, 'attenuators', [])
        if attenuators:
            # Set attenuator to minimum attenuation
            self.attenuator = attenuators[0]
            self.attenuator.set_atten(INIT_ATTEN)
        self.wifi_int_pairs = []
        for index, attenuator in enumerate(attenuators[1:]):
            network = self.wifi_networks[index]
            self.wifi_int_pairs.append(
                ObjNew(attenuator=attenuator,
                       network=network,
                       channel=network['channel']))
        # Start every interference path at minimum attenuation.
        for pair in self.wifi_int_pairs:
            pair.attenuator.set_atten(INIT_ATTEN)

    def setup_test(self):
        """Log the start of a test."""
        self.log.info("Setup test initiated")

    def teardown_class(self):
        """Set every interference attenuator to MAX_ATTENUATION."""
        for pair in self.wifi_int_pairs:
            pair.attenuator.set_atten(MAX_ATTENUATION)

    def teardown_test(self):
        """Set every interference attenuator to MAX_ATTENUATION."""
        for pair in self.wifi_int_pairs:
            pair.attenuator.set_atten(MAX_ATTENUATION)

    def test_inject_static_wifi_interference(self):
        """Repeatedly apply operator-entered attenuations and log the RSSI.

        Each round reads a comma-separated list of attenuation values from
        stdin, applies the first four to attenuators 0-3 and logs the
        measured interference RSSI. An empty line ends the test; malformed
        input is reported and the prompt is shown again.

        Returns:
            True once the operator ends the session.
        """
        self.log.info('Enter an empty line to finish the test.')
        while True:
            raw = input(
                "Please enter 4 channel attenuation value followed by comma :\n"
            ).strip()
            if not raw:
                break
            try:
                attenuation = [int(value) for value in raw.split(',')]
            except ValueError:
                self.log.error(
                    'Invalid attenuation input: {!r}'.format(raw))
                continue
            # set_atten_all_channel() indexes the first four entries.
            if len(attenuation) < _NUM_ATTEN_CHANNELS:
                self.log.error('Expected {} attenuation values, got {}'.format(
                    _NUM_ATTEN_CHANNELS, len(attenuation)))
                continue
            self.set_atten_all_channel(attenuation)
            # Read interference RSSI
            self.interference_rssi = get_interference_rssi(
                self.dut, self.wifi_int_pairs)
            self.log.info(_format_interference_rssi(self.interference_rssi))
        return True

    def test_inject_dynamic_interface(self):
        """Hop the interference between the WiFi pairs at a fixed interval.

        Reads the CHAN1 attenuation from stdin, then for an interference
        level taken from the dynamic interference file activates one
        randomly chosen pair at a time (all others at MAX_ATTENUATION),
        logs the measured RSSI and sleeps for the configured interval.

        NOTE(review): the hopping loop has no stop condition, so it runs
        until the test is interrupted; only the first configured level is
        ever exercised and the final return is not reached. Kept as in the
        original because no duration is defined in the visible code.
        """
        atten = int(input("Please enter the attenuation level for CHAN1 :"))
        self.attenuator.set_atten(atten)
        self.log.info("Attenuation for CHAN1 set to:{} dB".format(atten))
        self.channel_change_interval = self.dynamic_wifi_interference[
            'channel_change_interval_second']
        self.wifi_int_levels = list(
            self.dynamic_wifi_interference['interference_level'].keys())
        all_pairs = range(len(self.wifi_int_pairs))
        for wifi_level in self.wifi_int_levels:
            interference_atten_level = self.dynamic_wifi_interference[
                'interference_level'][wifi_level]
            # Set initial WiFi interference at channel 1
            logging.info('Start with interference at channel 1')
            self.wifi_int_pairs[0].attenuator.set_atten(
                interference_atten_level)
            for pair in self.wifi_int_pairs[1:]:
                pair.attenuator.set_atten(MAX_ATTENUATION)
            active = 0
            logging.info(
                'Inject random changing channel (1,6,11) wifi interference '
                'every {} second'.format(self.channel_change_interval))
            while True:
                # Choose among the currently inactive pairs only, so every
                # hop really moves the interference to another channel.
                candidates = [idx for idx in all_pairs if idx != active]
                if candidates:
                    active = random.choice(candidates)
                self.wifi_int_pairs[active].attenuator.set_atten(
                    interference_atten_level)
                logging.info('Current interference at channel {}'.format(
                    self.wifi_int_pairs[active].channel))
                for idx in all_pairs:
                    if idx != active:
                        self.wifi_int_pairs[idx].attenuator.set_atten(
                            MAX_ATTENUATION)
                # Read interference RSSI
                self.interference_rssi = get_interference_rssi(
                    self.dut, self.wifi_int_pairs)
                self.log.info(
                    _format_interference_rssi(self.interference_rssi))
                time.sleep(self.channel_change_interval)
        return True

    def set_atten_all_channel(self, attenuation):
        """Apply one attenuation value to each of attenuators 0-3.

        Args:
            attenuation: sequence whose first four entries are the values
                for attenuators 0, 1, 2 and 3 respectively.
        """
        for idx in range(_NUM_ATTEN_CHANNELS):
            self.attenuators[idx].set_atten(attenuation[idx])
        self.log.info(
            "Attenuation set to CHAN1:{},CHAN2:{},CHAN3:{},CHAN4:{}".format(
                *(self.attenuators[idx].get_atten()
                  for idx in range(_NUM_ATTEN_CHANNELS))))


def get_interference_rssi(dut, wifi_int_pairs):
    """Function to read wifi interference RSSI level.

    WiFi is switched on for the scan and switched off again afterwards,
    also when the scan raises.

    Args:
        dut: device used to scan for the interfering networks.
        wifi_int_pairs: objects exposing ``network`` (with 'SSID' and
            'bssid' keys) and ``channel``.

    Returns:
        A list with one dict per pair, holding the keys "ssid", "bssid",
        "chan" and "rssi" (the 'mean' value reported for that bssid).
    """
    interference_rssi = [{
        "ssid": pair.network['SSID'],
        "bssid": pair.network['bssid'],
        "chan": pair.channel,
        "rssi": 0
    } for pair in wifi_int_pairs]
    bssids = [entry['bssid'] for entry in interference_rssi]
    wutils.wifi_toggle_state(dut, True)
    try:
        scanned_rssi = wpeutils.get_scan_rssi(dut, bssids, num_measurements=2)
        for entry in interference_rssi:
            entry['rssi'] = scanned_rssi[entry['bssid']]['mean']
            logging.info('Interference RSSI at channel {} is {} dBm'.format(
                entry['chan'], entry['rssi']))
    finally:
        wutils.wifi_toggle_state(dut, False)
    return interference_rssi