#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from acts import logger
from acts import signals
from acts import utils

TIME_TO_SLEEP_BETWEEN_RETRIES = 1
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10


class WlanControllerError(signals.ControllerError):
    """Raised when a WLAN interface query made by WlanController fails."""


class WlanController:
    """Contains methods related to wlan core, to be used in FuchsiaDevice object"""

    def __init__(self, fuchsia_device):
        """Stores the device and creates a logger tagged with its IP.

        Args:
            fuchsia_device: the FuchsiaDevice object this controller acts on.
        """
        self.device = fuchsia_device
        self.log = logger.create_tagged_trace_logger(
            'WlanController for FuchsiaDevice | %s' % self.device.ip)

    # TODO(70501): Wrap wlan_lib functions and setup from FuchsiaDevice here
    # (similar to how WlanPolicyController does it) to prevent FuchsiaDevice
    # from growing too large.
    def _configure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def _deconfigure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def update_wlan_interfaces(self):
        """ Retrieves WLAN interfaces from device and sets the FuchsiaDevice
        attributes.
        """
        wlan_interfaces = self.get_interfaces_by_role()
        self.device.wlan_client_interfaces = wlan_interfaces['client']
        self.device.wlan_ap_interfaces = wlan_interfaces['ap']

        # Set test interfaces to value from config, else the first found
        # interface, else None
        self.device.wlan_client_test_interface_name = self.device.conf_data.get(
            'wlan_client_test_interface',
            next(iter(self.device.wlan_client_interfaces), None))

        self.device.wlan_ap_test_interface_name = self.device.conf_data.get(
            'wlan_ap_test_interface',
            next(iter(self.device.wlan_ap_interfaces), None))

    def get_interfaces_by_role(self):
        """ Retrieves WLAN interface information, supplemented by netstack info.

        Returns:
            Dict with keys 'client' and 'ap', each of which contain WLAN
            interfaces (mapped by interface name).

        Raises:
            WlanControllerError - failure to list or query WLAN interfaces,
                or a queried interface reports no MAC address
        """

        # Retrieve WLAN interface IDs
        response = self.device.sl4f.wlan_lib.wlanGetIfaceIdList()
        if response.get('error'):
            raise WlanControllerError('Failed to get WLAN iface ids: %s' %
                                      response['error'])

        # Treat a missing or null result the same as an empty list.
        wlan_iface_ids = response.get('result') or []
        if not wlan_iface_ids:
            return {'client': {}, 'ap': {}}

        # Use IDs to get WLAN interface info and mac addresses
        wlan_ifaces_by_mac = {}
        for iface_id in wlan_iface_ids:
            response = self.device.sl4f.wlan_lib.wlanQueryInterface(iface_id)
            if response.get('error'):
                raise WlanControllerError(
                    'Failed to query wlan iface id %s: %s' %
                    (iface_id, response['error']))

            iface_info = response['result']
            mac = iface_info.get('sta_addr', None)
            if mac is None:
                # Fallback to older field name to maintain backwards
                # compatibility with older versions of SL4F's
                # QueryIfaceResponse. See https://fxrev.dev/562146.
                mac = iface_info.get('mac_addr')
            if mac is None:
                # Neither field present: fail with a clear message rather
                # than passing None to the MAC conversion helper.
                raise WlanControllerError(
                    'No MAC address found for wlan iface id %s: %s' %
                    (iface_id, iface_info))

            wlan_ifaces_by_mac[utils.mac_address_list_to_str(
                mac)] = iface_info

        # Use mac addresses to query the interfaces from the netstack view,
        # which allows us to supplement the interface information with the name,
        # netstack_id, etc.

        # TODO(fxb/75909): This tedium is necessary to get the interface name
        # because only netstack has that information. The bug linked here is
        # to reconcile some of the information between the two perspectives, at
        # which point we can eliminate this step.
        net_ifaces = self.device.netstack_controller.list_interfaces()
        wlan_ifaces_by_role = {'client': {}, 'ap': {}}
        for iface in net_ifaces:
            try:
                # Some interfaces might not have a MAC
                iface_mac = utils.mac_address_list_to_str(iface['mac'])
            except Exception as e:
                # Deliberately best-effort: skip interfaces without a usable
                # MAC instead of failing the whole lookup.
                self.log.debug(f'Error {e} getting MAC for iface {iface}')
                continue
            if iface_mac in wlan_ifaces_by_mac:
                wlan_iface = wlan_ifaces_by_mac[iface_mac]
                wlan_iface['netstack_id'] = iface['id']

                # Add to return dict, mapped by role then name.
                wlan_ifaces_by_role[wlan_iface['role'].lower()][
                    iface['name']] = wlan_iface

        return wlan_ifaces_by_role

    def set_country_code(self, country_code):
        """Sets country code through the regulatory region service and waits
        for the code to be applied to WLAN PHY.

        Polls every PHY once per TIME_TO_SLEEP_BETWEEN_RETRIES seconds until
        all report the requested code or TIME_TO_WAIT_FOR_COUNTRY_CODE
        seconds have elapsed.

        Args:
            country_code: string, the 2 character country code to set

        Raises:
            EnvironmentError - failure to set the regulatory region, or the
                PHYs did not report the requested code before the timeout
            ConnectionError - failure to list PHYs or query a PHY's country
        """
        self.log.info('Setting DUT country code to %s' % country_code)
        country_code_response = self.device.sl4f.regulatory_region_lib.setRegion(
            country_code)
        if country_code_response.get('error'):
            raise EnvironmentError(
                'Failed to set country code (%s) on DUT. Error: %s' %
                (country_code, country_code_response['error']))

        self.log.info('Verifying DUT country code was correctly set to %s.' %
                      country_code)
        phy_ids_response = self.device.sl4f.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get('error'):
            # Only one placeholder in the message, so pass only the error;
            # passing a 2-tuple here raised TypeError and hid the real cause.
            raise ConnectionError('Failed to get phy ids from DUT. Error: %s' %
                                  phy_ids_response['error'])

        end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.time() < end_time:
            for phy_id in phy_ids_response['result']:
                get_country_response = self.device.sl4f.wlan_lib.wlanGetCountry(
                    phy_id)
                if get_country_response.get('error'):
                    raise ConnectionError(
                        'Failed to query PHY ID (%s) for country. Error: %s' %
                        (phy_id, get_country_response['error']))

                # The result is a sequence of character codes; rebuild the
                # country code string from it.
                set_code = ''.join(
                    chr(ascii_char)
                    for ascii_char in get_country_response['result'])
                if set_code != country_code:
                    self.log.debug(
                        'PHY (id: %s) has incorrect country code set. '
                        'Expected: %s, Got: %s' %
                        (phy_id, country_code, set_code))
                    # Mismatch: stop checking PHYs, sleep, and retry.
                    break
            else:
                # for-else: no PHY mismatched, so verification succeeded.
                self.log.info('All PHYs have expected country code (%s)' %
                              country_code)
                break
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
        else:
            # while-else: the deadline passed without a successful check.
            raise EnvironmentError('Failed to set DUT country code to %s.' %
                                   country_code)