1#!/usr/bin/python3
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo
18
19import binascii
20import errno
21import os
22import posix
23import random
24from socket import *  # pylint: disable=g-importing-member,wildcard-import
25import struct
26import sys
27import threading
28import time
29import unittest
30
31import csocket
32import multinetwork_base
33import net_test
34from scapy import all as scapy
35
36
37ICMP_ECHO = 8
38ICMP_ECHOREPLY = 0
39ICMPV6_ECHO_REQUEST = 128
40ICMPV6_ECHO_REPLY = 129
41IPV6_MIN_MTU = 1280
42ICMPV6_HEADER_LEN = 8
43ICMPV6_PKT_TOOBIG = 2
44
45
46class PingReplyThread(threading.Thread):
47
48  MIN_TTL = 10
49  INTERMEDIATE_IPV4 = "192.0.2.2"
50  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
51  NEIGHBOURS = ["fe80::1"]
52  LINK_MTU = 1300
53
54  def __init__(self, tun, mymac, routermac, routeraddr):
55    super(PingReplyThread, self).__init__()
56    self._tun = tun
57    self._started_flag = False
58    self._stopped_flag = False
59    self._mymac = mymac
60    self._routermac = routermac
61    self._routeraddr = routeraddr
62
63  def IsStarted(self):
64    return self._started_flag
65
66  def Stop(self):
67    self._stopped_flag = True
68
69  def ChecksumValid(self, packet):
70    # Get and clear the checksums.
71    def GetAndClearChecksum(layer):
72      if not layer:
73        return
74      try:
75        checksum = layer.chksum
76        del layer.chksum
77      except AttributeError:
78        checksum = layer.cksum
79        del layer.cksum
80      return checksum
81
82    def GetChecksum(layer):
83      try:
84        return layer.chksum
85      except AttributeError:
86        return layer.cksum
87
88    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
89    sums = {}
90    for name in layers:
91      sums[name] = GetAndClearChecksum(packet.getlayer(name))
92
93    # Serialize the packet, so scapy recalculates the checksums, and compare
94    # them with the ones in the packet.
95    packet = packet.__class__(bytes(packet))
96    for name in layers:
97      layer = packet.getlayer(name)
98      if layer and GetChecksum(layer) != sums[name]:
99        return False
100
101    return True
102
103  def SendTimeExceeded(self, version, packet):
104    if version == 4:
105      src = packet.getlayer(scapy.IP).src
106      self.SendPacket(
107          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
108          scapy.ICMP(type=11, code=0) /
109          packet)
110    elif version == 6:
111      src = packet.getlayer(scapy.IPv6).src
112      self.SendPacket(
113          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
114          scapy.ICMPv6TimeExceeded(code=0) /
115          packet)
116
117  def SendPacketTooBig(self, packet):
118    src = packet.getlayer(scapy.IPv6).src
119    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
120    self.SendPacket(
121        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
122        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
123        bytes(packet)[:datalen])
124
125  def IPv4Packet(self, ip):
126    icmp = ip.getlayer(scapy.ICMP)
127
128    # We only support ping for now.
129    if (ip.proto != IPPROTO_ICMP or
130        icmp.type != ICMP_ECHO or
131        icmp.code != 0):
132      return
133
134    # Check the checksums.
135    if not self.ChecksumValid(ip):
136      return
137
138    if ip.ttl < self.MIN_TTL:
139      self.SendTimeExceeded(4, ip)
140      return
141
142    icmp.type = ICMP_ECHOREPLY
143    self.SwapAddresses(ip)
144    self.SendPacket(ip)
145
146  def IPv6Packet(self, ipv6):
147    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
148
149    # We only support ping for now.
150    if (ipv6.nh != IPPROTO_ICMPV6 or
151        not icmpv6 or
152        icmpv6.type != ICMPV6_ECHO_REQUEST or
153        icmpv6.code != 0):
154      return
155
156    # Check the checksums.
157    if not self.ChecksumValid(ipv6):
158      return
159
160    if ipv6.dst.startswith("ff02::"):
161      ipv6.dst = ipv6.src
162      for src in [self._routeraddr]:
163        ipv6.src = src
164        icmpv6.type = ICMPV6_ECHO_REPLY
165        self.SendPacket(ipv6)
166    elif ipv6.hlim < self.MIN_TTL:
167      self.SendTimeExceeded(6, ipv6)
168    elif ipv6.plen > self.LINK_MTU:
169      self.SendPacketTooBig(ipv6)
170    else:
171      icmpv6.type = ICMPV6_ECHO_REPLY
172      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
173        return
174      self.SwapAddresses(ipv6)
175      self.SendPacket(ipv6)
176
177  def SwapAddresses(self, packet):
178    src = packet.src
179    packet.src = packet.dst
180    packet.dst = src
181
182  def SendPacket(self, packet):
183    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
184    try:
185      posix.write(self._tun.fileno(), bytes(packet))
186    except Exception as e:  # pylint: disable=broad-exception-caught
187      if not self._stopped_flag:
188        raise e
189
190  def run(self):
191    self._started_flag = True
192    while not self._stopped_flag:
193      try:
194        packet = posix.read(self._tun.fileno(), 4096)
195      except OSError as e:
196        if e.errno == errno.EAGAIN:
197          continue
198        else:
199          break
200      except ValueError as e:
201        if not self._stopped_flag:
202          raise e
203
204      ether = scapy.Ether(packet)
205      if ether.type == net_test.ETH_P_IPV6:
206        self.IPv6Packet(ether.payload)
207      elif ether.type == net_test.ETH_P_IP:
208        self.IPv4Packet(ether.payload)
209
210
211class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
212
213  @classmethod
214  def WaitForReplyThreads(cls):
215    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
216    # that would cause tearDownClass not to be called and thus not clean up
217    # routing configuration, breaking subsequent tests. Instead, just let these
218    # tests fail.
219    interval = 0.1
220    attempts = 20
221    for _ in range(attempts):
222      for _ in cls.NETIDS:
223        if all(thrd.IsStarted() for thrd in list(cls.reply_threads.values())):
224          return
225        time.sleep(interval)
226    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
227        attempts * interval)
228    sys.stderr.write(msg)
229
230  @classmethod
231  def StopReplyThreads(cls):
232    for thread in list(cls.reply_threads.values()):
233      thread.Stop()
234
235  @classmethod
236  def setUpClass(cls):
237    super(Ping6Test, cls).setUpClass()
238    cls.reply_threads = {}
239    for netid in cls.NETIDS:
240      cls.reply_threads[netid] = PingReplyThread(
241          cls.tuns[netid],
242          cls.MyMacAddress(netid),
243          cls.RouterMacAddress(netid),
244          cls._RouterAddress(netid, 6))
245      cls.reply_threads[netid].start()
246    cls.WaitForReplyThreads()
247    cls.netid = random.choice(cls.NETIDS)
248    cls.SetDefaultNetwork(cls.netid)
249
250  @classmethod
251  def tearDownClass(cls):
252    cls.StopReplyThreads()
253    cls.ClearDefaultNetwork()
254    super(Ping6Test, cls).tearDownClass()
255
256  def setUp(self):
257    super(Ping6Test, self).setUp()
258    self.ifname = self.GetInterfaceName(self.netid)
259    self.ifindex = self.ifindices[self.netid]
260    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
261    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
262
263  def assertValidPingResponse(self, s, data):
264    family = s.family
265
266    # Receive the reply.
267    rcvd, src = s.recvfrom(32768)
268    self.assertNotEqual(0, len(rcvd), "No data received")
269
270    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
271    # as IPv4.
272    if src[0].startswith("::ffff:"):
273      family = AF_INET
274      src = (src[0].replace("::ffff:", ""), src[1:])
275
276    # Check the data being sent is valid.
277    self.assertGreater(len(data), 7, "Not enough data for ping packet")
278    if family == AF_INET:
279      self.assertTrue(data.startswith(b"\x08\x00"), "Not an IPv4 echo request")
280    elif family == AF_INET6:
281      self.assertTrue(data.startswith(b"\x80\x00"), "Not an IPv6 echo request")
282    else:
283      self.fail("Unknown socket address family %d" * s.family)
284
285    # Check address, ICMP type, and ICMP code.
286    if family == AF_INET:
287      addr, unused_port = src
288      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
289      self.assertTrue(rcvd.startswith(b"\x00\x00"), "Not an IPv4 echo reply")
290    else:
291      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
292      self.assertGreaterEqual(len(addr), len("::"))
293      self.assertTrue(rcvd.startswith(b"\x81\x00"), "Not an IPv6 echo reply")
294      # Check that the flow label is zero and that the scope ID is sane.
295      self.assertEqual(flowlabel, 0)
296      if addr.startswith("fe80::"):
297        self.assertIn(scope_id, list(self.ifindices.values()))
298      else:
299        self.assertEqual(0, scope_id)
300
301    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
302    # we don't have the IP addresses so we can't construct the pseudoheader.
303
304    # Check the sequence number and the data.
305    self.assertEqual(len(data), len(rcvd))
306    self.assertEqual(binascii.hexlify(data[6:]), binascii.hexlify(rcvd[6:]))
307
308  @staticmethod
309  def IsAlmostEqual(expected, actual, delta):
310    return abs(expected - actual) < delta
311
312  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
313                        txmem=0, rxmem=0):
314    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
315                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
316                "%02X" % state,
317                "%08X:%08X" % (txmem, rxmem),
318                str(os.getuid()), "ref", "0"]
319    for actual in self.ReadProcNetSocket(name):
320      # Check that rxmem and txmem don't differ too much from each other.
321      actual_txmem, actual_rxmem = expected[3].split(":")
322      if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
323        return
324      if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
325        return
326
327      # Check all the parameters except rxmem and txmem.
328      expected[3] = actual[3]
329      # Don't check ref, it's always 2 on old kernels, but 1 for 'raw6' on 6.0+
330      expected[5] = actual[5]
331      if expected == actual:
332        return
333
334    self.fail("Cound not find socket matching %s" % expected)
335
336  def testIPv4SendWithNoConnection(self):
337    s = net_test.IPv4PingSocket()
338    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
339    s.close()
340
341  def testIPv6SendWithNoConnection(self):
342    s = net_test.IPv6PingSocket()
343    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
344    s.close()
345
346  def testIPv4LoopbackPingWithConnect(self):
347    s = net_test.IPv4PingSocket()
348    s.connect(("127.0.0.1", 55))
349    data = net_test.IPV4_PING + b"foobarbaz"
350    s.send(data)
351    self.assertValidPingResponse(s, data)
352    s.close()
353
354  def testIPv6LoopbackPingWithConnect(self):
355    s = net_test.IPv6PingSocket()
356    s.connect(("::1", 55))
357    s.send(net_test.IPV6_PING)
358    self.assertValidPingResponse(s, net_test.IPV6_PING)
359    s.close()
360
361  def testIPv4PingUsingSendto(self):
362    s = net_test.IPv4PingSocket()
363    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
364    self.assertEqual(len(net_test.IPV4_PING), written)
365    self.assertValidPingResponse(s, net_test.IPV4_PING)
366    s.close()
367
368  def testIPv6PingUsingSendto(self):
369    s = net_test.IPv6PingSocket()
370    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
371    self.assertEqual(len(net_test.IPV6_PING), written)
372    self.assertValidPingResponse(s, net_test.IPV6_PING)
373    s.close()
374
375  def testIPv4NoCrash(self):
376    # Python 2.x does not provide either read() or recvmsg.
377    s = net_test.IPv4PingSocket()
378    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
379    self.assertEqual(len(net_test.IPV4_PING), written)
380    fd = s.fileno()
381    reply = posix.read(fd, 4096)
382    self.assertEqual(written, len(reply))
383    s.close()
384
385  def testIPv6NoCrash(self):
386    # Python 2.x does not provide either read() or recvmsg.
387    s = net_test.IPv6PingSocket()
388    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
389    self.assertEqual(len(net_test.IPV6_PING), written)
390    fd = s.fileno()
391    reply = posix.read(fd, 4096)
392    self.assertEqual(written, len(reply))
393    s.close()
394
395  def testCrossProtocolCrash(self):
396    # Checks that an ICMP error containing a ping packet that matches the ID
397    # of a socket of the wrong protocol (which can happen when using 464xlat)
398    # doesn't crash the kernel.
399
400    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
401    # because IPv4 packets sent by scapy.send() on loopback are not received by
402    # the kernel. So we don't actually use this function yet.
403    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
404      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
405              scapy.ICMP(type=3, code=0) /
406              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
407              scapy.ICMP(type=8, id=port, seq=1))
408
409    def GetIPv6Unreachable(port):
410      return (scapy.IPv6(src="::1", dst="::1") /
411              scapy.ICMPv6DestUnreach() /
412              scapy.IPv6(src="::1", dst="::1") /
413              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
414
415    # An unreachable matching the ID of a socket of the wrong protocol
416    # shouldn't crash.
417    s = net_test.IPv4PingSocket()
418    s.connect(("127.0.0.1", 12345))
419    _, port = s.getsockname()
420    scapy.send(GetIPv6Unreachable(port), verbose=False)
421    # No crash? Good.
422    s.close()
423
424  def testCrossProtocolCalls(self):
425    """Tests that passing in the wrong family returns EAFNOSUPPORT.
426
427    Relevant kernel commits:
428      upstream net:
429        91a0b60 net/ping: handle protocol mismatching scenario
430        9145736d net: ping: Return EAFNOSUPPORT when appropriate.
431
432      android-3.10:
433        78a6809 net/ping: handle protocol mismatching scenario
434        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
435    """
436
437    def CheckEAFNoSupport(function, *args):
438      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
439
440    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
441
442    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
443    # IPv4 socket address structures, we need to pass down a socket address
444    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
445    # will fail immediately with EINVAL because the passed-in socket length is
446    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
447    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
448    ipv4sockaddr = csocket.SockaddrIn6(
449        ipv4sockaddr.Pack() +
450        b"\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
451
452    s4 = net_test.IPv4PingSocket()
453    s6 = net_test.IPv6PingSocket()
454
455    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
456    # address family, because the Python implementation will just pass garbage
457    # down to the kernel. So call the C functions directly.
458    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
459    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
460    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
461    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
462    CheckEAFNoSupport(csocket.Sendmsg,
463                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
464    CheckEAFNoSupport(csocket.Sendmsg,
465                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
466    s4.close()
467    s6.close()
468
469  def testIPv4Bind(self):
470    # Bind to unspecified address.
471    s = net_test.IPv4PingSocket()
472    s.bind(("0.0.0.0", 544))
473    self.assertEqual(("0.0.0.0", 544), s.getsockname())
474    s.close()
475
476    # Bind to loopback.
477    s = net_test.IPv4PingSocket()
478    s.bind(("127.0.0.1", 99))
479    self.assertEqual(("127.0.0.1", 99), s.getsockname())
480
481    # Binding twice is not allowed.
482    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
483    s.close()
484
485    # But binding two different sockets to the same ID is allowed.
486    s2 = net_test.IPv4PingSocket()
487    s2.bind(("127.0.0.1", 99))
488    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
489    s3 = net_test.IPv4PingSocket()
490    s3.bind(("127.0.0.1", 99))
491    self.assertEqual(("127.0.0.1", 99), s3.getsockname())
492    s2.close()
493    s3.close()
494
495    # If two sockets bind to the same port, the first one to call read() gets
496    # the response.
497    s4 = net_test.IPv4PingSocket()
498    s5 = net_test.IPv4PingSocket()
499    s4.bind(("0.0.0.0", 167))
500    s5.bind(("0.0.0.0", 167))
501    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
502    self.assertValidPingResponse(s5, net_test.IPV4_PING)
503    csocket.SetSocketTimeout(s4, 100)
504    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
505
506    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
507    s6 = net_test.IPv4PingSocket()
508    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
509    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
510
511    s4.close()
512    s5.close()
513    s6.close()
514
515    # Can't bind after sendto.
516    s = net_test.IPv4PingSocket()
517    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
518    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
519    s.close()
520
521  def testIPv6Bind(self):
522    # Bind to unspecified address.
523    s = net_test.IPv6PingSocket()
524    s.bind(("::", 769))
525    self.assertEqual(("::", 769, 0, 0), s.getsockname())
526    s.close()
527
528    # Bind to loopback.
529    s = net_test.IPv6PingSocket()
530    s.bind(("::1", 99))
531    self.assertEqual(("::1", 99, 0, 0), s.getsockname())
532
533    # Binding twice is not allowed.
534    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
535    s.close()
536
537    # But binding two different sockets to the same ID is allowed.
538    s2 = net_test.IPv6PingSocket()
539    s2.bind(("::1", 99))
540    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
541    s3 = net_test.IPv6PingSocket()
542    s3.bind(("::1", 99))
543    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())
544    s2.close()
545    s3.close()
546
547    # Binding both IPv4 and IPv6 to the same socket works.
548    s4 = net_test.IPv4PingSocket()
549    s6 = net_test.IPv6PingSocket()
550    s4.bind(("0.0.0.0", 444))
551    s6.bind(("::", 666, 0, 0))
552    s4.close()
553    s6.close()
554
555    # Can't bind after sendto.
556    s = net_test.IPv6PingSocket()
557    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
558    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
559    s.close()
560
561  def testIPv4InvalidBind(self):
562    s = net_test.IPv4PingSocket()
563    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
564                           s.bind, ("255.255.255.255", 1026))
565    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
566                           s.bind, ("224.0.0.1", 651))
567    # Binding to an address we don't have only works with IP_TRANSPARENT.
568    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
569                           s.bind, (net_test.IPV4_ADDR, 651))
570    try:
571      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
572      s.bind((net_test.IPV4_ADDR, 651))
573    except IOError as e:
574      if e.errno == errno.EACCES:
575        pass  # We're not root. let it go for now.
576    s.close()
577
578  def testIPv6InvalidBind(self):
579    s = net_test.IPv6PingSocket()
580    self.assertRaisesErrno(errno.EINVAL,
581                           s.bind, ("ff02::2", 1026))
582
583    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
584    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
585                           s.bind, (net_test.IPV6_ADDR, 651))
586    try:
587      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
588      s.bind((net_test.IPV6_ADDR, 651))
589    except IOError as e:
590      if e.errno == errno.EACCES:
591        pass  # We're not root. let it go for now.
592    s.close()
593
594  def testAfUnspecBind(self):
595    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
596    s4 = net_test.IPv4PingSocket()
597    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
598    sockaddr.family = AF_UNSPEC
599    csocket.Bind(s4, sockaddr)
600    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())
601
602    # But not if the address is anything else.
603    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
604    sockaddr.family = AF_UNSPEC
605    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
606    s4.close()
607
608    # This doesn't work for IPv6.
609    s6 = net_test.IPv6PingSocket()
610    sockaddr = csocket.Sockaddr(("::1", 58997))
611    sockaddr.family = AF_UNSPEC
612    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
613    s6.close()
614
615  def testIPv6ScopedBind(self):
616    # Can't bind to a link-local address without a scope ID.
617    s = net_test.IPv6PingSocket()
618    self.assertRaisesErrno(errno.EINVAL,
619                           s.bind, (self.lladdr, 1026, 0, 0))
620
621    # Binding to a link-local address with a scope ID works, and the scope ID is
622    # returned by a subsequent getsockname. On Python 2, getsockname returns
623    # "fe80:1%foo". Strip it off, since the ifindex field in the return value is
624    # what matters.
625    s.bind((self.lladdr, 4646, 0, self.ifindex))
626    sockname = s.getsockname()
627    expected = self.lladdr
628    if "%" in sockname[0]:
629      expected += "%" + self.ifname
630    self.assertEqual((expected, 4646, 0, self.ifindex), sockname)
631
632    # Of course, for the above to work the address actually has to be configured
633    # on the machine.
634    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
635                           s.bind, ("fe80::f00", 1026, 0, 1))
636    s.close()
637
638    # Scope IDs on non-link-local addresses are silently ignored.
639    s = net_test.IPv6PingSocket()
640    s.bind(("::1", 1234, 0, 1))
641    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())
642    s.close()
643
644  def testBindAffectsIdentifier(self):
645    s = net_test.IPv6PingSocket()
646    s.bind((self.globaladdr, 0xf976))
647    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
648    self.assertEqual(b"\xf9\x76", s.recv(32768)[4:6])
649    s.close()
650
651    s = net_test.IPv6PingSocket()
652    s.bind((self.globaladdr, 0xace))
653    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
654    self.assertEqual(b"\x0a\xce", s.recv(32768)[4:6])
655    s.close()
656
657  def testLinkLocalAddress(self):
658    s = net_test.IPv6PingSocket()
659    # Sending to a link-local address with no scope fails with EINVAL.
660    self.assertRaisesErrno(errno.EINVAL,
661                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
662    # Sending to link-local address with a scope succeeds. Note that Python
663    # doesn't understand the "fe80::1%lo" format, even though it returns it.
664    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
665    # No exceptions? Good.
666    s.close()
667
668  def testLinkLocalOif(self):
669    """Checks that ping to link-local addresses works correctly.
670
671    Relevant kernel commits:
672      upstream net:
673        5e45789 net: ipv6: Fix ping to link-local addresses.
674    """
675    for mode in ["oif", "ucast_oif", None]:
676      s = net_test.IPv6PingSocket()
677      for netid in self.NETIDS:
678        s2 = net_test.IPv6PingSocket()
679        dst = self._RouterAddress(netid, 6)
680        self.assertTrue(dst.startswith("fe80:"))
681
682        if mode:
683          self.SelectInterface(s, netid, mode)
684          self.SelectInterface(s2, netid, mode)
685          scopeid = 0
686        else:
687          scopeid = self.ifindices[netid]
688
689        if mode == "oif":
690          # If SO_BINDTODEVICE has been set, any attempt to send on another
691          # interface returns EINVAL.
692          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
693                                   % len(self.NETIDS)]
694          otherscopeid = self.ifindices[othernetid]
695          self.assertRaisesErrno(
696              errno.EINVAL,
697              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
698          self.assertRaisesErrno(
699              errno.EINVAL,
700              s.connect, (dst, 55, 0, otherscopeid))
701
702        # Try using both sendto and connect/send.
703        # If we get a reply, we sent the packet out on the right interface.
704        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
705        self.assertValidPingResponse(s, net_test.IPV6_PING)
706
707        # IPV6_UNICAST_IF doesn't work on connected sockets.
708        if mode != "ucast_oif":
709          s2.connect((dst, 123, 0, scopeid))
710          s2.send(net_test.IPV6_PING)
711          self.assertValidPingResponse(s2, net_test.IPV6_PING)
712        s2.close()
713      s.close()
714
715  def testMappedAddressFails(self):
716    s = net_test.IPv6PingSocket()
717    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
718    self.assertValidPingResponse(s, net_test.IPV6_PING)
719    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
720    self.assertValidPingResponse(s, net_test.IPV6_PING)
721    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
722                           ("::ffff:192.0.2.1", 55))
723    s.close()
724
725  @unittest.skipUnless(False, "skipping: does not work yet")
726  def testFlowLabel(self):
727    s = net_test.IPv6PingSocket()
728
729    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
730    # the flow label in the packet is not set.
731    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
732    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
733
734    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
735    # that is not registered with the flow manager should return EINVAL...
736    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
737    # ... but this doesn't work yet.
738    if False:  # pylint: disable=using-constant-test
739      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
740                             (net_test.IPV6_ADDR, 93, 0xdead, 0))
741
742    # After registering the flow label, it gets sent properly, appears in the
743    # output packet, and is returned in the response.
744    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
745    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
746                                     net_test.IPV6_FLOWINFO_SEND))
747    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
748    _, src = s.recvfrom(32768)
749    _, _, flowlabel, _ = src
750    self.assertEqual(0xdead, flowlabel & 0xfffff)
751    s.close()
752
753  def testIPv4Error(self):
754    s = net_test.IPv4PingSocket()
755    s.setsockopt(SOL_IP, IP_TTL, 2)
756    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
757    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
758    # We can't check the actual error because Python 2.7 doesn't implement
759    # recvmsg, but we can at least check that the socket returns an error.
760    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
761    s.close()
762
763  def testIPv6Error(self):
764    s = net_test.IPv6PingSocket()
765    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
766    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
767    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
768    # We can't check the actual error because Python 2.7 doesn't implement
769    # recvmsg, but we can at least check that the socket returns an error.
770    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
771    s.close()
772
773  def testIPv6MulticastPing(self):
774    s = net_test.IPv6PingSocket()
775    # Send a multicast ping and check we get at least one duplicate.
776    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
777    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
778    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
779    self.assertValidPingResponse(s, net_test.IPV6_PING)
780    self.assertValidPingResponse(s, net_test.IPV6_PING)
781    s.close()
782
783  def testIPv4LargePacket(self):
784    s = net_test.IPv4PingSocket()
785    data = net_test.IPV4_PING + 20000 * b"a"
786    s.sendto(data, ("127.0.0.1", 987))
787    self.assertValidPingResponse(s, data)
788    s.close()
789
790  def testIPv6LargePacket(self):
791    s = net_test.IPv6PingSocket()
792    s.bind(("::", 0xace))
793    data = net_test.IPV6_PING + b"\x01" + 19994 * b"\x00" + b"aaaaa"
794    s.sendto(data, ("::1", 953))
795    s.close()
796
797  def testIcmpSocketsNotInIcmp6(self):
798    numrows = len(self.ReadProcNetSocket("icmp"))
799    numrows6 = len(self.ReadProcNetSocket("icmp6"))
800    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
801    s.bind(("127.0.0.1", 0xace))
802    s.connect(("127.0.0.1", 0xbeef))
803    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
804    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
805    s.close()
806
807  def testIcmp6SocketsNotInIcmp(self):
808    numrows = len(self.ReadProcNetSocket("icmp"))
809    numrows6 = len(self.ReadProcNetSocket("icmp6"))
810    s = net_test.IPv6PingSocket()
811    s.bind(("::1", 0xace))
812    s.connect(("::1", 0xbeef))
813    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
814    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
815    s.close()
816
817  def testProcNetIcmp(self):
818    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
819    s.bind(("127.0.0.1", 0xace))
820    s.connect(("127.0.0.1", 0xbeef))
821    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
822    s.close()
823
824  def testProcNetIcmp6(self):
825    numrows6 = len(self.ReadProcNetSocket("icmp6"))
826    s = net_test.IPv6PingSocket()
827    s.bind(("::1", 0xace))
828    s.connect(("::1", 0xbeef))
829    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
830
831    # Check the row goes away when the socket is closed.
832    s.close()
833    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
834
835    # Try send, bind and connect to check the addresses and the state.
836    s = net_test.IPv6PingSocket()
837    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
838    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
839    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
840    s.close()
841
842    # Can't bind after sendto, apparently.
843    s = net_test.IPv6PingSocket()
844    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
845    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
846    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
847
848    # Check receive bytes.
849    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
850    s.connect(("ff02::1", 0xdead))
851    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
852    s.send(net_test.IPV6_PING)
853    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
854    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
855                           txmem=0, rxmem=0x300)
856    self.assertValidPingResponse(s, net_test.IPV6_PING)
857    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
858                           txmem=0, rxmem=0)
859    s.close()
860
861  def testProcNetUdp6(self):
862    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
863    s.bind(("::1", 0xace))
864    s.connect(("::1", 0xbeef))
865    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
866    s.close()
867
868  def testProcNetRaw6(self):
869    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
870    s.bind(("::1", 0xace))
871    s.connect(("::1", 0xbeef))
872    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
873    s.close()
874
875  def testIPv6MTU(self):
876    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
877
878    Relevant kernel commits:
879      upstream net-next:
880        dcb94b8 ipv6: fix endianness error in icmpv6_err
881    """
882    s = net_test.IPv6PingSocket()
883    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
884    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
885    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
886    s.connect((net_test.IPV6_ADDR, 55))
887    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * b"a"
888    s.send(pkt)
889    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
890    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
891
892    # Compare the offending packet with the one we sent. To do this we need to
893    # calculate the ident of the packet we sent and blank out the checksum of
894    # the one we received.
895    ident = struct.pack("!H", s.getsockname()[1])
896    pkt = pkt[:4] + ident + pkt[6:]
897    data = data[:2] + b"\x00\x00" + pkt[4:]
898    self.assertEqual(pkt, data)
899
900    # Check the address that the packet was sent to.
901    self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
902
903    # Check the cmsg data, including the link MTU.
904    mtu = PingReplyThread.LINK_MTU
905    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
906    msglist = [
907        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
908         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
909                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
910          csocket.Sockaddr((src, 0))))
911    ]
912
913    self.assertEqual(msglist, cmsg)
914    s.close()
915
916
917if __name__ == "__main__":
918  unittest.main()
919