1#
2#   Copyright 2021 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16import random
17
18from acts import asserts
19from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
20from acts.test_decorators import test_tracker_info
21from acts_contrib.test_utils.net import connectivity_const as cconst
22from acts_contrib.test_utils.net import connectivity_test_utils as cutils
23from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
24from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
25from scapy.all import rdpcap, DNSRR, DNSQR, IP, IPv6
26
27
28WLAN = "wlan0"
29PING_ADDR = "google.com"
30
31
32class DNSTest(WifiBaseTest):
33    """DNS related test for Android."""
34
35    def setup_class(self):
36        self.dut = self.android_devices[0]
37        wutils.wifi_test_device_init(self.dut)
38
39        req_params = []
40        opt_param = ["wifi_network", "configure_OpenWrt"]
41        self.unpack_userparams(
42            req_param_names=req_params, opt_param_names=opt_param)
43
44        asserts.assert_true(OPENWRT in self.user_params,
45                            "OpenWrtAP is not in testbed.")
46        self.openwrt = self.access_points[0]
47        if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
48            self.dut.log.info("Skip configure Wifi interface due to config setup.")
49        else:
50            self.configure_openwrt_ap_and_start(wpa_network=True)
51            self.wifi_network = self.openwrt.get_wifi_network()
52
53        asserts.assert_true(self.openwrt.verify_wifi_status(),
54                            "OpenWrt Wifi interface is not ready.")
55
56    def teardown_class(self):
57        """Reset wifi to make sure VPN tears down cleanly."""
58        wutils.reset_wifi(self.dut)
59
60    def teardown_test(self):
61        """Reset wifi to make sure VPN tears down cleanly."""
62        wutils.reset_wifi(self.dut)
63
64    def ping(self, addr, ignore_status=True, timeout=60):
65        """Start a ping from DUT and return ping result.
66
67        Args:
68            addr: Address to ping.
69            ignore_status: ignore non zero return.
70            timeout: cmd timeout.
71        Returns:
72            Boolean for ping result.
73        """
74        return "100%" not in self.dut.adb.shell("ping -c 1 %s" % addr,
75                                                ignore_status=ignore_status,
76                                                timeout=timeout)
77
78    def generate_query_qname(self):
79        """Return a random query name."""
80        return "%s-ds.metric.gstatic.com" % random.randint(0, 99999999)
81
82    def _block_dns_response_and_ping(self, test_qname):
83        """Block the DNS response and ping
84
85        Args:
86            test_qname: Address to ping
87        Returns:
88            Packets for the ping result
89        """
90        # Start tcpdump on OpenWrt
91        remote_pcap_path = \
92            self.openwrt.network_setting.start_tcpdump(self.test_name)
93        self.dut.log.info("Test query name = %s" % test_qname)
94        # Block the DNS response only before sending the DNS query
95        self.openwrt.network_setting.block_dns_response()
96        # Start send a query
97        self.ping(test_qname)
98        # Un-block the DNS response right after DNS query
99        self.openwrt.network_setting.unblock_dns_response()
100        local_pcap_path = self.openwrt.network_setting.stop_tcpdump(
101            remote_pcap_path, self.dut.device_log_path)
102        self.dut.log.info("pcap file path : %s" % local_pcap_path)
103        # Check DNSQR.qname in tcpdump to verify device retransmit the query
104        packets = rdpcap(local_pcap_path)
105        return packets
106
107    def _get_dnsqr_packets(self, packets, layer, qname):
108        """Filter the DNSQR packets with specific layer
109
110        Args:
111            packets: Packets that came from rdpcap function
112            layer: Keep the packets that contains this layer
113            qname: Keep the packets that related to this qname
114        Returns:
115            List of filtered packets
116        """
117        filtered_packets = []
118        for pkt in packets:
119            if not pkt.haslayer(DNSQR):
120                continue
121            if pkt[DNSQR].qname.decode().strip(".") != qname:
122                continue
123            if pkt.haslayer(layer):
124                filtered_packets.append(pkt)
125        return filtered_packets
126
127    @test_tracker_info(uuid="dd7b8c92-c0f4-4403-a0ae-57a703162d83")
128    def test_dns_query(self):
129        # Setup environment
130        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
131        # Start tcpdump on OpenWrt
132        remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)
133        # Generate query name
134        test_qname = self.generate_query_qname()
135        self.dut.log.info("Test query name = %s" % test_qname)
136        # Start send a query
137        ping_result = self.ping(test_qname)
138        local_pcap_path = self.openwrt.network_setting.stop_tcpdump(remote_pcap_path,
139                                                                    self.dut.device_log_path)
140        # Check DNSRR.rrname in tcpdump to verify DNS response
141        packets = rdpcap(local_pcap_path)
142        self.dut.log.info("pcap file path : %s" % local_pcap_path)
143        pkt_count = 0
144        for pkt in packets:
145            if pkt.haslayer(DNSRR) and pkt[DNSRR].rrname.decode().strip(".") == test_qname:
146                pkt_count = pkt_count + 1
147        self.dut.log.info("DNS query response count : %s" % pkt_count)
148        if not ping_result:
149            asserts.assert_true(pkt_count > 0,
150                                "Did not find match standard query response in tcpdump.")
151        asserts.assert_true(ping_result, "Device ping fail.")
152
153    @test_tracker_info(uuid="cd20c6e7-9c2e-4286-b08e-c8e40e413da5")
154    def test_dns_query_retransmit(self):
155        # Setup environment
156        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
157        test_qname = self.generate_query_qname()
158        packets = self._block_dns_response_and_ping(test_qname)
159        pkts = self._get_dnsqr_packets(packets, IP, test_qname)
160        pkts6 = self._get_dnsqr_packets(packets, IPv6, test_qname)
161        self.dut.log.info("IPv4 DNS query count : %s" % len(pkts))
162        self.dut.log.info("IPv6 DNS query count : %s" % len(pkts6))
163        asserts.assert_true(len(pkts) >= 2 or len(pkts6) >= 2,
164                            "Did not find match standard query in tcpdump.")
165
166    @test_tracker_info(uuid="5f58775d-ee7b-4d2e-8e77-77d41e821415")
167    def test_private_dns_query_retransmit(self):
168        # set private DNS mode
169        cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_STRICT)
170
171        # Setup environment
172        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
173        test_qname = self.generate_query_qname()
174        packets = self._block_dns_response_and_ping(test_qname)
175        pkts = self._get_dnsqr_packets(packets, IP, test_qname)
176        pkts6 = self._get_dnsqr_packets(packets, IPv6, test_qname)
177        self.dut.log.info("IPv4 DNS query count : %s" % len(pkts))
178        self.dut.log.info("IPv6 DNS query count : %s" % len(pkts6))
179        asserts.assert_true(len(pkts) >= 2 or len(pkts6) >= 2,
180                            "Did not find match standard query in tcpdump.")
181
182