17import collections
18import importlib
19import ipaddress
20import logging
21import numpy
22import re
23import time
24from acts import asserts
25from acts import utils
26from acts.controllers.android_device import AndroidDevice
27from acts.controllers.utils_lib import ssh
28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils
30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils
31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils
33from concurrent.futures import ThreadPoolExecutor
# NOTE(review): SHORT_SLEEP is used throughout this module (sleeps, timeouts
# and default arguments) but no definition is visible; 1 second is assumed --
# confirm the intended value.
SHORT_SLEEP = 1
# 6 GHz channels are labelled '6g<N>' with N = 1, 5, 9, ..., 233.
CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)]
# Channels grouped by band name.
BAND_TO_CHANNEL_MAP = {
    '2.4GHz': list(range(1, 14)),
    'UNII-1': [36, 40, 44, 48],
    'UNII-2':
    [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140],
    'UNII-3': [149, 153, 157, 161, 165],
    '6GHz': CHANNELS_6GHz
}
# Reverse lookup built from BAND_TO_CHANNEL_MAP: channel -> band name.
CHANNEL_TO_BAND_MAP = {
    channel: band
    for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels
}
52# Decorators
def nonblocking(f):
    """Creates a decorator transforming function calls to non-blocking.

    Every call to the decorated function is submitted to its own
    single-worker ThreadPoolExecutor and returns immediately.

    Args:
        f: callable to run in a background thread
    Returns:
        wrap: wrapper that returns the concurrent.futures.Future of the
        submitted call; call result() on it to wait for f's return value
    """
    # Local import: functools is not among this module's top-level imports.
    import functools

    # Preserve f's name and docstring on the wrapper.
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        executor = ThreadPoolExecutor(max_workers=1)
        thread_future = executor.submit(f, *args, **kwargs)
        # Ensure resources are freed up when executor returns or raises
        executor.shutdown(wait=False)
        return thread_future

    return wrap
def detect_wifi_platform(dut):
    """Return the wifi platform of a DUT ('qcom' or 'brcm').

    The result is cached on the device object as dut.wifi_platform, so the
    file system is only queried the first time. The platform is reported as
    'qcom' when dut.get_file_names('/vendor/firmware/wlan/') returns a
    non-empty collection and as 'brcm' otherwise.

    Args:
        dut: device object providing get_file_names()
    Returns:
        the value of dut.wifi_platform
    """
    if not hasattr(dut, 'wifi_platform'):
        wlan_firmware_files = dut.get_file_names('/vendor/firmware/wlan/')
        dut.wifi_platform = 'qcom' if len(wlan_firmware_files) else 'brcm'
    return dut.wifi_platform
def detect_wifi_decorator(f):
    """Decorator dispatching a call to the platform-specific implementation.

    The wrapper finds the DUT among the call arguments, detects its wifi
    platform, imports the matching '<platform>_utils' module and calls the
    function with the same name as f in that module, forwarding all
    arguments. The body of f itself is never executed.

    Args:
        f: function whose name selects the platform-specific implementation
    Returns:
        wrap: wrapper returning whatever the platform implementation
        returns, or None if the platform module has no such function
    Raises:
        TypeError: if no 'dut' keyword argument and no AndroidDevice
        positional argument is passed to the wrapper
    """
    # Local import: functools is not among this module's top-level imports.
    import functools

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        if 'dut' in kwargs:
            dut = kwargs['dut']
        else:
            # isinstance (rather than an exact type match) also accepts
            # subclasses of AndroidDevice.
            dut = next(
                (arg for arg in args if isinstance(arg, AndroidDevice)), None)
            if dut is None:
                raise TypeError(
                    '{} requires an AndroidDevice argument.'.format(
                        f.__name__))
        dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format(
            detect_wifi_platform(dut))
        dut_package = importlib.import_module(dut_package)
        # The fallback must accept the forwarded arguments; a zero-argument
        # lambda would raise TypeError instead of returning None.
        f_decorated = getattr(dut_package, f.__name__,
                              lambda *_args, **_kwargs: None)
        return f_decorated(*args, **kwargs)

    return wrap
93# JSON serializer
def serialize_dict(input_dict):
    """Function to serialize dicts to enable JSON output.

    Args:
        input_dict: dict whose keys and values are to be serialized
    Returns:
        output_dict: OrderedDict with every key and value passed through
        _serialize_value, in the iteration order of input_dict
    """
    output_dict = collections.OrderedDict()
    for key, value in input_dict.items():
        output_dict[_serialize_value(key)] = _serialize_value(value)
    return output_dict


def _serialize_value(value):
    """Function to recursively serialize dict entries to enable JSON output.

    Tuples are converted to their string representation, numpy scalars to
    the matching builtin type, lists and numpy arrays to lists of serialized
    elements and dicts to serialized OrderedDicts. Plain float, int, bool
    and str values are returned unchanged. Anything else is replaced by the
    string "Non-serializable object".
    """
    if isinstance(value, tuple):
        return str(value)
    # Match the abstract numpy scalar types so that every width (int32,
    # float32, ...) is converted, not only the 64-bit ones.
    if isinstance(value, numpy.bool_):
        return bool(value)
    if isinstance(value, numpy.integer):
        return int(value)
    if isinstance(value, numpy.floating):
        return float(value)
    if isinstance(value, (list, numpy.ndarray)):
        return [_serialize_value(x) for x in value]
    if isinstance(value, dict):
        return serialize_dict(value)
    # Exact type match on purpose: subclasses of the builtin types are not
    # passed through unchanged.
    if type(value) in (float, int, bool, str):
        return value
    return "Non-serializable object"
def extract_sub_dict(full_dict, fields):
    """Return an OrderedDict holding only the requested fields of a dict.

    Args:
        full_dict: dict to extract entries from
        fields: iterable of keys to copy; each must be present in full_dict
    Returns:
        sub_dict: OrderedDict of the selected entries, ordered as in fields
    """
    sub_dict = collections.OrderedDict()
    for field in fields:
        sub_dict[field] = full_dict[field]
    return sub_dict
def write_antenna_tune_code(dut, tune_code):
    """Force an antenna tune code through modem AT commands and verify it.

    The function first enables modem test mode (up to 5 attempts of
    AT+LMODETEST followed by AT+LRFFINALSTART), then sends the tune code
    command and reads back every listed register with AT+MIPIREAD.

    Args:
        dut: android device; commands are sent through dut.adb.shell
        tune_code: dict with key 'tune_code_cmd' (AT command string to send)
            and key 'tune_code_registers' (dict mapping each register key to
            the expected value string)
    Returns:
        True if every register read-back contains its expected value
    Raises:
        RuntimeError: if test mode could not be enabled, or if any register
        could not be read back or did not match its expected value
    """
    tune_code_forcing_on = False
    # Use AT Command file to enable tune code forcing
    for n_enable in range(5):
        logging.debug('{}-th Enabling modem test mode'.format(n_enable))
        try:
            at_lmodetest_output = dut.adb.shell("modem_cmd raw AT+LMODETEST")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LMODETEST output: {}'.format(
                at_lmodetest_output))
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LMODETEST'.format(n_enable))
            continue
        try:
            at_lrffinalstart_output = dut.adb.shell(
                "modem_cmd raw AT+LRFFINALSTART")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LRFFINALSTART output: {}'.format(
                at_lrffinalstart_output))
            if ("+LRFFINALSTART:0" in at_lrffinalstart_output
                    and "ERROR" not in at_lrffinalstart_output):
                tune_code_forcing_on = True
                logging.info('Enable modem test mode SUCCESSFUL')
                break
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LRFFINALSTART'.format(
                    n_enable))
            continue
    if not tune_code_forcing_on:
        raise RuntimeError("AT Command File Not set up")
    write_cmd = "modem_cmd raw " + tune_code['tune_code_cmd']
    dut.adb.shell(write_cmd)
    logging.debug('Write Tune Code: {}'.format(write_cmd))
    registers_correct = True
    # Check tune code register values
    for register_key, expected_value in tune_code[
            'tune_code_registers'].items():
        try:
            at_tc_reg_output = dut.adb.shell(
                "modem_cmd raw AT+MIPIREAD={}".format(register_key))
            time.sleep(SHORT_SLEEP)
        except Exception:
            # A failed read-back cannot confirm the register; treat it as a
            # mismatch instead of reusing an unbound or stale output.
            logging.warning('Failed to read tune code register {}'.format(
                register_key))
            registers_correct = False
            continue
        if "+MIPIREAD:" + expected_value.lower() in at_tc_reg_output:
            logging.info('Forced tune code register {} value matches expectation: {}'.format(register_key, expected_value))
        else:
            logging.warning('Expected tune code register {} value: {}'.format(register_key, expected_value))
            logging.warning('tune code register value is set to {}'.format(at_tc_reg_output))
            registers_correct = False
    if not registers_correct:
        raise RuntimeError("Enable modem test mode SUCCESSFUL, but register values NOT correct")
    return True
173# Miscellaneous Wifi Utilities
def check_skip_conditions(testcase_params, dut, access_point,
                          ota_chamber=None):
    """Skips the current test if the setup cannot run it.

    The test is skipped (through asserts.skip) when the DUT health check
    fails, when the access point or the OTA chamber does not support the
    requested channel, or when 'iw list' on the DUT shows no support for the
    requested 160 MHz bandwidth or 6 GHz channel. Wifi is turned on first if
    it is off.

    Args:
        testcase_params: dict with key 'channel' and optional key 'bandwidth'
        dut: android device under test
        access_point: object providing band_lookup_by_channel()
        ota_chamber: optional object providing SUPPORTED_BANDS
    """
    # Check battery level before test
    dut_is_healthy = health_check(dut, 10)
    if not dut_is_healthy:
        asserts.skip('DUT battery level too low.')
    ap_band = access_point.band_lookup_by_channel(testcase_params['channel'])
    if not ap_band:
        asserts.skip('AP does not support requested channel.')
    if ota_chamber:
        requested_band = CHANNEL_TO_BAND_MAP[testcase_params['channel']]
        if requested_band not in ota_chamber.SUPPORTED_BANDS:
            asserts.skip('OTA chamber does not support requested channel.')
    # Check if 6GHz is supported by checking capabilities in the US.
    if not dut.droid.wifiCheckState():
        wutils.wifi_toggle_state(dut, True)
    iw_list = dut.adb.shell('iw list')
    wants_160mhz = testcase_params.get('bandwidth', 20) == 160
    if wants_160mhz and 'Supported Channel Width: 160 MHz' not in iw_list:
        asserts.skip('DUT does not support 160 MHz networks.')
    wants_6ghz = testcase_params.get('channel', 6) in CHANNELS_6GHz
    if wants_6ghz and '6135 MHz' not in iw_list:
        asserts.skip('DUT does not support 6 GHz band.')
def validate_network(dut, ssid):
    """Check that DUT has a valid internet connection through expected SSID

    Args:
        dut: android device of interest
        ssid: expected ssid
    Returns:
        True if the connection validates and the 'SSID' entry of the current
        connection info equals ssid, False otherwise (including when the
        connection check itself raises)
    """
    try:
        connected = wutils.validate_connection(dut, wait_time=3) is not None
        current_network = dut.droid.wifiGetConnectionInfo()
    except Exception as e:
        # Best effort: a failing check means "not connected". Only Exception
        # is caught so KeyboardInterrupt/SystemExit still propagate.
        logging.debug('Connection validation failed: {}'.format(e))
        return False
    return bool(connected and current_network['SSID'] == ssid)
def get_server_address(ssh_connection, dut_ip, subnet_mask):
    """Get server address on a specific subnet,

    This function retrieves the LAN or WAN IP of a remote machine used in
    testing. If subnet_mask is set to 'public' it returns a machines global ip,
    else it returns the ip belonging to the dut local network given the dut's
    ip and subnet mask.

    Args:
        ssh_connection: object representing server for which we want an ip
        dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx
        subnet_mask: string representing subnet mask (public for global ip)
    Returns:
        string with the first matching ip reported by ifconfig, or None (after
        logging an error) if no address matches
    """
    ifconfig_out = ssh_connection.run('ifconfig').stdout
    # Raw string with escaped dots: only dotted-quad addresses are captured.
    ip_list = re.findall(r'inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)', ifconfig_out)
    ip_list = [ipaddress.ip_address(ip) for ip in ip_list]

    if subnet_mask == 'public':
        for ip in ip_list:
            # is_global is not used to allow for CGNAT ips in 100.x.y.z range
            if not ip.is_private:
                return str(ip)
    else:
        dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask),
                                           strict=False)
        for ip in ip_list:
            if ip in dut_network:
                return str(ip)
    logging.error('No IP address found in requested subnet')
    return None
248# Ping utilities
def get_ping_stats(src_device, dest_address, ping_duration, ping_interval,
                   ping_size):
    """Run ping to or from the DUT.

    The function either pings the DUT from a remote machine or pings a remote
    ip from the DUT, depending on the type of src_device.

    Args:
        src_device: object representing device to ping from
        dest_address: ip address to ping
        ping_duration: timeout to set on the ping process (in seconds)
        ping_interval: time between pings (in seconds)
        ping_size: size of ping packet payload
    Returns:
        ping_result: dict containing ping results and other meta data
    Raises:
        TypeError: if src_device is neither an AndroidDevice nor an
        SshConnection
        ValueError: if the ssh host reports a platform other than linux or
        darwin
    """
    ping_count = int(ping_duration / ping_interval)
    ping_deadline = int(ping_count * ping_interval) + 1
    ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    if isinstance(src_device, AndroidDevice):
        ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address)
        ping_output = src_device.adb.shell(ping_cmd,
                                           timeout=ping_deadline + SHORT_SLEEP,
                                           ignore_status=True)
    elif isinstance(src_device, ssh.connection.SshConnection):
        platform = src_device.run('uname').stdout
        if 'linux' in platform.lower():
            ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address)
        elif 'darwin' in platform.lower():
            # Each output line is prefixed with a gdate timestamp by the
            # shell loop appended to the ping command.
            ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format(
                ping_cmd_macos, dest_address)
        else:
            # Fail with a clear message instead of an unbound ping_cmd.
            raise ValueError(
                'Unable to ping from unsupported platform {}.'.format(
                    platform.strip()))
        ping_output = src_device.run(ping_cmd,
                                     timeout=ping_deadline + SHORT_SLEEP,
                                     ignore_status=True).stdout
    else:
        raise TypeError('Unable to ping using src_device of type %s.' %
                        type(src_device))
    return ping_utils.PingResult(ping_output.splitlines())
# Non-blocking variant: callers in this module call .result() on the return
# value, so the call must be wrapped to return a future.
@nonblocking
def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval,
                      ping_size):
    """Non-blocking version of get_ping_stats.

    Takes the same arguments as get_ping_stats and forwards them unchanged.

    Returns:
        future whose result() is the return value of get_ping_stats
    """
    return get_ping_stats(src_device, dest_address, ping_duration,
                          ping_interval, ping_size)
309# Iperf utilities
# Wrapped like the other *_nb helpers in this module so that the call
# returns a future instead of blocking.
@nonblocking
def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag,
                          timeout):
    """Non-blocking call to iperf_client.start.

    All arguments after iperf_client are forwarded unchanged to
    iperf_client.start(iperf_server_address, iperf_args, tag, timeout).

    Returns:
        future whose result() is the return value of iperf_client.start
    """
    return iperf_client.start(iperf_server_address, iperf_args, tag, timeout)
def get_iperf_arg_string(duration,
                         reverse_direction,
                         interval=1,
                         traffic_type='TCP',
                         socket_size=None,
                         num_processes=1,
                         udp_throughput='1000M',
                         ipv6=False,
                         udp_length=1448):
    """Build the iperf client argument string used in throughput tests.

    Args:
        duration: iperf duration in seconds (-t)
        reverse_direction: boolean controlling the -R flag for iperf clients
        interval: iperf print interval (-i)
        traffic_type: string specifying TCP or UDP traffic (case-insensitive);
            any other value adds no protocol-specific flags
        socket_size: string specifying TCP window or socket buffer, e.g., 2M
        num_processes: int specifying number of iperf processes (-P)
        udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M
        ipv6: boolean controlling the use of IP V6
        udp_length: value passed with -l in UDP tests
    Returns:
        iperf_args: string of formatted iperf args
    """
    arg_parts = ['-i {} -t {} -J '.format(interval, duration)]
    if ipv6:
        arg_parts.append('-6 ')
    protocol = traffic_type.upper()
    if protocol == 'UDP':
        arg_parts.append('-u -b {} -l {} -P {} '.format(
            udp_throughput, udp_length, num_processes))
    elif protocol == 'TCP':
        arg_parts.append('-P {} '.format(num_processes))
    if socket_size:
        arg_parts.append('-w {} '.format(socket_size))
    if reverse_direction:
        arg_parts.append(' -R')
    return ''.join(arg_parts)
357# Attenuator Utilities
def atten_by_label(atten_list, path_label, atten_level):
    """Set attenuation on every attenuator whose path contains a label.

    Args:
        atten_list: list of attenuators to iterate over
        path_label: path label on which to set desired attenuation
        atten_level: attenuation desired on path
    """
    labelled_attens = (atten for atten in atten_list
                       if path_label in atten.path)
    for atten in labelled_attens:
        atten.set_atten(atten_level, retry=True)
def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server):
    """Estimate the attenuation needed to reach a target RSSI.

    All attenuator ports are set to the same value. Starting from 0 dB, the
    attenuation is adjusted by the difference between measured and target
    RSSI (at most 20 dB per step, in 0.25 dB increments) for up to 20
    iterations, measuring RSSI under ping traffic each time. The search stops
    once two consecutive measurements are within 1 dB of the target. The
    estimate is not meant to be exact or guaranteed.

    Args:
        target_rssi: rssi of interest
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
    Returns:
        target_atten: attenuation setting to achieve target_rssi; 0 if the
        search goes below 0 dB, and the maximum attenuation of the first
        port if the search exceeds it
    """

    def _set_all_ports(level):
        # Apply the same attenuation to every port.
        for port in attenuators:
            port.set_atten(level, strict=False, retry=True)

    def _measure_rssi_under_ping():
        # Ping runs in the background while RSSI is polled.
        ping_future = get_ping_stats_nb(src_device=ping_server,
                                        dest_address=dut_ip,
                                        ping_duration=1.5,
                                        ping_interval=0.02,
                                        ping_size=64)
        rssi_result = get_connected_rssi(dut,
                                         num_measurements=4,
                                         polling_frequency=0.25,
                                         first_measurement_delay=0.5,
                                         disconnect_warning=1,
                                         ignore_samples=1)
        mean_rssi = rssi_result['signal_poll_rssi']['mean']
        ping_future.result()
        return mean_rssi

    logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi))
    # Set attenuator to 0 dB
    _set_all_ports(0)
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    # Measure starting RSSI
    current_rssi = _measure_rssi_under_ping()
    target_atten = 0
    logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
        target_atten, current_rssi))
    was_within_range = False
    for idx in range(20):
        # Clamp the step to +/-20 dB and quantize to 0.25 dB.
        atten_delta = max(min(current_rssi - target_rssi, 20), -20)
        target_atten = int((target_atten + atten_delta) * 4) / 4
        if target_atten < 0:
            return 0
        if target_atten > attenuators[0].get_max_atten():
            return attenuators[0].get_max_atten()
        _set_all_ports(target_atten)
        current_rssi = _measure_rssi_under_ping()
        logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
            target_atten, current_rssi))
        # Written as "not ... < 1" so a NaN reading counts as out of range.
        if not abs(current_rssi - target_rssi) < 1:
            was_within_range = False
            continue
        if was_within_range:
            logging.info(
                'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.'
                'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format(
                    current_rssi, target_rssi, target_atten, idx))
            return target_atten
        was_within_range = True
    return target_atten
def get_current_atten_dut_chain_map(attenuators,
                                    dut,
                                    ping_server,
                                    ping_from_dut=False):
    """Detect which DUT chain each attenuator port is connected to.

    The function assumes the DUT is already connected to a wifi network and
    that each DUT chain is connected to only one attenuator port. It measures
    per-chain RSSI with all ports at 0 dB, then sets one port at a time to
    30 dB and looks for the chain whose RSSI drops by more than 10 dB. Chains
    whose base RSSI is not above -70 are never assigned.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        ping_from_dut: boolean controlling whether to ping from or to dut
    Returns:
        chain_map: list with one entry per attenuator port: 'DUT-Chain-0',
        'DUT-Chain-1', or None when no chain could be assigned
    """
    # Set attenuator to 0 dB
    for port in attenuators:
        port.set_atten(0, strict=False, retry=True)
    # Start ping traffic
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    if ping_from_dut:
        ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname,
                                        11, 0.02, 64)
    else:
        ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64)
    # Measure starting RSSI
    base_rssi = get_connected_rssi(dut, 4, 0.25, 1)
    # (label, rssi key, base RSSI) per chain, in priority order.
    chains = (
        ('DUT-Chain-0', 'chain_0_rssi', base_rssi['chain_0_rssi']['mean']),
        ('DUT-Chain-1', 'chain_1_rssi', base_rssi['chain_1_rssi']['mean']),
    )
    if any(chain_base < -70 for _, _, chain_base in chains):
        logging.warning('RSSI might be too low to get reliable chain map.')
    # Compile chain map by attenuating one path at a time and seeing which
    # chain's RSSI degrades
    chain_map = []
    for test_atten in attenuators:
        # Set one attenuator to 30 dB down
        test_atten.set_atten(30, strict=False, retry=True)
        # Get new RSSI
        test_rssi = get_connected_rssi(dut, 4, 0.25, 1)
        # Assign attenuator to the first chain whose RSSI dropped
        detected_chain = None
        for chain_label, rssi_key, chain_base in chains:
            if (chain_base > -70
                    and chain_base - test_rssi[rssi_key]['mean'] > 10):
                detected_chain = chain_label
                break
        chain_map.append(detected_chain)
        # Reset attenuator to 0
        test_atten.set_atten(0, strict=False, retry=True)
    ping_future.result()
    logging.debug('Chain Map: {}'.format(chain_map))
    return chain_map
def get_full_rf_connection_map(attenuators,
                               dut,
                               ping_server,
                               networks,
                               ping_from_dut=False):
    """Detect per-network connections between attenuator ports and DUT chains.

    The DUT is connected to each network in turn and
    get_current_atten_dut_chain_map is called to get the connection map on
    that network. The results are returned in two formats, indexed by network
    and indexed by attenuator port.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        networks: dict of network IDs and configs
        ping_from_dut: boolean forwarded to get_current_atten_dut_chain_map
    Returns:
        rf_map_by_network: dict of RF connections indexed by network.
        rf_map_by_atten: list of RF connections indexed by attenuator
    """
    for port in attenuators:
        port.set_atten(0, strict=False, retry=True)

    rf_map_by_network = collections.OrderedDict()
    rf_map_by_atten = [[] for _ in attenuators]
    for net_id, net_config in networks.items():
        wutils.reset_wifi(dut)
        wutils.wifi_connect(dut,
                            net_config,
                            num_of_tries=1,
                            assert_on_fail=False,
                            check_connectivity=False)
        chain_map = get_current_atten_dut_chain_map(attenuators, dut,
                                                    ping_server, ping_from_dut)
        rf_map_by_network[net_id] = chain_map
        for idx, chain in enumerate(chain_map):
            # Ports with no detected chain are left out of the per-port map.
            if not chain:
                continue
            connection = {'network': net_id, 'dut_chain': chain}
            rf_map_by_atten[idx].append(connection)
    logging.debug('RF Map (by Network): {}'.format(rf_map_by_network))
    logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten))

    return rf_map_by_network, rf_map_by_atten
558# Generic device utils
def get_dut_temperature(dut):
    """Function to get dut temperature.

    The function reads the first candidate thermal zone file that yields an
    integer. Readings above 100 are divided by 1000.

    Args:
        dut: AndroidDevice of interest
    Returns:
        temperature: device temperature. 0 if temperature could not be read
    """
    candidate_zones = [
        '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp',
        '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp',
        '/dev/thermal/tz-by-name/quiet_therm/temp'
    ]
    temperature = 0
    for zone in candidate_zones:
        try:
            temperature = int(dut.adb.shell('cat {}'.format(zone)))
            break
        except Exception:
            # Zone missing or unreadable on this device; try the next one.
            # Only Exception is caught so KeyboardInterrupt still propagates.
            temperature = 0
    if temperature == 0:
        logging.debug('Could not check DUT temperature.')
    elif temperature > 100:
        # presumably a millidegree reading -- TODO confirm
        temperature = temperature / 1000
    return temperature
def wait_for_dut_cooldown(dut, target_temp=50, timeout=300):
    """Function to wait for a DUT to cool down.

    The temperature is read at least once, then polled every SHORT_SLEEP
    seconds until it drops below target_temp or the timeout elapses.

    Args:
        dut: AndroidDevice of interest
        target_temp: target cooldown temperature
        timeout: max time to wait for cooldown (in seconds)
    """
    start_time = time.time()
    # Read before looping so temperature is defined even when timeout <= 0.
    temperature = get_dut_temperature(dut)
    while temperature >= target_temp and time.time() - start_time < timeout:
        time.sleep(SHORT_SLEEP)
        temperature = get_dut_temperature(dut)
    elapsed_time = time.time() - start_time
    logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format(
        temperature, elapsed_time))
def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1):
    """Check battery level and temperature of a DUT.

    A battery level below batt_thresh marks the device unhealthy. A
    temperature above temp_threshold either triggers a wait for the device
    to cool down to 5 degrees below the threshold (cooldown set) or marks
    the device unhealthy (cooldown not set).

    Args:
        dut: AndroidDevice of interest
        batt_thresh: battery level threshold
        temp_threshold: temperature threshold
        cooldown: flag to wait for DUT to cool down when overheating
    Returns:
        health_check: boolean confirming device is healthy
    """
    is_healthy = True
    battery_level = utils.get_battery_level(dut)
    if battery_level >= batt_thresh:
        logging.debug('Battery level = {}%'.format(battery_level))
    else:
        logging.warning('Battery level low ({}%)'.format(battery_level))
        is_healthy = False

    temperature = get_dut_temperature(dut)
    if not temperature > temp_threshold:
        logging.debug('DUT Temperature = {} C'.format(temperature))
        return is_healthy
    if cooldown:
        logging.warning(
            'Waiting for DUT to cooldown. ({} C)'.format(temperature))
        wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5)
    else:
        logging.warning('DUT Overheating ({} C)'.format(temperature))
        is_healthy = False
    return is_healthy
645# Wifi Device Utils
def empty_rssi_result():
    """Return a fresh RSSI result with no data and NaN statistics.

    Returns:
        OrderedDict with keys 'data' (empty list), 'mean' and 'stdev' (NaN)
    """
    rssi_result = collections.OrderedDict()
    rssi_result['data'] = []
    rssi_result['mean'] = float('nan')
    rssi_result['stdev'] = float('nan')
    return rssi_result
# Wrapped like the other *_nb helpers in this module so that the call
# returns a future instead of blocking.
@nonblocking
def get_connected_rssi_nb(dut,
                          num_measurements=1,
                          polling_frequency=SHORT_SLEEP,
                          first_measurement_delay=0,
                          disconnect_warning=True,
                          ignore_samples=0,
                          interface='wlan0'):
    """Non-blocking version of get_connected_rssi.

    Takes the same arguments as get_connected_rssi and forwards them
    positionally.

    Returns:
        future whose result() is the return value of get_connected_rssi
    """
    return get_connected_rssi(dut, num_measurements, polling_frequency,
                              first_measurement_delay, disconnect_warning,
                              ignore_samples, interface)
# Callers in this module index the returned dict, so the call has to be
# dispatched to the platform-specific implementation of the same name.
@detect_wifi_decorator
def get_connected_rssi(dut,
                       num_measurements=1,
                       polling_frequency=SHORT_SLEEP,
                       first_measurement_delay=0,
                       disconnect_warning=True,
                       ignore_samples=0,
                       interface='wlan0'):
    """Gets all RSSI values reported for the connected access point/BSSID.

    This body is a placeholder; the decorator forwards the call to the
    function of the same name in the platform-specific utils module.

    Args:
        dut: android device object from which to get RSSI
        num_measurements: number of scans done, and RSSIs collected
        polling_frequency: time to wait between RSSI measurements
        first_measurement_delay: presumably the delay before the first
            measurement -- confirm against the platform implementation
        disconnect_warning: boolean controlling disconnection logging messages
        ignore_samples: number of leading samples to ignore
        interface: presumably the wifi interface to query -- confirm against
            the platform implementation
    Returns:
        connected_rssi: dict containing the measurements results for
        all reported RSSI values (signal_poll, per chain, etc.) and their
        statistics
    """
# Wrapped like the other *_nb helpers in this module so that the call
# returns a future instead of blocking.
@nonblocking
def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1):
    """Non-blocking version of get_scan_rssi.

    Takes the same arguments as get_scan_rssi and forwards them unchanged.

    Returns:
        future whose result() is the return value of get_scan_rssi
    """
    return get_scan_rssi(dut, tracked_bssids, num_measurements)
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def get_scan_rssi(dut, tracked_bssids, num_measurements=1):
    """Gets scan RSSI for specified BSSIDs.

    Args:
        dut: android device object from which to get RSSI
        tracked_bssids: array of BSSIDs to gather RSSI data for
        num_measurements: number of scans done, and RSSIs collected
    Returns:
        scan_rssi: dict containing the measurement results as well as the
        statistics of the scan RSSI for all BSSIDs in tracked_bssids
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def get_sw_signature(dut):
    """Function that checks the signature for wifi firmware and config files.

    Args:
        dut: android device of interest
    Returns:
        bdf_signature: signature consisting of last three digits of bdf cksums
        fw_signature: floating point firmware version, i.e., major.minor
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def get_country_code(dut):
    """Function that returns the current wifi country code.

    Args:
        dut: android device of interest
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def push_config(dut, config_file):
    """Function to push Wifi BDF files

    This function checks for existing wifi bdf files and overwrites them all,
    for simplicity, with the bdf file provided in the arguments. The dut is
    rebooted for the bdf file to take effect

    Args:
        dut: dut to push bdf file to
        config_file: path to bdf_file to push
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def start_wifi_logging(dut):
    """Function to start collecting wifi-related logs

    Args:
        dut: android device of interest
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def stop_wifi_logging(dut):
    """Function to stop collecting wifi-related logs

    Args:
        dut: android device of interest
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def push_firmware(dut, firmware_files):
    """Function to push Wifi firmware files

    Args:
        dut: dut to push firmware files to
        firmware_files: firmware file path(s) to push; the exact format is
            defined by the platform implementation -- NOTE(review): the
            earlier docstring mentioned a wlanmdsp.mbn path and a separate
            Data.msc argument that is not part of this signature
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def disable_beamforming(dut):
    """Function to disable beamforming.

    Args:
        dut: android device of interest
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def set_nss_capability(dut, nss):
    """Function to set number of spatial streams supported.

    Args:
        dut: android device of interest
        nss: number of spatial streams
    """
# Placeholder body: the decorator forwards the call to the function of the
# same name in the platform-specific utils module.
@detect_wifi_decorator
def set_chain_mask(dut, chain_mask):
    """Function to set DUT chain mask.

    Args:
        dut: android device
        chain_mask: desired chain mask in [0, 1, '2x2']
    """
776# Link layer stats utilities
class LinkLayerStats():
    """Factory for the platform-specific link layer stats object.

    Instantiating this class does not create an instance of it; it returns
    qcom_utils.LinkLayerStats for 'qcom' devices and
    brcm_utils.LinkLayerStats for all others.
    """

    def __new__(cls, dut, llstats_enabled=True):
        is_qcom = detect_wifi_platform(dut) == 'qcom'
        platform_utils = qcom_utils if is_qcom else brcm_utils
        return platform_utils.LinkLayerStats(dut, llstats_enabled)