import time

from acts import asserts
from acts import base_test
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.net import net_test_utils as nutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from scapy.all import get_if_raw_hwaddr
from scapy.config import conf
from scapy.layers.dns import DNS, DNSQR
from scapy.layers.inet import IP, ICMP, UDP, TCP, RandShort, sr1
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.l2 import Ether
from scapy.sendrecv import send, sendp, srp1

DUMSYS_CMD = "dumpsys connectivity tethering"
UPSTREAM_WANTED_STRING = "Upstream wanted"
CURRENT_UPSTREAM_STRING = "Current upstream interface"
SSID = wutils.WifiEnums.SSID_KEY
GOOGLE_DNS_IP_ADDRESS = "8.8.8.8"
DEFAULT_DOMAIN_NAME = "www.google.com"
DEFAULT_DOMAIN_NAME_IPV4 = "ipv4.google.com"
DEFAULT_DOMAIN_NAME_IPV6 = "ipv6.google.com"


class UsbTetheringTest(base_test.BaseTestClass):
    """Tests for USB tethering.

    Prerequisite:
    1. Android phone should connect to the desktop with USB cable
    2. DUT should be able to connect to cellular network and Wi-Fi network
    3. Set the CAP_NET_RAW capability before running the test.
       e.g., `sudo setcap cap_net_raw=eip /usr/local/bin/act.py`
    """

    def setup_class(self):
        """Enables USB tethering and records the new host-side interface.

        Fails the class setup if the tethering upstream is not reported as
        active by `check_upstream_ready`.
        """
        self.dut = self.android_devices[0]
        self.USB_TETHERED = False

        nutils.verify_lte_data_and_tethering_supported(self.dut)
        nutils.set_cap_net_raw_capability()
        req_params = ("wifi_network",)
        self.unpack_userparams(req_params)
        # Enable USB tethering and get the USB network interface
        iflist_before = nutils.get_if_list()
        serial = self.dut.device_info['serial']
        nutils.start_usb_tethering(self.dut)
        self.dut.recreate_services(serial)
        self.iface = nutils.wait_for_new_iface(iflist_before)
        conf.route6.resync()
        self.log.info("USB tethering interface: %s." % self.iface)
        if not self.check_upstream_ready():
            # asserts.fail raises on its own; no explicit `raise` is needed.
            asserts.fail("Upstream interface is not active.")

    def teardown_class(self):
        """Stops USB tethering and resets Wi-Fi on the DUT."""
        nutils.stop_usb_tethering(self.dut)
        self.USB_TETHERED = False
        wutils.reset_wifi(self.dut)

    def on_fail(self, test_name, begin_time):
        """Takes a bug report on the DUT for the failed test."""
        self.dut.take_bug_report(test_name, begin_time)

    def _assert_response_has_layer(self, resp, layer, failure_prefix):
        """Asserts that a response was received and contains `layer`.

        Args:
            resp: The packet returned by sr1/srp1, or None if nothing answered.
            layer: The scapy layer class expected in the response.
            failure_prefix: Text placed before the dumped response in the
                failure message.
        """
        resp_msg = resp.show(dump=True) if resp is not None else "null"
        asserts.assert_true(
            resp is not None and resp.haslayer(layer),
            failure_prefix + resp_msg)

    @test_tracker_info(uuid="d4da7695-4342-4564-b7b0-0a30895f23eb")
    def test_icmp_connectivity(self):
        """Tests connectivity under ICMP.

        Steps:
        1. Enable USB tethering on Android devices
        2. Generate ICMP packet and send to target IP address
        3. Verify that the response contains an ICMP layer
        """
        icmp = IP(dst=GOOGLE_DNS_IP_ADDRESS) / ICMP()
        resp = sr1(icmp, timeout=2, iface=self.iface)
        self._assert_response_has_layer(resp, ICMP, "Failed to send ICMP: ")

    @test_tracker_info(uuid="0dc7d049-11bf-42f9-918a-263f4470a7e8")
    def test_icmpv6_connectivity(self):
        """Tests connectivity under ICMPv6.

        Steps:
        1. Enable USB tethering on Android devices
        2. Generate ICMPv6 echo request packet and send to target URL
        3. Verify that the response contains an IPv6 layer
        """
        icmpv6 = IPv6(dst=DEFAULT_DOMAIN_NAME_IPV6) / ICMPv6EchoRequest()
        resp = sr1(icmpv6, timeout=2, iface=self.iface)
        self._assert_response_has_layer(resp, IPv6, "Failed to send ICMPv6: ")

    @test_tracker_info(uuid="34aaffb8-8dd4-4a1f-a158-2732b8df5e59")
    def test_dns_query_connectivity(self):
        """Tests connectivity of DNS query.

        Steps:
        1. Enable USB tethering on Android devices
        2. Generate DNS query and send to target DNS server
        3. Verify that the response contains a DNS layer
        """
        dnsqr = (IP(dst=GOOGLE_DNS_IP_ADDRESS)
                 / UDP(sport=RandShort(), dport=53)
                 / DNS(rd=1, qd=DNSQR(qname=DEFAULT_DOMAIN_NAME)))
        resp = sr1(dnsqr, timeout=2, iface=self.iface)
        self._assert_response_has_layer(
            resp, DNS, "Failed to send DNS query: ")

    @test_tracker_info(uuid="b9bed0fa-3178-4456-92e0-736b3a8cc181")
    def test_tcp_connectivity(self):
        """Tests connectivity under TCP.

        Steps:
        1. Enable USB tethering on Android devices
        2. Generate TCP packet and send to target URL
        3. Verify that the response contains a TCP layer
        """
        tcp = IP(dst=DEFAULT_DOMAIN_NAME) / TCP(dport=[80, 443])
        resp = sr1(tcp, timeout=2, iface=self.iface)
        self._assert_response_has_layer(
            resp, TCP, "Failed to send TCP packet:")

    @test_tracker_info(uuid="5e2f31f4-0b18-44be-a1ba-d82bf9050996")
    def test_tcp_ipv6_connectivity(self):
        """Tests connectivity under IPv6.

        Steps:
        1. Enable USB tethering on Android devices
        2. Generate IPv6 packet and send to target URL (e.g., ipv6.google.com)
        3. Verify that the response contains an IPv6 layer
        """
        tcp_ipv6 = IPv6(dst=DEFAULT_DOMAIN_NAME_IPV6) / TCP(dport=[80, 443])
        resp = sr1(tcp_ipv6, timeout=2, iface=self.iface)
        self._assert_response_has_layer(
            resp, IPv6, "Failed to send TCP packet over IPv6, resp: ")

    def _send_http_get(self, destination, interface):
        """Performs a hand-built TCP handshake over IPv6 and sends HTTP GET.

        Args:
            destination: Host name used both as the IPv6 destination and in
                the HTTP Host header.
            interface: Name of the interface to send the frames on.

        Returns:
            The packet answering the GET request, or None if the server did
            not answer the SYN or the GET.
        """
        syn = Ether() / IPv6(dst=destination) / TCP(dport=80, flags="S")
        syn_ack = srp1(syn, timeout=2, iface=interface)
        # srp1 returns None when nothing answered; check explicitly rather
        # than relying on a TypeError from subscripting None.
        if syn_ack is None or not syn_ack.haslayer(TCP):
            self.log.warning(
                "Packet is sent but no answer from the server %s."
                % destination)
            return None

        # The ACK and the GET segment share the same TCP header fields,
        # derived from the SYN/ACK that was just received.
        tcp_fields = dict(
            dport=80,
            sport=syn_ack[TCP].dport,
            seq=syn_ack[TCP].ack,
            ack=syn_ack[TCP].seq + 1,
            flags='A')
        sendp(Ether() / IPv6(dst=destination) / TCP(**tcp_fields),
              iface=interface)
        http_get_str = ("GET / HTTP/1.1\r\nHost:" + destination
                        + "\r\nAccept-Encoding: gzip, deflate\r\n\r\n")
        req = (Ether() / IPv6(dst=destination) / TCP(**tcp_fields)
               / http_get_str)
        return srp1(req, timeout=2, iface=interface, retry=3,
                    filter="tcp port 80")

    @test_tracker_info(uuid="96115afb-e0d3-40a8-8f04-b64cedc6588f")
    def test_http_connectivity(self):
        """Tests connectivity under HTTP.

        Steps:
        1. Enable USB tethering on Android devices
        2. Implement TCP 3-way handshake to simulate HTTP GET
        3. Verify that the 3-way handshake works and response contains a TCP layer
        """
        resp = None
        for _ in range(3):
            resp = self._send_http_get(DEFAULT_DOMAIN_NAME, self.iface)
            if resp:
                break
            time.sleep(3)
        self._assert_response_has_layer(
            resp, TCP, "Failed to send HTTP request, resp: ")

    @test_tracker_info(uuid="140a064b-1ab0-4a92-8bdb-e52dde03d5b8")
    def test_usb_tethering_over_wifi(self):
        """Tests connectivity over Wi-Fi.

        Steps:
        1. Connects to a Wi-Fi network
        2. Enable USB tethering
        3. Verifies Wi-Fi is preferred upstream over data connection
        """

        wutils.start_wifi_connection_scan_and_ensure_network_found(
            self.dut, self.wifi_network[SSID])
        wutils.wifi_connect(self.dut, self.wifi_network)
        wifi_network = self.dut.droid.connectivityGetActiveNetwork()
        self.log.info("wifi network %s" % wifi_network)

        self.USB_TETHERED = True
        self.real_hwaddr = get_if_raw_hwaddr(self.iface)

        output = self.dut.adb.shell(DUMSYS_CMD)
        # NOTE(review): if neither marker string appears in the dumpsys
        # output, no assertion runs and the test passes - confirm intended.
        for line in output.split("\n"):
            if UPSTREAM_WANTED_STRING in line:
                asserts.assert_true(
                    "true" in line, "Upstream interface is not active")
                self.log.info("Upstream interface is active")
            if CURRENT_UPSTREAM_STRING in line:
                asserts.assert_true("wlan" in line, "WiFi is not the upstream "
                                    "interface")
                self.log.info("WiFi is the upstream interface")

    def check_upstream_ready(self, retry=3, interval=1):
        """Checks, with retries, that the tethering upstream is active.

        Runs DUMSYS_CMD on the DUT and looks for a line containing
        UPSTREAM_WANTED_STRING together with "true".

        Args:
            retry: Maximum number of times the dumpsys output is inspected.
            interval: Seconds to wait between two attempts.

        Returns:
            True as soon as an attempt reports the upstream as active, False
            if none of the attempts does.
        """
        for attempt in range(retry):
            output = self.dut.adb.shell(DUMSYS_CMD)
            for line in output.split("\n"):
                if UPSTREAM_WANTED_STRING in line and "true" in line:
                    self.log.info("Upstream interface is active")
                    return True
            # Give the device time to bring the upstream up before retrying;
            # skip the wait after the final attempt.
            if attempt < retry - 1:
                time.sleep(interval)
        return False