1#!/usr/bin/env python3
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17    Base Class for Defining Common WiFi Test Functionality
18"""
19
20import copy
21import os
22import time
23
24from acts import asserts
25from acts import context
26from acts import signals
27from acts import utils
28from acts.base_test import BaseTestClass
29from acts.controllers.ap_lib import hostapd_ap_preset
30from acts.controllers.ap_lib import hostapd_bss_settings
31from acts.controllers.ap_lib import hostapd_constants
32from acts.controllers.ap_lib import hostapd_security
33from acts.keys import Config
34from acts_contrib.test_utils.net import net_test_utils as nutils
35from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
36
37from mobly.base_test import STAGE_NAME_TEARDOWN_CLASS
38
# Short module-level alias for the WifiEnums namespace defined in
# wifi_test_utils.
WifiEnums = wutils.WifiEnums

# NOTE(review): AP_1 / AP_2 look like positional indexes of the first and
# second access point -- confirm against callers.
AP_1, AP_2 = 0, 1

# Upper bound on how many per-AP entries the get_*_network helpers append to
# the caller-supplied network list when mirror_ap is set.
MAX_AP_COUNT = 2
43
44
45class WifiBaseTest(BaseTestClass):
46    def __init__(self, configs):
47        super().__init__(configs)
48        self.enable_packet_log = False
49        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
50        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
51        self.tcpdump_proc = []
52        self.packet_log_pid = {}
53
54    def setup_class(self):
55        if hasattr(self, 'attenuators') and self.attenuators:
56            for attenuator in self.attenuators:
57                attenuator.set_atten(0)
58        opt_param = ["country_code_file"]
59        self.unpack_userparams(opt_param_names=opt_param)
60        if self.enable_packet_log and hasattr(self, "packet_capture"):
61            self.packet_logger = self.packet_capture[0]
62            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
63            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
64        if hasattr(self, "android_devices"):
65            for ad in self.android_devices:
66                wutils.wifi_test_device_init(ad)
67                if hasattr(self, "country_code_file"):
68                    if isinstance(self.country_code_file, list):
69                        self.country_code_file = self.country_code_file[0]
70                    if not os.path.isfile(self.country_code_file):
71                        self.country_code_file = os.path.join(
72                            self.user_params[Config.key_config_path.value],
73                            self.country_code_file)
74                    self.country_code = utils.load_config(
75                        self.country_code_file)["country"]
76                else:
77                    self.country_code = WifiEnums.CountryCode.US
78                wutils.set_wifi_country_code(ad, self.country_code)
79
80                if hasattr(self, "flagged_features"):
81                    self._configure_flagged_features(ad, self.flagged_features)
82
83    def setup_test(self):
84        if (hasattr(self, "android_devices")):
85            wutils.start_all_wlan_logs(self.android_devices)
86        self.tcpdump_proc = []
87        if hasattr(self, "android_devices"):
88            for ad in self.android_devices:
89                proc = nutils.start_tcpdump(ad, self.test_name)
90                self.tcpdump_proc.append((ad, proc))
91        if hasattr(self, "packet_logger"):
92            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
93                                                    self.test_name)
94
95    def teardown_test(self):
96        if (hasattr(self, "android_devices")):
97            wutils.stop_all_wlan_logs(self.android_devices)
98            for proc in self.tcpdump_proc:
99                nutils.stop_tcpdump(proc[0],
100                                    proc[1],
101                                    self.test_name,
102                                    pull_dump=False)
103            self.tcpdump_proc = []
104        if hasattr(self, "packet_logger") and self.packet_log_pid:
105            wutils.stop_pcap(self.packet_logger,
106                             self.packet_log_pid,
107                             test_status=True)
108            self.packet_log_pid = {}
109
110    def teardown_class(self):
111        begin_time = utils.get_current_epoch_time()
112        super().teardown_class()
113        for device in getattr(self, "fuchsia_devices", []):
114            device.take_bug_report(STAGE_NAME_TEARDOWN_CLASS, begin_time)
115
116    def on_fail(self, test_name, begin_time):
117        if hasattr(self, "android_devices"):
118            for ad in self.android_devices:
119                ad.take_bug_report(test_name, begin_time)
120                ad.cat_adb_log(test_name, begin_time)
121                wutils.get_ssrdumps(ad)
122            wutils.stop_all_wlan_logs(self.android_devices)
123            for ad in self.android_devices:
124                wutils.get_wlan_logs(ad)
125            for proc in self.tcpdump_proc:
126                nutils.stop_tcpdump(proc[0], proc[1], self.test_name)
127            self.tcpdump_proc = []
128        if hasattr(self, "packet_logger") and self.packet_log_pid:
129            wutils.stop_pcap(self.packet_logger,
130                             self.packet_log_pid,
131                             test_status=False)
132            self.packet_log_pid = {}
133
134        # Gets a wlan_device log and calls the generic device fail on DUT.
135        for device in getattr(self, "fuchsia_devices", []):
136            self.on_device_fail(device, test_name, begin_time)
137
138    def on_device_fail(self, device, test_name, begin_time):
139        """Gets a generic device DUT bug report.
140
141        This method takes a bug report if the device has the
142        'take_bug_report_on_fail' config value, and if the flag is true. This
143        method also power cycles if 'hard_reboot_on_fail' is True.
144
145        Args:
146            device: Generic device to gather logs from.
147            test_name: Name of the test that triggered this function.
148            begin_time: Logline format timestamp taken when the test started.
149        """
150        if (not hasattr(device, "take_bug_report_on_fail")
151                or device.take_bug_report_on_fail):
152            device.take_bug_report(test_name, begin_time)
153
154        if hasattr(device,
155                   "hard_reboot_on_fail") and device.hard_reboot_on_fail:
156            device.reboot(reboot_type='hard', testbed_pdus=self.pdu_devices)
157
158    def download_ap_logs(self):
159        """Downloads the DHCP and hostapad logs from the access_point.
160
161        Using the current TestClassContext and TestCaseContext this method pulls
162        the DHCP and hostapd logs and outputs them to the correct path.
163        """
164        current_path = context.get_current_context().get_full_output_path()
165        dhcp_full_out_path = os.path.join(current_path, "dhcp_log.txt")
166
167        dhcp_log = self.access_point.get_dhcp_logs()
168        if dhcp_log:
169            dhcp_log_file = open(dhcp_full_out_path, 'w')
170            dhcp_log_file.write(dhcp_log)
171            dhcp_log_file.close()
172
173        hostapd_logs = self.access_point.get_hostapd_logs()
174        for interface in hostapd_logs:
175            out_name = interface + "_hostapd_log.txt"
176            hostapd_full_out_path = os.path.join(current_path, out_name)
177            hostapd_log_file = open(hostapd_full_out_path, 'w')
178            hostapd_log_file.write(hostapd_logs[interface])
179            hostapd_log_file.close()
180
181    def get_psk_network(
182            self,
183            mirror_ap,
184            reference_networks,
185            hidden=False,
186            same_ssid=False,
187            security_mode=hostapd_constants.WPA2_STRING,
188            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
189            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
190            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
191            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
192        """Generates SSID and passphrase for a WPA2 network using random
193           generator.
194
195           Args:
196               mirror_ap: Boolean, determines if both APs use the same hostapd
197                          config or different configs.
198               reference_networks: List of PSK networks.
199               same_ssid: Boolean, determines if both bands on AP use the same
200                          SSID.
201               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
202               ssid_length_5g: Int, number of characters to use for 5G SSID.
203               passphrase_length_2g: Int, length of password for 2G network.
204               passphrase_length_5g: Int, length of password for 5G network.
205
206           Returns: A dict of 2G and 5G network lists for hostapd configuration.
207
208        """
209        network_dict_2g = {}
210        network_dict_5g = {}
211        ref_5g_security = security_mode
212        ref_2g_security = security_mode
213
214        if same_ssid:
215            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
216            ref_5g_ssid = ref_2g_ssid
217
218            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
219            ref_5g_passphrase = ref_2g_passphrase
220
221        else:
222            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
223            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
224
225            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
226            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
227
228        network_dict_2g = {
229            "SSID": ref_2g_ssid,
230            "security": ref_2g_security,
231            "password": ref_2g_passphrase,
232            "hiddenSSID": hidden
233        }
234
235        network_dict_5g = {
236            "SSID": ref_5g_ssid,
237            "security": ref_5g_security,
238            "password": ref_5g_passphrase,
239            "hiddenSSID": hidden
240        }
241
242        ap = 0
243        for ap in range(MAX_AP_COUNT):
244            reference_networks.append({
245                "2g": copy.copy(network_dict_2g),
246                "5g": copy.copy(network_dict_5g)
247            })
248            if not mirror_ap:
249                break
250        return {"2g": network_dict_2g, "5g": network_dict_5g}
251
252    def get_open_network(self,
253                         mirror_ap,
254                         open_network,
255                         hidden=False,
256                         same_ssid=False,
257                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
258                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
259                         security_mode='none'):
260        """Generates SSIDs for a open network using a random generator.
261
262        Args:
263            mirror_ap: Boolean, determines if both APs use the same hostapd
264                       config or different configs.
265            open_network: List of open networks.
266            same_ssid: Boolean, determines if both bands on AP use the same
267                       SSID.
268            ssid_length_2g: Int, number of characters to use for 2G SSID.
269            ssid_length_5g: Int, number of characters to use for 5G SSID.
270            security_mode: 'none' for open and 'OWE' for WPA3 OWE.
271
272        Returns: A dict of 2G and 5G network lists for hostapd configuration.
273
274        """
275        network_dict_2g = {}
276        network_dict_5g = {}
277
278        if same_ssid:
279            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
280            open_5g_ssid = open_2g_ssid
281
282        else:
283            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
284            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
285
286        network_dict_2g = {
287            "SSID": open_2g_ssid,
288            "security": security_mode,
289            "hiddenSSID": hidden
290        }
291
292        network_dict_5g = {
293            "SSID": open_5g_ssid,
294            "security": security_mode,
295            "hiddenSSID": hidden
296        }
297
298        ap = 0
299        for ap in range(MAX_AP_COUNT):
300            open_network.append({
301                "2g": copy.copy(network_dict_2g),
302                "5g": copy.copy(network_dict_5g)
303            })
304            if not mirror_ap:
305                break
306        return {"2g": network_dict_2g, "5g": network_dict_5g}
307
308    def get_wep_network(
309            self,
310            mirror_ap,
311            networks,
312            hidden=False,
313            same_ssid=False,
314            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
315            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
316            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
317            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
318        """Generates SSID and passphrase for a WEP network using random
319           generator.
320
321           Args:
322               mirror_ap: Boolean, determines if both APs use the same hostapd
323                          config or different configs.
324               networks: List of WEP networks.
325               same_ssid: Boolean, determines if both bands on AP use the same
326                          SSID.
327               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
328               ssid_length_5g: Int, number of characters to use for 5G SSID.
329               passphrase_length_2g: Int, length of password for 2G network.
330               passphrase_length_5g: Int, length of password for 5G network.
331
332           Returns: A dict of 2G and 5G network lists for hostapd configuration.
333
334        """
335        network_dict_2g = {}
336        network_dict_5g = {}
337        ref_5g_security = hostapd_constants.WEP_STRING
338        ref_2g_security = hostapd_constants.WEP_STRING
339
340        if same_ssid:
341            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
342            ref_5g_ssid = ref_2g_ssid
343
344            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
345            ref_5g_passphrase = ref_2g_passphrase
346
347        else:
348            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
349            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
350
351            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
352            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)
353
354        network_dict_2g = {
355            "SSID": ref_2g_ssid,
356            "security": ref_2g_security,
357            "wepKeys": [ref_2g_passphrase] * 4,
358            "hiddenSSID": hidden
359        }
360
361        network_dict_5g = {
362            "SSID": ref_5g_ssid,
363            "security": ref_5g_security,
364            "wepKeys": [ref_2g_passphrase] * 4,
365            "hiddenSSID": hidden
366        }
367
368        ap = 0
369        for ap in range(MAX_AP_COUNT):
370            networks.append({
371                "2g": copy.copy(network_dict_2g),
372                "5g": copy.copy(network_dict_5g)
373            })
374            if not mirror_ap:
375                break
376        return {"2g": network_dict_2g, "5g": network_dict_5g}
377
378    def update_bssid(self, ap_instance, ap, network, band):
379        """Get bssid and update network dictionary.
380
381        Args:
382            ap_instance: Accesspoint index that was configured.
383            ap: Accesspoint object corresponding to ap_instance.
384            network: Network dictionary.
385            band: Wifi networks' band.
386
387        """
388        bssid = ap.get_bssid_from_ssid(network["SSID"], band)
389
390        if network["security"] == hostapd_constants.WPA2_STRING:
391            # TODO:(bamahadev) Change all occurances of reference_networks
392            # to wpa_networks.
393            self.reference_networks[ap_instance][band]["bssid"] = bssid
394        if network["security"] == hostapd_constants.WPA_STRING:
395            self.wpa_networks[ap_instance][band]["bssid"] = bssid
396        if network["security"] == hostapd_constants.WEP_STRING:
397            self.wep_networks[ap_instance][band]["bssid"] = bssid
398        if network["security"] == hostapd_constants.ENT_STRING:
399            if "bssid" not in self.ent_networks[ap_instance][band]:
400                self.ent_networks[ap_instance][band]["bssid"] = bssid
401            else:
402                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
403        if network["security"] == 'none':
404            self.open_network[ap_instance][band]["bssid"] = bssid
405
406    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
407        """Get bssid for a given SSID and add it to the network dictionary.
408
409        Args:
410            ap_instance: Accesspoint index that was configured.
411            ap: Accesspoint object corresponding to ap_instance.
412            networks_5g: List of 5g networks configured on the APs.
413            networks_2g: List of 2g networks configured on the APs.
414
415        """
416
417        if not (networks_5g or networks_2g):
418            return
419
420        for network in networks_5g:
421            if 'channel' in network:
422                continue
423            self.update_bssid(ap_instance, ap, network,
424                              hostapd_constants.BAND_5G)
425
426        for network in networks_2g:
427            if 'channel' in network:
428                continue
429            self.update_bssid(ap_instance, ap, network,
430                              hostapd_constants.BAND_2G)
431
432    def configure_openwrt_ap_and_start(
433            self,
434            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
435            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
436            channel_5g_ap2=None,
437            channel_2g_ap2=None,
438            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
439            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
440            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
441            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
442            mirror_ap=False,
443            hidden=False,
444            same_ssid=False,
445            open_network=False,
446            wpa1_network=False,
447            wpa_network=False,
448            wep_network=False,
449            ent_network=False,
450            ent_network_pwd=False,
451            owe_network=False,
452            sae_network=False,
453            saemixed_network=False,
454            radius_conf_2g=None,
455            radius_conf_5g=None,
456            radius_conf_pwd=None,
457            ap_count=1,
458            ieee80211w=None):
459        """Create, configure and start OpenWrt AP.
460
461        Args:
462            channel_5g: 5G channel to configure.
463            channel_2g: 2G channel to configure.
464            channel_5g_ap2: 5G channel to configure on AP2.
465            channel_2g_ap2: 2G channel to configure on AP2.
466            ssid_length_2g: Int, number of characters to use for 2G SSID.
467            passphrase_length_2g: Int, length of password for 2G network.
468            ssid_length_5g: Int, number of characters to use for 5G SSID.
469            passphrase_length_5g: Int, length of password for 5G network.
470            same_ssid: Boolean, determines if both bands on AP use the same SSID.
471            open_network: Boolean, to check if open network should be configured.
472            wpa_network: Boolean, to check if wpa network should be configured.
473            wep_network: Boolean, to check if wep network should be configured.
474            ent_network: Boolean, to check if ent network should be configured.
475            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
476            owe_network: Boolean, to check if owe network should be configured.
477            sae_network: Boolean, to check if sae network should be configured.
478            saemixed_network: Boolean, to check if saemixed network should be configured.
479            radius_conf_2g: dictionary with enterprise radius server details.
480            radius_conf_5g: dictionary with enterprise radius server details.
481            radius_conf_pwd: dictionary with enterprise radiuse server details.
482            ap_count: APs to configure.
483            ieee80211w:PMF to configure
484        """
485        if mirror_ap and ap_count == 1:
486            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
487        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
488            raise ValueError(
489                "ap_count cannot be 1 if channels of AP2 are provided.")
490        # we are creating a channel list for 2G and 5G bands. The list is of
491        # size 2 and this is based on the assumption that each testbed will have
492        # at most 2 APs.
493        if not channel_5g_ap2:
494            channel_5g_ap2 = channel_5g
495        if not channel_2g_ap2:
496            channel_2g_ap2 = channel_2g
497        channels_2g = [channel_2g, channel_2g_ap2]
498        channels_5g = [channel_5g, channel_5g_ap2]
499
500        self.reference_networks = []
501        self.wpa1_networks = []
502        self.wpa_networks = []
503        self.wep_networks = []
504        self.ent_networks = []
505        self.ent_networks_pwd = []
506        self.open_network = []
507        self.owe_networks = []
508        self.sae_networks = []
509        self.saemixed_networks = []
510        self.bssid_map = []
511        for i in range(ap_count):
512            network_list = []
513            if wpa1_network:
514                wpa1_dict = self.get_psk_network(mirror_ap, self.wpa1_networks,
515                                                 hidden, same_ssid,
516                                                 ssid_length_2g,
517                                                 ssid_length_5g,
518                                                 passphrase_length_2g,
519                                                 passphrase_length_5g)
520                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
521                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
522                wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
523                wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
524                self.wpa1_networks.append(wpa1_dict)
525                network_list.append(wpa1_dict)
526            if wpa_network:
527                wpa_dict = self.get_psk_network(mirror_ap,
528                                                self.reference_networks,
529                                                hidden, same_ssid,
530                                                ssid_length_2g, ssid_length_5g,
531                                                passphrase_length_2g,
532                                                passphrase_length_5g)
533                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
534                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
535                wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
536                wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
537                self.wpa_networks.append(wpa_dict)
538                network_list.append(wpa_dict)
539            if wep_network:
540                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
541                                                hidden, same_ssid,
542                                                ssid_length_2g, ssid_length_5g)
543                network_list.append(wep_dict)
544            if ent_network:
545                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
546                                                 hidden, same_ssid,
547                                                 ssid_length_2g,
548                                                 ssid_length_5g)
549                ent_dict["2g"]["security"] = "wpa2"
550                ent_dict["2g"].update(radius_conf_2g)
551                ent_dict["5g"]["security"] = "wpa2"
552                ent_dict["5g"].update(radius_conf_5g)
553                network_list.append(ent_dict)
554            if ent_network_pwd:
555                ent_pwd_dict = self.get_open_network(mirror_ap,
556                                                     self.ent_networks_pwd,
557                                                     hidden, same_ssid,
558                                                     ssid_length_2g,
559                                                     ssid_length_5g)
560                ent_pwd_dict["2g"]["security"] = "wpa2"
561                ent_pwd_dict["2g"].update(radius_conf_pwd)
562                ent_pwd_dict["5g"]["security"] = "wpa2"
563                ent_pwd_dict["5g"].update(radius_conf_pwd)
564                network_list.append(ent_pwd_dict)
565            if open_network:
566                open_dict = self.get_open_network(mirror_ap, self.open_network,
567                                                  hidden, same_ssid,
568                                                  ssid_length_2g,
569                                                  ssid_length_5g)
570                network_list.append(open_dict)
571            if owe_network:
572                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
573                                                 hidden, same_ssid,
574                                                 ssid_length_2g,
575                                                 ssid_length_5g, "OWE")
576                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
577                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
578                network_list.append(owe_dict)
579            if sae_network:
580                sae_dict = self.get_psk_network(mirror_ap, self.sae_networks,
581                                                hidden, same_ssid,
582                                                hostapd_constants.SAE_KEY_MGMT,
583                                                ssid_length_2g, ssid_length_5g,
584                                                passphrase_length_2g,
585                                                passphrase_length_5g)
586                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
587                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
588                network_list.append(sae_dict)
589            if saemixed_network:
590                saemixed_dict = self.get_psk_network(
591                    mirror_ap, self.saemixed_networks, hidden, same_ssid,
592                    hostapd_constants.SAE_KEY_MGMT, ssid_length_2g,
593                    ssid_length_5g, passphrase_length_2g, passphrase_length_5g)
594                saemixed_dict[
595                    hostapd_constants.BAND_2G]["security"] = "sae-mixed"
596                saemixed_dict[
597                    hostapd_constants.BAND_5G]["security"] = "sae-mixed"
598                saemixed_dict[
599                    hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
600                saemixed_dict[
601                    hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
602                network_list.append(saemixed_dict)
603            self.access_points[i].configure_ap(network_list, channels_2g[i],
604                                               channels_5g[i])
605            self.access_points[i].start_ap()
606            self.bssid_map.append(
607                self.access_points[i].get_bssids_for_wifi_networks())
608            if mirror_ap:
609                self.access_points[i + 1].configure_ap(network_list,
610                                                       channels_2g[i + 1],
611                                                       channels_5g[i + 1])
612                self.access_points[i + 1].start_ap()
613                self.bssid_map.append(
614                    self.access_points[i + 1].get_bssids_for_wifi_networks())
615                break
616
617    def legacy_configure_ap_and_start(
618            self,
619            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
620            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
621            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
622            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
623            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
624            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
625            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
626            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
627            hidden=False,
628            same_ssid=False,
629            mirror_ap=True,
630            wpa_network=False,
631            wep_network=False,
632            ent_network=False,
633            radius_conf_2g=None,
634            radius_conf_5g=None,
635            ent_network_pwd=False,
636            radius_conf_pwd=None,
637            ap_count=1):
638
639        config_count = 1
640        count = 0
641
642        # For example, the NetworkSelector tests use 2 APs and require that
643        # both APs are not mirrored.
644        if not mirror_ap and ap_count == 1:
645            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")
646
647        if not mirror_ap:
648            config_count = ap_count
649
650        self.user_params["reference_networks"] = []
651        self.user_params["open_network"] = []
652        if wpa_network:
653            self.user_params["wpa_networks"] = []
654        if wep_network:
655            self.user_params["wep_networks"] = []
656        if ent_network:
657            self.user_params["ent_networks"] = []
658        if ent_network_pwd:
659            self.user_params["ent_networks_pwd"] = []
660
661        # kill hostapd & dhcpd if the cleanup was not successful
662        for i in range(len(self.access_points)):
663            self.log.debug("Check ap state and cleanup")
664            self._cleanup_hostapd_and_dhcpd(i)
665
666        for count in range(config_count):
667
668            network_list_2g = []
669            network_list_5g = []
670
671            orig_network_list_2g = []
672            orig_network_list_5g = []
673
674            network_list_2g.append({"channel": channel_2g})
675            network_list_5g.append({"channel": channel_5g})
676
677            networks_dict = self.get_psk_network(
678                mirror_ap,
679                self.user_params["reference_networks"],
680                hidden=hidden,
681                same_ssid=same_ssid)
682            self.reference_networks = self.user_params["reference_networks"]
683
684            network_list_2g.append(networks_dict["2g"])
685            network_list_5g.append(networks_dict["5g"])
686
687            # When same_ssid is set, only configure one set of WPA networks.
688            # We cannot have more than one set because duplicate interface names
689            # are not allowed.
690            # TODO(bmahadev): Provide option to select the type of network,
691            # instead of defaulting to WPA.
692            if not same_ssid:
693                networks_dict = self.get_open_network(
694                    mirror_ap,
695                    self.user_params["open_network"],
696                    hidden=hidden,
697                    same_ssid=same_ssid)
698                self.open_network = self.user_params["open_network"]
699
700                network_list_2g.append(networks_dict["2g"])
701                network_list_5g.append(networks_dict["5g"])
702
703                if wpa_network:
704                    networks_dict = self.get_psk_network(
705                        mirror_ap,
706                        self.user_params["wpa_networks"],
707                        hidden=hidden,
708                        same_ssid=same_ssid,
709                        security_mode=hostapd_constants.WPA_STRING)
710                    self.wpa_networks = self.user_params["wpa_networks"]
711
712                    network_list_2g.append(networks_dict["2g"])
713                    network_list_5g.append(networks_dict["5g"])
714
715                if wep_network:
716                    networks_dict = self.get_wep_network(
717                        mirror_ap,
718                        self.user_params["wep_networks"],
719                        hidden=hidden,
720                        same_ssid=same_ssid)
721                    self.wep_networks = self.user_params["wep_networks"]
722
723                    network_list_2g.append(networks_dict["2g"])
724                    network_list_5g.append(networks_dict["5g"])
725
726                if ent_network:
727                    networks_dict = self.get_open_network(
728                        mirror_ap,
729                        self.user_params["ent_networks"],
730                        hidden=hidden,
731                        same_ssid=same_ssid)
732                    networks_dict["2g"][
733                        "security"] = hostapd_constants.ENT_STRING
734                    networks_dict["2g"].update(radius_conf_2g)
735                    networks_dict["5g"][
736                        "security"] = hostapd_constants.ENT_STRING
737                    networks_dict["5g"].update(radius_conf_5g)
738                    self.ent_networks = self.user_params["ent_networks"]
739
740                    network_list_2g.append(networks_dict["2g"])
741                    network_list_5g.append(networks_dict["5g"])
742
743                if ent_network_pwd:
744                    networks_dict = self.get_open_network(
745                        mirror_ap,
746                        self.user_params["ent_networks_pwd"],
747                        hidden=hidden,
748                        same_ssid=same_ssid)
749                    networks_dict["2g"][
750                        "security"] = hostapd_constants.ENT_STRING
751                    networks_dict["2g"].update(radius_conf_pwd)
752                    networks_dict["5g"][
753                        "security"] = hostapd_constants.ENT_STRING
754                    networks_dict["5g"].update(radius_conf_pwd)
755                    self.ent_networks_pwd = self.user_params[
756                        "ent_networks_pwd"]
757
758                    network_list_2g.append(networks_dict["2g"])
759                    network_list_5g.append(networks_dict["5g"])
760
761            orig_network_list_5g = copy.copy(network_list_5g)
762            orig_network_list_2g = copy.copy(network_list_2g)
763
764            if len(network_list_5g) > 1:
765                self.config_5g = self._generate_legacy_ap_config(
766                    network_list_5g)
767            if len(network_list_2g) > 1:
768                self.config_2g = self._generate_legacy_ap_config(
769                    network_list_2g)
770
771            self.access_points[count].start_ap(self.config_2g)
772            self.access_points[count].start_ap(self.config_5g)
773            self.populate_bssid(count, self.access_points[count],
774                                orig_network_list_5g, orig_network_list_2g)
775
776        # Repeat configuration on the second router.
777        if mirror_ap and ap_count == 2:
778            self.access_points[AP_2].start_ap(self.config_2g)
779            self.access_points[AP_2].start_ap(self.config_5g)
780            self.populate_bssid(AP_2, self.access_points[AP_2],
781                                orig_network_list_5g, orig_network_list_2g)
782
783    def _kill_processes(self, ap, daemon):
784        """ Kill hostapd and dhcpd daemons
785
786        Args:
787            ap: AP to cleanup
788            daemon: process to kill
789
790        Returns: True/False if killing process is successful
791        """
792        self.log.info("Killing %s" % daemon)
793        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
794        if pids.stdout:
795            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
796        time.sleep(3)
797        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
798        if pids.stdout:
799            return False
800        return True
801
802    def _cleanup_hostapd_and_dhcpd(self, count):
803        """ Check if AP was cleaned up properly
804
805        Kill hostapd and dhcpd processes if cleanup was not successful in the
806        last run
807
808        Args:
809            count: AP to check
810
811        Returns:
812            New AccessPoint object if AP required cleanup
813
814        Raises:
815            Error: if the AccessPoint timed out to setup
816        """
817        ap = self.access_points[count]
818        phy_ifaces = ap.interfaces.get_physical_interface()
819        kill_hostapd = False
820        for iface in phy_ifaces:
821            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
822                kill_hostapd = True
823                break
824
825        if not kill_hostapd:
826            return
827
828        self.log.debug("Cleanup AP")
829        if not self._kill_processes(ap, 'hostapd') or \
830            not self._kill_processes(ap, 'dhcpd'):
831            raise ("Failed to cleanup AP")
832
833        ap.__init__(self.user_params['AccessPoint'][count])
834
835    def _generate_legacy_ap_config(self, network_list):
836        bss_settings = []
837        wlan_2g = self.access_points[AP_1].wlan_2g
838        wlan_5g = self.access_points[AP_1].wlan_5g
839        ap_settings = network_list.pop(0)
840        # TODO:(bmahadev) This is a bug. We should not have to pop the first
841        # network in the list and treat it as a separate case. Instead,
842        # create_ap_preset() should be able to take NULL ssid and security and
843        # build config based on the bss_Settings alone.
844        hostapd_config_settings = network_list.pop(0)
845        for network in network_list:
846            if "password" in network:
847                bss_settings.append(
848                    hostapd_bss_settings.BssSettings(
849                        name=network["SSID"],
850                        ssid=network["SSID"],
851                        hidden=network["hiddenSSID"],
852                        security=hostapd_security.Security(
853                            security_mode=network["security"],
854                            password=network["password"])))
855            elif "wepKeys" in network:
856                bss_settings.append(
857                    hostapd_bss_settings.BssSettings(
858                        name=network["SSID"],
859                        ssid=network["SSID"],
860                        hidden=network["hiddenSSID"],
861                        security=hostapd_security.Security(
862                            security_mode=network["security"],
863                            password=network["wepKeys"][0])))
864            elif network["security"] == hostapd_constants.ENT_STRING:
865                bss_settings.append(
866                    hostapd_bss_settings.BssSettings(
867                        name=network["SSID"],
868                        ssid=network["SSID"],
869                        hidden=network["hiddenSSID"],
870                        security=hostapd_security.Security(
871                            security_mode=network["security"],
872                            radius_server_ip=network["radius_server_ip"],
873                            radius_server_port=network["radius_server_port"],
874                            radius_server_secret=network[
875                                "radius_server_secret"])))
876            else:
877                bss_settings.append(
878                    hostapd_bss_settings.BssSettings(
879                        name=network["SSID"],
880                        ssid=network["SSID"],
881                        hidden=network["hiddenSSID"]))
882        if "password" in hostapd_config_settings:
883            config = hostapd_ap_preset.create_ap_preset(
884                iface_wlan_2g=wlan_2g,
885                iface_wlan_5g=wlan_5g,
886                channel=ap_settings["channel"],
887                ssid=hostapd_config_settings["SSID"],
888                hidden=hostapd_config_settings["hiddenSSID"],
889                security=hostapd_security.Security(
890                    security_mode=hostapd_config_settings["security"],
891                    password=hostapd_config_settings["password"]),
892                bss_settings=bss_settings)
893        elif "wepKeys" in hostapd_config_settings:
894            config = hostapd_ap_preset.create_ap_preset(
895                iface_wlan_2g=wlan_2g,
896                iface_wlan_5g=wlan_5g,
897                channel=ap_settings["channel"],
898                ssid=hostapd_config_settings["SSID"],
899                hidden=hostapd_config_settings["hiddenSSID"],
900                security=hostapd_security.Security(
901                    security_mode=hostapd_config_settings["security"],
902                    password=hostapd_config_settings["wepKeys"][0]),
903                bss_settings=bss_settings)
904        else:
905            config = hostapd_ap_preset.create_ap_preset(
906                iface_wlan_2g=wlan_2g,
907                iface_wlan_5g=wlan_5g,
908                channel=ap_settings["channel"],
909                ssid=hostapd_config_settings["SSID"],
910                hidden=hostapd_config_settings["hiddenSSID"],
911                bss_settings=bss_settings)
912        return config
913
914    def configure_packet_capture(
915            self,
916            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
917            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
918        """Configure packet capture for 2G and 5G bands.
919
920        Args:
921            channel_5g: Channel to set the monitor mode to for 5G band.
922            channel_2g: Channel to set the monitor mode to for 2G band.
923        """
924        self.packet_capture = self.packet_capture[0]
925        result = self.packet_capture.configure_monitor_mode(
926            hostapd_constants.BAND_2G, channel_2g)
927        if not result:
928            raise ValueError("Failed to configure channel for 2G band")
929
930        result = self.packet_capture.configure_monitor_mode(
931            hostapd_constants.BAND_5G, channel_5g)
932        if not result:
933            raise ValueError("Failed to configure channel for 5G band.")
934
935    @staticmethod
936    def wifi_test_wrap(fn):
937        def _safe_wrap_test_case(self, *args, **kwargs):
938            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
939                                    self.log_begin_time.replace(' ', '-'))
940            self.test_id = test_id
941            self.result_detail = ""
942            tries = int(self.user_params.get("wifi_auto_rerun", 3))
943            for ad in self.android_devices:
944                ad.log_path = self.log_path
945            for i in range(tries + 1):
946                result = True
947                if i > 0:
948                    log_string = "[Test Case] RETRY:%s %s" % (i,
949                                                              self.test_name)
950                    self.log.info(log_string)
951                    self._teardown_test(self.test_name)
952                    self._setup_test(self.test_name)
953                try:
954                    result = fn(self, *args, **kwargs)
955                except signals.TestFailure as e:
956                    self.log.warn("Error msg: %s" % e)
957                    if self.result_detail:
958                        signal.details = self.result_detail
959                    result = False
960                except signals.TestSignal:
961                    if self.result_detail:
962                        signal.details = self.result_detail
963                    raise
964                except Exception as e:
965                    self.log.exception(e)
966                    asserts.fail(self.result_detail)
967                if result is False:
968                    if i < tries:
969                        continue
970                else:
971                    break
972            if result is not False:
973                asserts.explicit_pass(self.result_detail)
974            else:
975                asserts.fail(self.result_detail)
976
977        return _safe_wrap_test_case
978
979    def _configure_flagged_features(self, ad, flagged_features):
980        for module, features in flagged_features.items():
981            for feature in features:
982                value = flagged_features[module][feature].lower()
983                if value not in ("true", "false"):
984                    raise ValueError("Invalid flag value for %s %s." % (module, feature))
985                self.log.info("Setting feature flag %s %s to %s." % (module, feature, value))
986                adb_put_command = "device_config put %s %s %s" % (module, feature, value)
987                adb_get_command = "device_config get %s %s" % (module, feature)
988                ad.adb.shell(adb_put_command)
989                value_from_get = ad.adb.shell(adb_get_command)
990
991                if type(value_from_get) != str or value_from_get.lower() != value:
992                    raise RuntimeError("Failed to set flag value to %s (now is %s) for %s %s." % (value, value_from_get, module, feature))