1from acts import asserts
2from acts import base_test
3from acts.test_decorators import test_tracker_info
4from acts_contrib.test_utils.net import net_test_utils as nutils
5from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
6from scapy.all import get_if_raw_hwaddr
7from scapy.layers.dns import DNS, DNSQR
8from scapy.layers.inet import IP, ICMP, UDP, TCP, RandShort, sr1
9from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
10from scapy.config import conf
11from scapy.sendrecv import send, sendp, srp1
12from scapy.layers.l2 import Ether
13import time
14
15DUMSYS_CMD = "dumpsys connectivity tethering"
16UPSTREAM_WANTED_STRING = "Upstream wanted"
17CURRENT_UPSTREAM_STRING = "Current upstream interface"
18SSID = wutils.WifiEnums.SSID_KEY
19GOOGLE_DNS_IP_ADDRESS = "8.8.8.8"
20DEFAULT_DOMAIN_NAME = "www.google.com"
21DEFAULT_DOMAIN_NAME_IPV4 = "ipv4.google.com"
22DEFAULT_DOMAIN_NAME_IPV6 = "ipv6.google.com"
23
24
25class UsbTetheringTest(base_test.BaseTestClass):
26  """Tests for USB tethering.
27
28  Prerequisite:
29  1. Android phone should connect to the desktop with USB cable
30  2. DUT should be able to connect to cellular network and Wi-Fi network
31  3. Set the CAP_NET_RAW capability before run the test.
32     e.g., `sudo setcap cap_net_raw=eip /usr/local/bin/act.py`
33  """
34
35  def setup_class(self):
36    self.dut = self.android_devices[0]
37    self.USB_TETHERED = False
38
39    nutils.verify_lte_data_and_tethering_supported(self.dut)
40    nutils.set_cap_net_raw_capability()
41    req_params = ("wifi_network",)
42    self.unpack_userparams(req_params)
43    # Enable USB tethering and get the USB network interface
44    iflist_before = nutils.get_if_list()
45    serial = self.dut.device_info['serial']
46    nutils.start_usb_tethering(self.dut)
47    self.dut.recreate_services(serial)
48    self.iface = nutils.wait_for_new_iface(iflist_before)
49    conf.route6.resync()
50    self.log.info("USB tethering interface: %s." % self.iface)
51    if not self.check_upstream_ready():
52      raise asserts.fail("Upstream interface is not active.")
53
54  def teardown_class(self):
55    nutils.stop_usb_tethering(self.dut)
56    self.USB_TETHERED = False
57    wutils.reset_wifi(self.dut)
58
59  def on_fail(self, test_name, begin_time):
60    self.dut.take_bug_report(test_name, begin_time)
61
62  @test_tracker_info(uuid="d4da7695-4342-4564-b7b0-0a30895f23eb")
63  def test_icmp_connectivity(self):
64    """Tests connectivity under ICMP.
65
66    Steps:
67    1. Enable USB tethering on Android devices
68    2. Generate ICMP packet and send to target IP address
69    3. Verify that the response contains an ICMP layer
70    """
71    icmp = IP(dst=GOOGLE_DNS_IP_ADDRESS)/ICMP()
72    resp = sr1(icmp, timeout=2, iface=self.iface)
73    resp_msg = resp.show(dump=True) if resp else "null"
74    asserts.assert_true(
75        resp and resp.haslayer(ICMP), "Failed to send ICMP: " + resp_msg)
76
77  @test_tracker_info(uuid="0dc7d049-11bf-42f9-918a-263f4470a7e8")
78  def test_icmpv6_connectivity(self):
79    """Tests connectivity under ICMPv6.
80
81    Steps:
82    1. Enable USB tethering on Android devices
83    2. Generate ICMPv6 echo request packet and send to target URL
84    3. Verify that the response contains an IPv6 layer
85    """
86    icmpv6 = IPv6(dst=DEFAULT_DOMAIN_NAME_IPV6)/ICMPv6EchoRequest()
87    resp = sr1(icmpv6, timeout=2, iface=self.iface)
88    resp_msg = resp.show(dump=True) if resp else "null"
89    asserts.assert_true(
90        resp and resp.haslayer(IPv6), "Failed to send ICMPv6: " + resp_msg)
91
92  @test_tracker_info(uuid="34aaffb8-8dd4-4a1f-a158-2732b8df5e59")
93  def test_dns_query_connectivity(self):
94    """Tests connectivity of DNS query.
95
96    Steps:
97    1. Enable USB tethering on Android devices
98    2. Generate DNS query and send to target DNS server
99    3. Verify that the response contains a DNS layer
100    """
101    dnsqr = IP(dst=GOOGLE_DNS_IP_ADDRESS) \
102            /UDP(sport=RandShort(), dport=53) \
103            /DNS(rd=1, qd=DNSQR(qname=DEFAULT_DOMAIN_NAME))
104    resp = sr1(dnsqr, timeout=2, iface=self.iface)
105    resp_msg = resp.show(dump=True) if resp else "null"
106    asserts.assert_true(
107        resp and resp.haslayer(DNS), "Failed to send DNS query: " + resp_msg)
108
109  @test_tracker_info(uuid="b9bed0fa-3178-4456-92e0-736b3a8cc181")
110  def test_tcp_connectivity(self):
111    """Tests connectivity under TCP.
112
113    Steps:
114    1. Enable USB tethering on Android devices
115    2. Generate TCP packet and send to target URL
116    3. Verify that the response contains a TCP layer
117    """
118    tcp = IP(dst=DEFAULT_DOMAIN_NAME)/TCP(dport=[80, 443])
119    resp = sr1(tcp, timeout=2, iface=self.iface)
120    resp_msg = resp.show(dump=True) if resp else "null"
121    asserts.assert_true(
122        resp and resp.haslayer(TCP), "Failed to send TCP packet:" + resp_msg)
123
124  @test_tracker_info(uuid="5e2f31f4-0b18-44be-a1ba-d82bf9050996")
125  def test_tcp_ipv6_connectivity(self):
126    """Tests connectivity under IPv6.
127
128    Steps:
129    1. Enable USB tethering on Android devices
130    2. Generate IPv6 packet and send to target URL (e.g., ipv6.google.com)
131    3. Verify that the response contains an IPv6 layer
132    """
133    tcp_ipv6 = IPv6(dst=DEFAULT_DOMAIN_NAME_IPV6)/TCP(dport=[80, 443])
134    resp = sr1(tcp_ipv6, timeout=2, iface=self.iface)
135    resp_msg = resp.show(dump=True) if resp else "null"
136    asserts.assert_true(
137        resp and resp.haslayer(IPv6), "Failed to send TCP packet over IPv6, resp: " + resp_msg)
138
139  def _send_http_get(self, destination, interface):
140    try:
141        syn_ack = srp1(Ether() / IPv6(dst=destination) / TCP(dport=80, flags="S"), timeout=2, iface=interface)
142        sendp(Ether() / IPv6(dst=destination) / TCP(dport=80, sport=syn_ack[TCP].dport,seq=syn_ack[TCP].ack, ack=syn_ack[TCP].seq + 1, flags='A'), iface=interface)
143        http_get_str = "GET / HTTP/1.1\r\nHost:" + destination + "\r\nAccept-Encoding: gzip, deflate\r\n\r\n"
144        req = Ether() / IPv6(dst=destination)/TCP(dport=80, sport=syn_ack[TCP].dport, seq=syn_ack[TCP].ack, ack=syn_ack[TCP].seq + 1, flags='A')/http_get_str
145        return srp1(req, timeout=2, iface=interface, retry = 3, filter = "tcp port 80")
146    except TypeError:
147        self.log.warn("Packet is sent but no answer from the server %s." % destination)
148        return None
149
150  @test_tracker_info(uuid="96115afb-e0d3-40a8-8f04-b64cedc6588f")
151  def test_http_connectivity(self):
152    """Tests connectivity under HTTP.
153
154    Steps:
155    1. Enable USB tethering on Android devices
156    2. Implement TCP 3-way handshake to simulate HTTP GET
157    3. Verify that the 3-way handshake works and response contains a TCP layer
158    """
159    resp = None
160    for _ in range(3):
161      resp = self._send_http_get(DEFAULT_DOMAIN_NAME, self.iface)
162      if resp:
163        break
164      time.sleep(3)
165    resp_msg = resp.show(dump=True) if resp else "null"
166    asserts.assert_true(
167        resp and resp.haslayer(TCP), "Failed to send HTTP request, resp: " + resp_msg)
168
169  @test_tracker_info(uuid="140a064b-1ab0-4a92-8bdb-e52dde03d5b8")
170  def test_usb_tethering_over_wifi(self):
171    """Tests connectivity over Wi-Fi.
172
173    Steps:
174    1. Connects to a Wi-Fi network
175    2. Enable USB tethering
176    3. Verifies Wi-Fi is preferred upstream over data connection
177    """
178
179    wutils.start_wifi_connection_scan_and_ensure_network_found(
180        self.dut, self.wifi_network[SSID])
181    wutils.wifi_connect(self.dut, self.wifi_network)
182    wifi_network = self.dut.droid.connectivityGetActiveNetwork()
183    self.log.info("wifi network %s" % wifi_network)
184
185    self.USB_TETHERED = True
186    self.real_hwaddr = get_if_raw_hwaddr(self.iface)
187
188    output = self.dut.adb.shell(DUMSYS_CMD)
189    for line in output.split("\n"):
190      if UPSTREAM_WANTED_STRING in line:
191        asserts.assert_true("true" in line, "Upstream interface is not active")
192        self.log.info("Upstream interface is active")
193      if CURRENT_UPSTREAM_STRING in line:
194        asserts.assert_true("wlan" in line, "WiFi is not the upstream "
195                            "interface")
196        self.log.info("WiFi is the upstream interface")
197
198  def check_upstream_ready(self, retry=3):
199    """Check the upstream is activated
200
201    Check the upstream is activated with retry
202    """
203    for i in range(0, retry):
204      output = self.dut.adb.shell(DUMSYS_CMD)
205      for line in output.split("\n"):
206        if UPSTREAM_WANTED_STRING in line:
207          if "true" in line:
208            self.log.info("Upstream interface is active")
209          elif i == retry:
210            return False
211    return True
212