#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import random

from acts import asserts
from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.net import connectivity_const as cconst
from acts_contrib.test_utils.net import connectivity_test_utils as cutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from scapy.all import rdpcap, DNSRR, DNSQR, IP, IPv6


WLAN = "wlan0"
PING_ADDR = "google.com"


class DNSTest(WifiBaseTest):
    """DNS related test for Android."""

    def setup_class(self):
        """Initialise the DUT and make sure the OpenWrt AP is usable.

        Unless the optional ``configure_OpenWrt`` user parameter equals
        "skip", the AP is (re)configured with a WPA network and
        ``self.wifi_network`` is taken from the AP. When configuration is
        skipped, ``self.wifi_network`` is only set if the optional
        ``wifi_network`` user parameter was supplied.
        """
        self.dut = self.android_devices[0]
        wutils.wifi_test_device_init(self.dut)

        req_params = []
        opt_param = ["wifi_network", "configure_OpenWrt"]
        self.unpack_userparams(
            req_param_names=req_params, opt_param_names=opt_param)

        asserts.assert_true(OPENWRT in self.user_params,
                            "OpenWrtAP is not in testbed.")
        self.openwrt = self.access_points[0]
        if getattr(self, "configure_OpenWrt", None) == "skip":
            self.dut.log.info("Skip configure Wifi interface due to config setup.")
        else:
            self.configure_openwrt_ap_and_start(wpa_network=True)
            self.wifi_network = self.openwrt.get_wifi_network()

        asserts.assert_true(self.openwrt.verify_wifi_status(),
                            "OpenWrt Wifi interface is not ready.")

    def teardown_class(self):
        """Reset wifi on the DUT once all tests in the class have run."""
        wutils.reset_wifi(self.dut)

    def teardown_test(self):
        """Reset wifi on the DUT after every test.

        NOTE(review): only wifi is reset here; the private DNS mode changed
        by test_private_dns_query_retransmit is not restored in this file.
        Confirm whether that is handled elsewhere.
        """
        wutils.reset_wifi(self.dut)

    def ping(self, addr, ignore_status=True, timeout=60):
        """Start a ping from DUT and return ping result.

        Args:
            addr: Address to ping.
            ignore_status: ignore non zero return.
            timeout: cmd timeout.
        Returns:
            True when the shell output of a single ping does not contain
            "100%" (presumably the "100% packet loss" summary), else False.
        """
        ping_output = self.dut.adb.shell("ping -c 1 %s" % addr,
                                         ignore_status=ignore_status,
                                         timeout=timeout)
        return "100%" not in ping_output

    def generate_query_qname(self):
        """Return a random query name."""
        return "%s-ds.metric.gstatic.com" % random.randint(0, 99999999)

    def _block_dns_response_and_ping(self, test_qname):
        """Block the DNS response and ping

        A tcpdump capture is running on the OpenWrt AP for the whole
        sequence, so the returned packets include whatever the AP saw while
        DNS responses were blocked.

        Args:
            test_qname: Address to ping
        Returns:
            Packets for the ping result
        """
        network_setting = self.openwrt.network_setting
        # Start tcpdump on OpenWrt
        remote_pcap_path = network_setting.start_tcpdump(self.test_name)
        self.dut.log.info("Test query name = %s" % test_qname)
        # Block the DNS response only before sending the DNS query
        network_setting.block_dns_response()
        try:
            # Start send a query
            self.ping(test_qname)
        finally:
            # Un-block the DNS response right after DNS query. Done in a
            # finally clause so a failing/timed-out ping cannot leave the
            # AP blocking DNS responses for subsequent tests.
            network_setting.unblock_dns_response()
        local_pcap_path = network_setting.stop_tcpdump(
            remote_pcap_path, self.dut.device_log_path)
        self.dut.log.info("pcap file path : %s" % local_pcap_path)
        # Check DNSQR.qname in tcpdump to verify device retransmit the query
        return rdpcap(local_pcap_path)

    def _get_dnsqr_packets(self, packets, layer, qname):
        """Filter the DNSQR packets with specific layer

        Args:
            packets: Packets that came from rdpcap function
            layer: Keep the packets that contains this layer
            qname: Keep the packets that related to this qname
        Returns:
            List of filtered packets
        """
        return [
            pkt for pkt in packets
            if pkt.haslayer(DNSQR)
            # qname is bytes and may carry a trailing root dot.
            and pkt[DNSQR].qname.decode().strip(".") == qname
            and pkt.haslayer(layer)
        ]

    def _verify_dns_query_retransmit(self):
        """Connect to wifi and assert a blocked DNS query is sent again.

        Pings a random query name while the AP blocks DNS responses, then
        requires at least two captured DNS questions for that name over
        either IPv4 or IPv6.
        """
        # Setup environment
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
        test_qname = self.generate_query_qname()
        packets = self._block_dns_response_and_ping(test_qname)
        pkts = self._get_dnsqr_packets(packets, IP, test_qname)
        pkts6 = self._get_dnsqr_packets(packets, IPv6, test_qname)
        self.dut.log.info("IPv4 DNS query count : %s" % len(pkts))
        self.dut.log.info("IPv6 DNS query count : %s" % len(pkts6))
        asserts.assert_true(len(pkts) >= 2 or len(pkts6) >= 2,
                            "Did not find match standard query in tcpdump.")

    @test_tracker_info(uuid="dd7b8c92-c0f4-4403-a0ae-57a703162d83")
    def test_dns_query(self):
        """Ping a random name and check the capture and the ping result.

        The number of DNS answers for the name seen by the AP is logged.
        If the ping failed, the test first asserts that at least one such
        answer was captured, then asserts on the ping result itself, so
        the failure message tells a missing DNS response apart from a ping
        failure after successful resolution.
        """
        # Setup environment
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
        # Start tcpdump on OpenWrt
        remote_pcap_path = self.openwrt.network_setting.start_tcpdump(
            self.test_name)
        # Generate query name
        test_qname = self.generate_query_qname()
        self.dut.log.info("Test query name = %s" % test_qname)
        # Start send a query
        ping_result = self.ping(test_qname)
        local_pcap_path = self.openwrt.network_setting.stop_tcpdump(
            remote_pcap_path, self.dut.device_log_path)
        # Check DNSRR.rrname in tcpdump to verify DNS response
        packets = rdpcap(local_pcap_path)
        self.dut.log.info("pcap file path : %s" % local_pcap_path)
        pkt_count = sum(
            1 for pkt in packets
            if pkt.haslayer(DNSRR)
            and pkt[DNSRR].rrname.decode().strip(".") == test_qname
        )
        self.dut.log.info("DNS query response count : %s" % pkt_count)
        if not ping_result:
            asserts.assert_true(pkt_count > 0,
                                "Did not find match standard query response in tcpdump.")
        asserts.assert_true(ping_result, "Device ping fail.")

    @test_tracker_info(uuid="cd20c6e7-9c2e-4286-b08e-c8e40e413da5")
    def test_dns_query_retransmit(self):
        """Verify the DUT re-sends a DNS query whose response is blocked."""
        self._verify_dns_query_retransmit()

    @test_tracker_info(uuid="5f58775d-ee7b-4d2e-8e77-77d41e821415")
    def test_private_dns_query_retransmit(self):
        """Same retransmit check with private DNS set to strict mode."""
        # set private DNS mode
        cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_STRICT)

        self._verify_dns_query_retransmit()