#!/usr/bin/env python3.4
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import re
import random
import time

import acts.controllers.packet_capture as packet_capture
import acts.signals as signals
import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils
import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils

from acts import asserts
from acts.base_test import BaseTestClass
from acts.controllers.ap_lib import hostapd_constants
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest

WifiEnums = wutils.WifiEnums

WAIT_BEFORE_CONNECTION = 1
SINGLE_BAND = 1
DUAL_BAND = 2

TIMEOUT = 60
TEST = 'test_'
PING_ADDR = 'www.google.com'

NUM_LINK_PROBES = 3
PROBE_DELAY_SEC = 1


class WifiChaosTest(WifiBaseTest):
    """ Tests for wifi IOT

        Test Bed Requirement:
          * One Android device
          * Wi-Fi IOT networks visible to the device
    """

    def __init__(self, configs):
        BaseTestClass.__init__(self, configs)
        self.generate_interop_tests()

    def randomize_testcases(self):
        """Randomize the list of hosts and build a random order of tests,
           based on SSIDs, keeping all the relevant bands of an AP together.

        """
        temp_tests = list()
        hosts = self.user_params['interop_host']

        random.shuffle(hosts)

        for host in hosts:
            ssid_2g = None
            ssid_5g = None
            info = dutils.show_device(host)

            # Based on the information in datastore, add to test list if
            # AP has 2G band.
            if 'ssid_2g' in info:
                ssid_2g = info['ssid_2g']
                temp_tests.append(TEST + ssid_2g)

            # Based on the information in datastore, add to test list if
            # AP has 5G band.
            if 'ssid_5g' in info:
                ssid_5g = info['ssid_5g']
                temp_tests.append(TEST + ssid_5g)

        self.tests = temp_tests

    def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
        """Generates a single test case from the given data.

        Args:
            base_test: The base test case function to run.
            testcase_name: The name of the test case.
            ssid_dict: The information about the network under test.
        """
        ssid = testcase_name
        test_tracker_uuid = ssid_dict[testcase_name]['uuid']
        hostname = ssid_dict[testcase_name]['host']
        if not testcase_name.startswith('test_'):
            testcase_name = 'test_%s' % testcase_name
        test_case = test_tracker_info(
            uuid=test_tracker_uuid)(lambda: base_test(ssid, hostname))
        setattr(self, testcase_name, test_case)
        self.tests.append(testcase_name)

    def generate_interop_tests(self):
        for ssid_dict in self.user_params['interop_ssid']:
            testcase_name = list(ssid_dict)[0]
            self.generate_interop_testcase(self.interop_base_test,
                                           testcase_name, ssid_dict)
        self.randomize_testcases()

    def setup_class(self):
        super().setup_class()
        self.dut = self.android_devices[0]
        self.admin = 'admin' + str(random.randint(1000001, 12345678))
        wutils.wifi_test_device_init(self.dut)
        # Set country code explicitly to "US".
        wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)

        asserts.assert_true(
            self.lock_pcap(),
            "Could not lock a Packet Capture. Aborting Interop test.")

        wutils.wifi_toggle_state(self.dut, True)

    def lock_pcap(self):
        """Lock a Packet Capturere to use for the test."""

        # Get list of devices from the datastore.
        locked_pcap = False
        devices = dutils.get_devices()

        for device in devices:

            device_name = device['hostname']
            device_type = device['ap_label']
            if device_type == 'PCAP' and not device['lock_status']:
                if dutils.lock_device(device_name, self.admin):
                    self.pcap_host = device_name
                    host = device['ip_address']
                    self.log.info("Locked Packet Capture device: %s" %
                                  device_name)
                    locked_pcap = True
                    break
                else:
                    self.log.warning("Failed to lock %s PCAP." % device_name)

        if not locked_pcap:
            return False

        pcap_config = {'ssh_config': {'user': 'root'}}
        pcap_config['ssh_config']['host'] = host

        self.pcap = packet_capture.PacketCapture(pcap_config)
        return True

    def setup_test(self):
        super().setup_test()
        self.dut.droid.wakeLockAcquireBright()
        self.dut.droid.wakeUpNow()

    def on_pass(self, test_name, begin_time):
        wutils.stop_pcap(self.pcap, self.pcap_procs, True)

    def on_fail(self, test_name, begin_time):
        # Sleep to ensure all failed packets are captured.
        time.sleep(5)
        wutils.stop_pcap(self.pcap, self.pcap_procs, False)
        super().on_fail(test_name, begin_time)

    def teardown_test(self):
        super().teardown_test()
        self.dut.droid.wakeLockRelease()
        self.dut.droid.goToSleepNow()

    def teardown_class(self):
        # Unlock the PCAP.
        if not dutils.unlock_device(self.pcap_host):
            self.log.warning("Failed to unlock %s PCAP. Check in datastore.")

    """Helper Functions"""

    def scan_and_connect_by_id(self, network, net_id):
        """Scan for network and connect using network id.

        Args:
            net_id: Integer specifying the network id of the network.

        """
        ssid = network[WifiEnums.SSID_KEY]
        wutils.start_wifi_connection_scan_and_ensure_network_found(
            self.dut, ssid)
        wutils.wifi_connect_by_id(self.dut, net_id)

    def run_ping(self, sec):
        """Run ping for given number of seconds.

        Args:
            sec: Time in seconds to run teh ping traffic.

        """
        self.log.info("Finding Gateway...")
        route_response = self.dut.adb.shell("ip route get 8.8.8.8")
        gateway_ip = re.search('via (.*) dev', str(route_response)).group(1)
        self.log.info("Gateway IP = %s" % gateway_ip)
        self.log.info("Running ping for %d seconds" % sec)
        result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip),
                                    timeout=sec + 1)
        self.log.debug("Ping Result = %s" % result)
        if "100% packet loss" in result:
            raise signals.TestFailure("100% packet loss during ping")

    def send_link_probes(self, network):
        """
        Send link probes, and verify that the device and AP did not crash.
        Also verify that at least one link probe succeeded.

        Steps:
        1. Send a few link probes.
        2. Ensure that the device and AP did not crash (by checking that the
           device remains connected to the expected network).
        """
        results = wutils.send_link_probes(self.dut, NUM_LINK_PROBES,
                                          PROBE_DELAY_SEC)

        self.log.info("Link Probe results: %s" % (results, ))

        wifi_info = self.dut.droid.wifiGetConnectionInfo()
        expected = network[WifiEnums.SSID_KEY]
        actual = wifi_info[WifiEnums.SSID_KEY]
        asserts.assert_equal(
            expected, actual,
            "Device did not remain connected after sending link probes!")

    def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
        """UNlock the AP in datastore and turn off the AP.

        Args:
            hostname: Hostname of the AP.
            rpm_port: Port number on the RPM for the AP.
            rpm_ip: RPM's IP address.

        """
        # Un-Lock AP in datastore.
        self.log.debug("Un-lock AP in datastore")
        if not dutils.unlock_device(hostname):
            self.log.warning("Failed to unlock %s AP. Check AP in datastore.")
        # Turn OFF AP from the RPM port.
        rutils.turn_off_ap(rpm_port, rpm_ip)

    def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
                               release_ap):
        """Run connect/disconnect to a given network in loop.

           Args:
               network: Dict, network information.
               hostname: Hostanme of the AP to connect to.
               rpm_port: Port number on the RPM for the AP.
               rpm_ip: Port number on the RPM for the AP.
               release_ap: Flag to determine if we should turn off the AP yet.

           Raises: TestFailure if the network connection fails.

        """
        for attempt in range(5):
            try:
                begin_time = time.time()
                ssid = network[WifiEnums.SSID_KEY]
                net_id = self.dut.droid.wifiAddNetwork(network)
                asserts.assert_true(net_id != -1,
                                    "Add network %s failed" % network)
                self.log.info("Connecting to %s" % ssid)
                self.scan_and_connect_by_id(network, net_id)
                self.run_ping(10)
                # TODO(b/133369482): uncomment once bug is resolved
                # self.send_link_probes(network)
                wutils.wifi_forget_network(self.dut, ssid)
                time.sleep(WAIT_BEFORE_CONNECTION)
            except Exception as e:
                self.log.error("Connection to %s network failed on the %d "
                               "attempt with exception %s." %
                               (ssid, attempt, e))
                # TODO:(bmahadev) Uncomment after scan issue is fixed.
                # self.dut.take_bug_report(ssid, begin_time)
                # self.dut.cat_adb_log(ssid, begin_time)
                if release_ap:
                    self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
                raise signals.TestFailure("Failed to connect to %s" % ssid)

    def get_band_and_chan(self, ssid):
        """Get the band and channel information from SSID.

        Args:
            ssid: SSID of the network.

        """
        ssid_info = ssid.split('_')
        self.band = ssid_info[-1]
        for item in ssid_info:
            # Skip over the router model part.
            if 'ch' in item and item != ssid_info[0]:
                self.chan = re.search(r'(\d+)', item).group(0)
                return
        raise signals.TestFailure("Channel information not found in SSID.")

    def interop_base_test(self, ssid, hostname):
        """Base test for all the connect-disconnect interop tests.

        Args:
            ssid: string, SSID of the network to connect to.
            hostname: string, hostname of the AP.

        Steps:
            1. Lock AP in datstore.
            2. Turn on AP on the rpm switch.
            3. Run connect-disconnect in loop.
            4. Turn off AP on the rpm switch.
            5. Unlock AP in datastore.

        """
        network = {}
        network['password'] = 'password'
        network['SSID'] = ssid
        release_ap = False
        wutils.reset_wifi(self.dut)

        # Lock AP in datastore.
        self.log.info("Lock AP in datastore")

        ap_info = dutils.show_device(hostname)

        # If AP is locked by a different test admin, then we skip.
        if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
            raise signals.TestSkip("AP %s is locked, skipping test" % hostname)

        if not dutils.lock_device(hostname, self.admin):
            self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
                             " and try again.")
            raise signals.TestFailure("Failed to lock AP")

        band = SINGLE_BAND
        if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
            band = DUAL_BAND
        if (band == SINGLE_BAND) or (band == DUAL_BAND and '5G' in ssid):
            release_ap = True

        # Get AP RPM attributes and Turn ON AP.
        rpm_ip = ap_info['rpm_ip']
        rpm_port = ap_info['rpm_port']

        rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
        self.log.info("Finished turning ON AP.")
        # Experimental. Some APs take upto a min to come online.
        time.sleep(60)

        self.get_band_and_chan(ssid)
        self.pcap.configure_monitor_mode(self.band, self.chan)
        self.pcap_procs = wutils.start_pcap(self.pcap, self.band.lower(),
                                            self.test_name)
        self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
                                    release_ap)

        # Un-lock only if it's a single band AP or we are running the last band.
        if release_ap:
            self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)