#!/usr/bin/env python3.4
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import functools
import importlib
import ipaddress
import logging
import numpy
import re
import time
from acts import asserts
from acts import utils
from acts.controllers.android_device import AndroidDevice
from acts.controllers.utils_lib import ssh
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils
from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils
from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils

from concurrent.futures import ThreadPoolExecutor

# Sleep durations, in seconds
SHORT_SLEEP = 1
MED_SLEEP = 6
# 6 GHz channels are labelled '6g1', '6g5', ..., '6g233'
CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)]
BAND_TO_CHANNEL_MAP = {
    '2.4GHz': list(range(1, 14)),
    'UNII-1': [36, 40, 44, 48],
    'UNII-2':
    [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140],
    'UNII-3': [149, 153, 157, 161, 165],
    '6GHz': CHANNELS_6GHz
}
# Reverse lookup of BAND_TO_CHANNEL_MAP: channel -> band name
CHANNEL_TO_BAND_MAP = {
    channel: band
    for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels
}


# Decorators
def nonblocking(f):
    """Creates a decorator transforming function calls to non-blocking.

    Every call to the decorated function is submitted to its own
    single-worker ThreadPoolExecutor and returns immediately.

    Args:
        f: function to run in a background thread
    Returns:
        wrap: wrapper returning a concurrent.futures.Future for the call.
        Use its result() method to wait for and retrieve the value
        returned (or the exception raised) by f.
    """

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        executor = ThreadPoolExecutor(max_workers=1)
        thread_future = executor.submit(f, *args, **kwargs)
        # Ensure resources are freed up when executor returns or raises
        executor.shutdown(wait=False)
        return thread_future

    return wrap


def detect_wifi_platform(dut):
    """Detects (and caches) the wifi platform of a DUT.

    The result is stored on the device object as dut.wifi_platform so the
    file system is only probed once per device object.

    Args:
        dut: android device of interest
    Returns:
        wifi_platform: 'qcom' if /vendor/firmware/wlan/ contains any files,
        'brcm' otherwise
    """
    if hasattr(dut, 'wifi_platform'):
        return dut.wifi_platform
    qcom_check = len(dut.get_file_names('/vendor/firmware/wlan/'))
    if qcom_check:
        dut.wifi_platform = 'qcom'
    else:
        dut.wifi_platform = 'brcm'
    return dut.wifi_platform


def detect_wifi_decorator(f):
    """Decorator dispatching a call to the platform-specific implementation.

    The decorated function's own body is never executed. Instead, the
    function of the same name is looked up in the <platform>_utils module
    matching the DUT's wifi platform and called with the same arguments. If
    the platform module does not define such a function, the call is a no-op
    returning None.

    Args:
        f: function whose name identifies the platform-specific function
    Returns:
        wrap: wrapper performing the dispatch
    Raises:
        ValueError: if no dut keyword argument is given and no positional
        argument is an AndroidDevice
    """

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        if 'dut' in kwargs:
            dut = kwargs['dut']
        else:
            dut = next(
                (arg for arg in args if isinstance(arg, AndroidDevice)), None)
            if dut is None:
                raise ValueError(
                    'No AndroidDevice found in arguments of {}'.format(
                        f.__name__))
        dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format(
            detect_wifi_platform(dut))
        dut_package = importlib.import_module(dut_package)
        # The fallback must accept the caller's arguments, otherwise a
        # missing platform implementation raises TypeError instead of
        # being a no-op.
        f_decorated = getattr(dut_package, f.__name__,
                              lambda *_args, **_kwargs: None)
        return f_decorated(*args, **kwargs)

    return wrap


# JSON serializer
def serialize_dict(input_dict):
    """Function to serialize dicts to enable JSON output.

    Args:
        input_dict: dict to serialize
    Returns:
        output_dict: OrderedDict whose keys and values were both passed
        through _serialize_value
    """
    output_dict = collections.OrderedDict()
    for key, value in input_dict.items():
        output_dict[_serialize_value(key)] = _serialize_value(value)
    return output_dict


def _serialize_value(value):
    """Function to recursively serialize dict entries to enable JSON output.

    Args:
        value: object to serialize
    Returns:
        Tuples are converted to their string representation, numpy scalars
        to the matching python scalar, lists and numpy arrays to lists of
        serialized elements and dicts to serialized OrderedDicts. Plain
        float, int, bool and str values are returned unchanged. Anything
        else is replaced by the string 'Non-serializable object'.
    """
    if isinstance(value, tuple):
        return str(value)
    if isinstance(value, numpy.bool_):
        return bool(value)
    # Cover all numpy integer/float widths, not only the 64 bit ones
    if isinstance(value, numpy.integer):
        return int(value)
    if isinstance(value, numpy.floating):
        return float(value)
    if isinstance(value, (list, numpy.ndarray)):
        return [_serialize_value(x) for x in value]
    if isinstance(value, dict):
        return serialize_dict(value)
    if type(value) in (float, int, bool, str):
        return value
    return "Non-serializable object"


def extract_sub_dict(full_dict, fields):
    """Function to extract selected fields of a dict.

    Args:
        full_dict: dict to extract entries from
        fields: iterable of keys to extract, all must exist in full_dict
    Returns:
        sub_dict: OrderedDict holding the requested entries in the order
        given by fields
    """
    sub_dict = collections.OrderedDict(
        (field, full_dict[field]) for field in fields)
    return sub_dict


def write_antenna_tune_code(dut, tune_code):
    """Function to force an antenna tune code through modem AT commands.

    The function first enables the modem test mode (up to 5 attempts), then
    writes the tune code and finally reads back each tune code register to
    check it against the expected value.

    Args:
        dut: android device of interest
        tune_code: dict with key 'tune_code_cmd' holding the AT command to
        write and key 'tune_code_registers' mapping register identifiers to
        their expected values
    Returns:
        True if all registers read back with the expected value
    Raises:
        RuntimeError: if the modem test mode could not be enabled, or if any
        register does not hold the expected value
    """
    flag_tune_code_forcing_on = 0
    # Use AT Command file to enable tune code forcing
    for n_enable in range(5):
        logging.debug('{}-th Enabling modem test mode'.format(n_enable))
        try:
            at_lmodetest_output = dut.adb.shell("modem_cmd raw AT+LMODETEST")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LMODETEST output: {}'.format(
                at_lmodetest_output))
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LMODETEST'.format(n_enable))
            continue
        try:
            at_lrffinalstart_output = dut.adb.shell(
                "modem_cmd raw AT+LRFFINALSTART")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LRFFINALSTART output: {}'.format(
                at_lrffinalstart_output))
            if ("+LRFFINALSTART:0" in at_lrffinalstart_output
                    and "ERROR" not in at_lrffinalstart_output):
                flag_tune_code_forcing_on = 1
                logging.info('Enable modem test mode SUCCESSFUL')
                break
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LRFFINALSTART'.format(
                    n_enable))
            continue
    if not flag_tune_code_forcing_on:
        raise RuntimeError("AT Command File Not set up")
    dut.adb.shell("modem_cmd raw " + tune_code['tune_code_cmd'])
    logging.debug('Write Tune Code: {}'.format("modem_cmd raw " +
                                               tune_code['tune_code_cmd']))
    flag_tc_reg_correct = True
    # Check tune code register values
    for register_key, expected_value in tune_code[
            'tune_code_registers'].items():
        try:
            at_tc_reg_output = dut.adb.shell(
                "modem_cmd raw AT+MIPIREAD={}".format(register_key))
            time.sleep(SHORT_SLEEP)
        except Exception:
            # A failed read must count as a mismatch rather than reuse the
            # previous register's output (or crash on an unbound name).
            logging.warning(
                'Failed to read tune code register {}'.format(register_key))
            at_tc_reg_output = ''
        if "+MIPIREAD:" + expected_value.lower() in at_tc_reg_output:
            logging.info(
                'Forced tune code register {} value matches expectation: {}'.
                format(register_key, expected_value))
        else:
            logging.warning('Expected tune code register {} value: {}'.format(
                register_key, expected_value))
            logging.warning('tune code register value is set to {}'.format(
                at_tc_reg_output))
            flag_tc_reg_correct = False
    if flag_tc_reg_correct:
        return True
    raise RuntimeError(
        "Enable modem test mode SUCCESSFUL, but register values NOT correct")


# Miscellaneous Wifi Utilities
def check_skip_conditions(testcase_params, dut, access_point,
                          ota_chamber=None):
    """Checks if test should be skipped.

    The test is skipped (through asserts.skip) if the DUT fails its health
    check, if the AP or OTA chamber do not support the requested channel, or
    if the DUT does not support the requested bandwidth or band.

    Args:
        testcase_params: dict of test parameters. Must contain 'channel',
        may contain 'bandwidth' (defaults to 20 here)
        dut: android device of interest
        access_point: access point object providing band_lookup_by_channel
        ota_chamber: optional chamber object providing SUPPORTED_BANDS
    """
    # Check battery level before test
    if not health_check(dut, 10):
        asserts.skip('DUT battery level too low.')
    if not access_point.band_lookup_by_channel(testcase_params['channel']):
        asserts.skip('AP does not support requested channel.')
    if ota_chamber and CHANNEL_TO_BAND_MAP[
            testcase_params['channel']] not in ota_chamber.SUPPORTED_BANDS:
        asserts.skip('OTA chamber does not support requested channel.')
    # Check if 6GHz is supported by checking capabilities in the US.
    if not dut.droid.wifiCheckState():
        wutils.wifi_toggle_state(dut, True)
    iw_list = dut.adb.shell('iw list')
    supports_6ghz = '6135 MHz' in iw_list
    supports_160mhz = 'Supported Channel Width: 160 MHz' in iw_list
    if testcase_params.get('bandwidth', 20) == 160 and not supports_160mhz:
        asserts.skip('DUT does not support 160 MHz networks.')
    if testcase_params.get('channel',
                           6) in CHANNELS_6GHz and not supports_6ghz:
        asserts.skip('DUT does not support 6 GHz band.')


def validate_network(dut, ssid):
    """Check that DUT has a valid internet connection through expected SSID

    Args:
        dut: android device of interest
        ssid: expected ssid
    Returns:
        True if the connection was validated and the DUT reports being
        connected to ssid, False otherwise (including when the connection
        check itself raised)
    """
    try:
        connected = wutils.validate_connection(dut, wait_time=3) is not None
        current_network = dut.droid.wifiGetConnectionInfo()
    except Exception:
        connected = False
        current_network = None
    return bool(connected and current_network['SSID'] == ssid)


def get_server_address(ssh_connection, dut_ip, subnet_mask):
    """Get server address on a specific subnet,

    This function retrieves the LAN or WAN IP of a remote machine used in
    testing. If subnet_mask is set to 'public' it returns a machines global ip,
    else it returns the ip belonging to the dut local network given the dut's
    ip and subnet mask.

    Args:
        ssh_connection: object representing server for which we want an ip
        dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx
        subnet_mask: string representing subnet mask (public for global ip)
    Returns:
        string holding the first matching ip address reported by ifconfig,
        or None (after logging an error) if there is no match
    """
    ifconfig_out = ssh_connection.run('ifconfig').stdout
    # Raw string with escaped dots so only dotted-quad addresses match
    ip_list = re.findall(r'inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)', ifconfig_out)
    ip_list = [ipaddress.ip_address(ip) for ip in ip_list]

    if subnet_mask == 'public':
        for ip in ip_list:
            # is_global is not used to allow for CGNAT ips in 100.x.y.z range
            if not ip.is_private:
                return str(ip)
    else:
        dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask),
                                           strict=False)
        for ip in ip_list:
            if ip in dut_network:
                return str(ip)
    logging.error('No IP address found in requested subnet')
    return None


# Ping utilities
def get_ping_stats(src_device, dest_address, ping_duration, ping_interval,
                   ping_size):
    """Run ping to or from the DUT.

    The function either pings the DUT or pings a remote ip from the DUT,
    depending on the type of src_device.

    Args:
        src_device: object representing device to ping from (AndroidDevice
        or SshConnection to a Linux or macOS machine)
        dest_address: ip address to ping
        ping_duration: timeout to set on the ping process (in seconds)
        ping_interval: time between pings (in seconds)
        ping_size: size of ping packet payload
    Returns:
        ping_result: ping_utils.PingResult built from the lines of the ping
        output
    Raises:
        TypeError: if src_device is neither an AndroidDevice nor an
        SshConnection
        RuntimeError: if the remote machine is neither Linux nor macOS
    """
    ping_count = int(ping_duration / ping_interval)
    ping_deadline = int(ping_count * ping_interval) + 1
    ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    if isinstance(src_device, AndroidDevice):
        ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address)
        ping_output = src_device.adb.shell(ping_cmd,
                                           timeout=ping_deadline + SHORT_SLEEP,
                                           ignore_status=True)
    elif isinstance(src_device, ssh.connection.SshConnection):
        platform = src_device.run('uname').stdout
        if 'linux' in platform.lower():
            ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address)
        elif 'darwin' in platform.lower():
            # macOS ping has no -D flag, so each output line is prefixed
            # with a gdate timestamp instead.
            ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format(
                ping_cmd_macos, dest_address)
        else:
            raise RuntimeError(
                'Unable to ping from unsupported platform {}.'.format(
                    platform.strip()))
        ping_output = src_device.run(ping_cmd,
                                     timeout=ping_deadline + SHORT_SLEEP,
                                     ignore_status=True).stdout
    else:
        raise TypeError('Unable to ping using src_device of type %s.' %
                        type(src_device))
    return ping_utils.PingResult(ping_output.splitlines())


@nonblocking
def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval,
                      ping_size):
    """Non-blocking version of get_ping_stats, returns a Future."""
    return get_ping_stats(src_device, dest_address, ping_duration,
                          ping_interval, ping_size)


# Iperf utilities
@nonblocking
def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag,
                          timeout):
    """Non-blocking call to iperf_client.start, returns a Future."""
    return iperf_client.start(iperf_server_address, iperf_args, tag, timeout)


def get_iperf_arg_string(duration,
                         reverse_direction,
                         interval=1,
                         traffic_type='TCP',
                         socket_size=None,
                         num_processes=1,
                         udp_throughput='1000M',
                         ipv6=False,
                         udp_length=1448):
    """Function to format iperf client arguments.

    This function takes in iperf client parameters and returns a properly
    formatted iperf arg string to be used in throughput tests.

    Args:
        duration: iperf duration in seconds
        reverse_direction: boolean controlling the -R flag for iperf clients
        interval: iperf print interval
        traffic_type: string specifying TCP or UDP traffic
        socket_size: string specifying TCP window or socket buffer, e.g., 2M
        num_processes: int specifying number of iperf processes
        udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M
        ipv6: boolean controlling the use of IP V6
        udp_length: value passed to the -l flag in UDP tests
    Returns:
        iperf_args: string of formatted iperf args
    """
    iperf_args = '-i {} -t {} -J '.format(interval, duration)
    if ipv6:
        iperf_args = iperf_args + '-6 '
    if traffic_type.upper() == 'UDP':
        iperf_args = iperf_args + '-u -b {} -l {} -P {} '.format(
            udp_throughput, udp_length, num_processes)
    elif traffic_type.upper() == 'TCP':
        iperf_args = iperf_args + '-P {} '.format(num_processes)
    if socket_size:
        iperf_args = iperf_args + '-w {} '.format(socket_size)
    if reverse_direction:
        iperf_args = iperf_args + ' -R'
    return iperf_args


# Attenuator Utilities
def atten_by_label(atten_list, path_label, atten_level):
    """Attenuate signals according to their path label.

    Args:
        atten_list: list of attenuators to iterate over
        path_label: path label on which to set desired attenuation
        atten_level: attenuation desired on path
    """
    for atten in atten_list:
        if path_label in atten.path:
            atten.set_atten(atten_level, retry=True)


def _measure_rssi_under_ping(dut, ping_server, dut_ip):
    """Returns the mean signal poll RSSI measured while pinging the DUT."""
    ping_future = get_ping_stats_nb(src_device=ping_server,
                                    dest_address=dut_ip,
                                    ping_duration=1.5,
                                    ping_interval=0.02,
                                    ping_size=64)
    current_rssi = get_connected_rssi(dut,
                                      num_measurements=4,
                                      polling_frequency=0.25,
                                      first_measurement_delay=0.5,
                                      disconnect_warning=1,
                                      ignore_samples=1)
    # Wait for the background ping to finish before returning
    ping_future.result()
    return current_rssi['signal_poll_rssi']['mean']


def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server):
    """Function to estimate attenuation to hit a target RSSI.

    This function estimates a constant attenuation setting on all attenuation
    ports to hit a target RSSI. The estimate is not meant to be exact or
    guaranteed.

    The search runs for at most 20 iterations, changing the attenuation by
    at most 20 dB per iteration in steps of 0.25 dB, and stops early once
    two consecutive measurements are within 1 dB of the target.

    Args:
        target_rssi: rssi of interest
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
    Returns:
        target_atten: attenuation setting to achieve target_rssi. 0 or the
        maximum attenuation of the first attenuator if the search leaves
        the attenuator range.
    """
    logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi))
    # Set attenuator to 0 dB
    for atten in attenuators:
        atten.set_atten(0, strict=False, retry=True)
    # Start ping traffic
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    # Measure starting RSSI
    current_rssi = _measure_rssi_under_ping(dut, ping_server, dut_ip)
    target_atten = 0
    logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
        target_atten, current_rssi))
    within_range = False
    for idx in range(20):
        atten_delta = max(min(current_rssi - target_rssi, 20), -20)
        # Round towards zero to a multiple of 0.25 dB
        target_atten = int((target_atten + atten_delta) * 4) / 4
        if target_atten < 0:
            return 0
        if target_atten > attenuators[0].get_max_atten():
            return attenuators[0].get_max_atten()
        for atten in attenuators:
            atten.set_atten(target_atten, strict=False, retry=True)
        current_rssi = _measure_rssi_under_ping(dut, ping_server, dut_ip)
        logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
            target_atten, current_rssi))
        if abs(current_rssi - target_rssi) < 1:
            if within_range:
                logging.info(
                    'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.'
                    'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format(
                        current_rssi, target_rssi, target_atten, idx))
                return target_atten
            within_range = True
        else:
            within_range = False
    return target_atten


def get_current_atten_dut_chain_map(attenuators,
                                    dut,
                                    ping_server,
                                    ping_from_dut=False):
    """Function to detect mapping between attenuator ports and DUT chains.

    This function detects the mapping between attenuator ports and DUT chains
    in cases where DUT chains are connected to only one attenuator port. The
    function assumes the DUT is already connected to a wifi network. The
    function starts by measuring per chain RSSI at 0 attenuation, then
    attenuates one port at a time looking for the chain that reports a lower
    RSSI.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        ping_from_dut: boolean controlling whether to ping from or to dut
    Returns:
        chain_map: list of dut chains, one entry per attenuator port. Entries
        are 'DUT-Chain-0', 'DUT-Chain-1' or None if no chain degraded by
        more than 10 dB.
    """
    # Set attenuator to 0 dB
    for atten in attenuators:
        atten.set_atten(0, strict=False, retry=True)
    # Start ping traffic
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    if ping_from_dut:
        ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname,
                                        11, 0.02, 64)
    else:
        ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64)
    # Measure starting RSSI
    base_rssi = get_connected_rssi(dut, 4, 0.25, 1)
    chain0_base_rssi = base_rssi['chain_0_rssi']['mean']
    chain1_base_rssi = base_rssi['chain_1_rssi']['mean']
    if chain0_base_rssi < -70 or chain1_base_rssi < -70:
        logging.warning('RSSI might be too low to get reliable chain map.')
    # Compile chain map by attenuating one path at a time and seeing which
    # chain's RSSI degrades
    chain_map = []
    for test_atten in attenuators:
        # Set one attenuator to 30 dB down
        test_atten.set_atten(30, strict=False, retry=True)
        # Get new RSSI
        test_rssi = get_connected_rssi(dut, 4, 0.25, 1)
        # Assign attenuator to path that has lower RSSI
        if chain0_base_rssi > -70 and chain0_base_rssi - test_rssi[
                'chain_0_rssi']['mean'] > 10:
            chain_map.append('DUT-Chain-0')
        elif chain1_base_rssi > -70 and chain1_base_rssi - test_rssi[
                'chain_1_rssi']['mean'] > 10:
            chain_map.append('DUT-Chain-1')
        else:
            chain_map.append(None)
        # Reset attenuator to 0
        test_atten.set_atten(0, strict=False, retry=True)
    ping_future.result()
    logging.debug('Chain Map: {}'.format(chain_map))
    return chain_map


def get_full_rf_connection_map(attenuators,
                               dut,
                               ping_server,
                               networks,
                               ping_from_dut=False):
    """Function to detect per-network connections between attenuator and DUT.

    This function detects the mapping between attenuator ports and DUT chains
    on all networks in its arguments. The function connects the DUT to each
    network then calls get_current_atten_dut_chain_map to get the connection
    map on the current network. The function outputs the results in two formats
    to enable easy access when users are interested in indexing by network or
    attenuator port.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        networks: dict of network IDs and configs
        ping_from_dut: boolean controlling whether to ping from or to dut
    Returns:
        rf_map_by_network: dict of RF connections indexed by network.
        rf_map_by_atten: list of RF connections indexed by attenuator
    """
    for atten in attenuators:
        atten.set_atten(0, strict=False, retry=True)

    rf_map_by_network = collections.OrderedDict()
    rf_map_by_atten = [[] for _ in attenuators]
    for net_id, net_config in networks.items():
        wutils.reset_wifi(dut)
        wutils.wifi_connect(dut,
                            net_config,
                            num_of_tries=1,
                            assert_on_fail=False,
                            check_connectivity=False)
        rf_map_by_network[net_id] = get_current_atten_dut_chain_map(
            attenuators, dut, ping_server, ping_from_dut)
        for idx, chain in enumerate(rf_map_by_network[net_id]):
            if chain:
                rf_map_by_atten[idx].append({
                    'network': net_id,
                    'dut_chain': chain
                })
    logging.debug('RF Map (by Network): {}'.format(rf_map_by_network))
    logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten))

    return rf_map_by_network, rf_map_by_atten


# Generic device utils
def get_dut_temperature(dut):
    """Function to get dut temperature.

    The function fetches and returns the reading from the temperature sensor
    used for skin temperature and thermal throttling. Candidate thermal zones
    are tried in order and the first one that can be read is used.

    Args:
        dut: AndroidDevice of interest
    Returns:
        temperature: device temperature. 0 if temperature could not be read
    """
    candidate_zones = [
        '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp',
        '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp',
        '/dev/thermal/tz-by-name/quiet_therm/temp'
    ]
    temperature = 0
    for zone in candidate_zones:
        try:
            temperature = int(dut.adb.shell('cat {}'.format(zone)))
            break
        except Exception:
            temperature = 0
    if temperature == 0:
        logging.debug('Could not check DUT temperature.')
    elif temperature > 100:
        # Readings above 100 are presumably reported in millidegrees
        temperature = temperature / 1000
    return temperature


def wait_for_dut_cooldown(dut, target_temp=50, timeout=300):
    """Function to wait for a DUT to cool down.

    The temperature is polled every SHORT_SLEEP seconds until it drops below
    target_temp or the timeout expires.

    Args:
        dut: AndroidDevice of interest
        target_temp: target cooldown temperature
        timeout: max time to wait for cooldown
    """
    start_time = time.time()
    # Read once up front so the temperature is defined even if timeout <= 0
    temperature = get_dut_temperature(dut)
    while temperature >= target_temp and time.time() - start_time < timeout:
        time.sleep(SHORT_SLEEP)
        temperature = get_dut_temperature(dut)
    elapsed_time = time.time() - start_time
    logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format(
        temperature, elapsed_time))


def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1):
    """Function to check health status of a DUT.

    The function checks both battery levels and temperature to avoid DUT
    powering off during the test.

    Args:
        dut: AndroidDevice of interest
        batt_thresh: battery level threshold
        temp_threshold: temperature threshold
        cooldown: flag to wait for DUT to cool down when overheating
    Returns:
        health_check: boolean confirming device is healthy. When cooldown is
        set, overheating only triggers a wait (down to 5 degrees below
        temp_threshold or until the wait times out) and does not by itself
        mark the device unhealthy.
    """
    healthy = True
    battery_level = utils.get_battery_level(dut)
    if battery_level < batt_thresh:
        logging.warning('Battery level low ({}%)'.format(battery_level))
        healthy = False
    else:
        logging.debug('Battery level = {}%'.format(battery_level))

    temperature = get_dut_temperature(dut)
    if temperature > temp_threshold:
        if cooldown:
            logging.warning(
                'Waiting for DUT to cooldown. ({} C)'.format(temperature))
            wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5)
        else:
            logging.warning('DUT Overheating ({} C)'.format(temperature))
            healthy = False
    else:
        logging.debug('DUT Temperature = {} C'.format(temperature))
    return healthy


# Wifi Device Utils
def empty_rssi_result():
    """Returns an RSSI result with no data and NaN mean and stdev."""
    return collections.OrderedDict([('data', []), ('mean', float('nan')),
                                    ('stdev', float('nan'))])


@nonblocking
def get_connected_rssi_nb(dut,
                          num_measurements=1,
                          polling_frequency=SHORT_SLEEP,
                          first_measurement_delay=0,
                          disconnect_warning=True,
                          ignore_samples=0,
                          interface='wlan0'):
    """Non-blocking version of get_connected_rssi, returns a Future."""
    return get_connected_rssi(dut, num_measurements, polling_frequency,
                              first_measurement_delay, disconnect_warning,
                              ignore_samples, interface)


# The functions below decorated with detect_wifi_decorator only define the
# common interface. Calls are dispatched to the function of the same name in
# the platform-specific utils module.
@detect_wifi_decorator
def get_connected_rssi(dut,
                       num_measurements=1,
                       polling_frequency=SHORT_SLEEP,
                       first_measurement_delay=0,
                       disconnect_warning=True,
                       ignore_samples=0,
                       interface='wlan0'):
    """Gets all RSSI values reported for the connected access point/BSSID.

    Args:
        dut: android device object from which to get RSSI
        num_measurements: number of scans done, and RSSIs collected
        polling_frequency: time to wait between RSSI measurements
        first_measurement_delay: delay before the first measurement
        disconnect_warning: boolean controlling disconnection logging messages
        ignore_samples: number of leading samples to ignore
        interface: name of the wifi interface to poll
    Returns:
        connected_rssi: dict containing the measurements results for
        all reported RSSI values (signal_poll, per chain, etc.) and their
        statistics
    """


@nonblocking
def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1):
    """Non-blocking version of get_scan_rssi, returns a Future."""
    return get_scan_rssi(dut, tracked_bssids, num_measurements)


@detect_wifi_decorator
def get_scan_rssi(dut, tracked_bssids, num_measurements=1):
    """Gets scan RSSI for specified BSSIDs.

    Args:
        dut: android device object from which to get RSSI
        tracked_bssids: array of BSSIDs to gather RSSI data for
        num_measurements: number of scans done, and RSSIs collected
    Returns:
        scan_rssi: dict containing the measurement results as well as the
        statistics of the scan RSSI for all BSSIDs in tracked_bssids
    """


@detect_wifi_decorator
def get_sw_signature(dut):
    """Function that checks the signature for wifi firmware and config files.

    Returns:
        bdf_signature: signature consisting of last three digits of bdf cksums
        fw_signature: floating point firmware version, i.e., major.minor
    """


@detect_wifi_decorator
def get_country_code(dut):
    """Function that returns the current wifi country code."""


@detect_wifi_decorator
def push_config(dut, config_file):
    """Function to push Wifi BDF files

    This function checks for existing wifi bdf files and over writes them all,
    for simplicity, with the bdf file provided in the arguments. The dut is
    rebooted for the bdf file to take effect

    Args:
        dut: dut to push bdf file to
        config_file: path to bdf_file to push
    """


@detect_wifi_decorator
def start_wifi_logging(dut):
    """Function to start collecting wifi-related logs"""


@detect_wifi_decorator
def stop_wifi_logging(dut):
    """Function to stop collecting wifi-related logs"""


@detect_wifi_decorator
def push_firmware(dut, firmware_files):
    """Function to push Wifi firmware files

    Args:
        dut: dut to push firmware files to
        firmware_files: firmware file(s) to push; the expected format is
        defined by the platform-specific implementation
    """


@detect_wifi_decorator
def disable_beamforming(dut):
    """Function to disable beamforming."""


@detect_wifi_decorator
def set_nss_capability(dut, nss):
    """Function to set number of spatial streams supported."""


@detect_wifi_decorator
def set_chain_mask(dut, chain_mask):
    """Function to set DUT chain mask.

    Args:
        dut: android device
        chain_mask: desired chain mask in [0, 1, '2x2']
    """


# Link layer stats utilities
class LinkLayerStats():
    """Factory returning the platform-specific LinkLayerStats object.

    Instantiating this class never creates an instance of it. Instead, an
    instance of qcom_utils.LinkLayerStats or brcm_utils.LinkLayerStats is
    returned depending on the wifi platform detected on the DUT.
    """

    def __new__(cls, dut, llstats_enabled=True):
        if detect_wifi_platform(dut) == 'qcom':
            return qcom_utils.LinkLayerStats(dut, llstats_enabled)
        else:
            return brcm_utils.LinkLayerStats(dut, llstats_enabled)