#!/usr/bin/python3
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=g-bad-todo

import binascii
import errno
import os
import posix
import random
from socket import *  # pylint: disable=g-importing-member,wildcard-import
import struct
import sys
import threading
import time
import unittest

import csocket
import multinetwork_base
import net_test
from scapy import all as scapy


ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
ICMPV6_PKT_TOOBIG = 2


class PingReplyThread(threading.Thread):
  """Thread that answers ICMP and ICMPv6 echo requests read from a tun fd.

  Frames are read from the tun device, parsed as Ethernet, and dispatched to
  IPv4Packet or IPv6Packet. Besides echo replies, the thread generates ICMP
  time exceeded errors for packets whose TTL / hop limit is below MIN_TTL and
  ICMPv6 packet too big errors for IPv6 packets whose payload length exceeds
  LINK_MTU.
  """

  MIN_TTL = 10
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  NEIGHBOURS = ["fe80::1"]
  LINK_MTU = 1300

  def __init__(self, tun, mymac, routermac, routeraddr):
    """Initializes the thread.

    Args:
      tun: File-like object with a fileno() to read frames from and write
        replies to.
      mymac: MAC address used as the Ethernet destination of replies.
      routermac: MAC address used as the Ethernet source of replies.
      routeraddr: IPv6 address used as the source of multicast ping replies
        and as the only link-local destination that is answered.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._started_flag = False
    self._stopped_flag = False
    self._mymac = mymac
    self._routermac = routermac
    self._routeraddr = routeraddr

  def IsStarted(self):
    """Returns True once run() has begun executing."""
    return self._started_flag

  def Stop(self):
    """Asks run() to exit; takes effect the next time the loop checks."""
    self._stopped_flag = True

  def ChecksumValid(self, packet):
    """Returns True if the IP/ICMP/ICMPv6 checksums in packet are correct.

    The checksums are removed from the passed-in packet so that scapy
    recalculates them when the packet is serialized; the recalculated values
    are then compared with the originals.

    Args:
      packet: A scapy packet. Its checksum fields are cleared as a side effect.

    Returns:
      False if any present layer's checksum differs from the recalculated one.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(bytes(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends an ICMP time exceeded error quoting packet back to its source.

    Args:
      version: IP version of packet, 4 or 6. Other values send nothing.
      packet: The scapy IP or IPv6 packet that triggered the error.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def SendPacketTooBig(self, packet):
    """Sends an ICMPv6 packet too big error advertising LINK_MTU.

    Args:
      packet: The offending scapy IPv6 packet. It is quoted in the error,
        truncated to IPV6_MIN_MTU - ICMPV6_HEADER_LEN bytes.
    """
    src = packet.getlayer(scapy.IPv6).src
    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    self.SendPacket(
        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
        bytes(packet)[:datalen])

  def IPv4Packet(self, ip):
    """Handles a received IPv4 packet, replying to valid echo requests."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now.
    if (ip.proto != IPPROTO_ICMP or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Handles a received IPv6 packet, replying to valid echo requests."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: reply from the router address.
      ipv6.dst = ipv6.src
      for src in [self._routeraddr]:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    elif ipv6.plen > self.LINK_MTU:
      self.SendPacketTooBig(ipv6)
    else:
      icmpv6.type = ICMPV6_ECHO_REPLY
      # Only answer link-local pings that are addressed to the router itself.
      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
        return
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Exchanges the source and destination addresses of packet in place."""
    src = packet.src
    packet.src = packet.dst
    packet.dst = src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun fd.

    Errors are re-raised unless Stop() has been called, in which case they are
    ignored because the tun may already have been torn down.
    """
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), bytes(packet))
    except Exception:  # pylint: disable=broad-exception-caught
      if not self._stopped_flag:
        raise

  def run(self):
    """Reads frames from the tun fd and dispatches them until stopped."""
    self._started_flag = True
    while not self._stopped_flag:
      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        break
      except ValueError:
        if not self._stopped_flag:
          raise
        # We were asked to stop and there is no new frame to process. Exit
        # instead of falling through with an unbound or stale packet.
        break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)


class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests for IPv4 and IPv6 ping (ICMP datagram) sockets."""

  @classmethod
  def WaitForReplyThreads(cls):
    """Waits for all reply threads to start, warning on stderr on timeout."""
    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    # that would cause tearDownClass not to be called and thus not clean up
    # routing configuration, breaking subsequent tests. Instead, just let these
    # tests fail.
    interval = 0.1
    attempts = 20
    for _ in range(attempts):
      if all(thrd.IsStarted() for thrd in cls.reply_threads.values()):
        return
      time.sleep(interval)
    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
        attempts * interval)
    sys.stderr.write(msg)

  @classmethod
  def StopReplyThreads(cls):
    """Asks every reply thread to stop."""
    for thread in list(cls.reply_threads.values()):
      thread.Stop()

  @classmethod
  def setUpClass(cls):
    """Starts one reply thread per network and picks a default network."""
    super(Ping6Test, cls).setUpClass()
    cls.reply_threads = {}
    for netid in cls.NETIDS:
      cls.reply_threads[netid] = PingReplyThread(
          cls.tuns[netid],
          cls.MyMacAddress(netid),
          cls.RouterMacAddress(netid),
          cls._RouterAddress(netid, 6))
      cls.reply_threads[netid].start()
    cls.WaitForReplyThreads()
    cls.netid = random.choice(cls.NETIDS)
    cls.SetDefaultNetwork(cls.netid)

  @classmethod
  def tearDownClass(cls):
    """Stops the reply threads and clears the default network."""
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    """Caches interface name, index and addresses of the default network."""
    super(Ping6Test, self).setUp()
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives a packet on s and checks it is an echo reply matching data.

    Args:
      s: The ping socket to receive on.
      data: The echo request bytes that were sent on s.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      src = (src[0].replace("::ffff:", ""), src[1:])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith(b"\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith(b"\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith(b"\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith(b"\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertIn(scope_id, list(self.ifindices.values()))
      else:
        self.assertEqual(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(binascii.hexlify(data[6:]), binascii.hexlify(rcvd[6:]))

  @staticmethod
  def IsAlmostEqual(expected, actual, delta):
    """Returns True if expected and actual differ by strictly less than delta."""
    return abs(expected - actual) < delta

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Fails unless a socket matching the arguments is listed in /proc/net.

    Args:
      name: Name passed to ReadProcNetSocket, e.g. "icmp6".
      srcaddr: Expected local address.
      srcport: Expected local port.
      dstaddr: Expected remote address.
      dstport: Expected remote port.
      state: Expected socket state number.
      txmem: Expected transmit queue memory.
      rxmem: Expected receive queue memory.
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "ref", "0"]
    for actual in self.ReadProcNetSocket(name):
      # Check that rxmem and txmem don't differ too much from each other.
      # NOTE(review): this parses expected[3] rather than actual[3], so on the
      # first row it compares the expected values with themselves and returns
      # as soon as either is non-zero. Left unchanged because comparing against
      # the real values would make the test depend on kernel-specific socket
      # memory accounting - confirm the intended tolerance before fixing.
      actual_txmem, actual_rxmem = expected[3].split(":")
      if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
        return
      if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
        return

      # Check all the parameters except rxmem and txmem.
      expected[3] = actual[3]
      # Don't check ref, it's always 2 on old kernels, but 1 for 'raw6' on 6.0+
      expected[5] = actual[5]
      if expected == actual:
        return

    self.fail("Could not find socket matching %s" % expected)

  def testIPv4SendWithNoConnection(self):
    """Checks send() on an unconnected IPv4 ping socket gives EDESTADDRREQ."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
    s.close()

  def testIPv6SendWithNoConnection(self):
    """Checks send() on an unconnected IPv6 ping socket gives EDESTADDRREQ."""
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
    s.close()

  def testIPv4LoopbackPingWithConnect(self):
    """Pings 127.0.0.1 using connect() and send()."""
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + b"foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)
    s.close()

  def testIPv6LoopbackPingWithConnect(self):
    """Pings ::1 using connect() and send()."""
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.close()

  def testIPv4PingUsingSendto(self):
    """Pings an IPv4 address using sendto()."""
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)
    s.close()

  def testIPv6PingUsingSendto(self):
    """Pings an IPv6 address using sendto()."""
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.close()

  def testIPv4NoCrash(self):
    """Reads an IPv4 ping reply using read() on the socket's fd."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))
    s.close()

  def testIPv6NoCrash(self):
    """Reads an IPv6 ping reply using read() on the socket's fd."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))
    s.close()

  def testCrossProtocolCrash(self):
    """Sends an ICMPv6 error matching the ID of an IPv4 ping socket."""
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port), verbose=False)
    # No crash? Good.
    s.close()

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT.

    Relevant kernel commits:
      upstream net:
        91a0b60 net/ping: handle protocol mismatching scenario
        9145736d net: ping: Return EAFNOSUPPORT when appropriate.

      android-3.10:
        78a6809 net/ping: handle protocol mismatching scenario
        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    """

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        b"\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
    s4.close()
    s6.close()

  def testIPv4Bind(self):
    """Checks bind() semantics on IPv4 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())
    s.close()

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
    s.close()

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())
    s2.close()
    s3.close()

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    csocket.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    s4.close()
    s5.close()
    s6.close()

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
    s.close()

  def testIPv6Bind(self):
    """Checks bind() semantics on IPv6 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())
    s.close()

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
    s.close()

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())
    s2.close()
    s3.close()

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))
    s4.close()
    s6.close()

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
    s.close()

  def testIPv4InvalidBind(self):
    """Checks that binding IPv4 ping sockets to invalid addresses fails."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.
    s.close()

  def testIPv6InvalidBind(self):
    """Checks that binding IPv6 ping sockets to invalid addresses fails."""
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, ("ff02::2", 1026))

    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV6_ADDR, 651))
    try:
      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
      s.bind((net_test.IPV6_ADDR, 651))
    except IOError as e:
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.
    s.close()

  def testAfUnspecBind(self):
    """Checks bind() with a socket address whose family is AF_UNSPEC."""
    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    s4 = net_test.IPv4PingSocket()
    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    sockaddr.family = AF_UNSPEC
    csocket.Bind(s4, sockaddr)
    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())

    # But not if the address is anything else.
    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
    s4.close()

    # This doesn't work for IPv6.
    s6 = net_test.IPv6PingSocket()
    sockaddr = csocket.Sockaddr(("::1", 58997))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
    s6.close()

  def testIPv6ScopedBind(self):
    """Checks scope ID handling when binding IPv6 ping sockets."""
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. On Python 2, getsockname returns
    # "fe80:1%foo". Strip it off, since the ifindex field in the return value is
    # what matters.
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    sockname = s.getsockname()
    expected = self.lladdr
    if "%" in sockname[0]:
      expected += "%" + self.ifname
    self.assertEqual((expected, 4646, 0, self.ifindex), sockname)

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))
    s.close()

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())
    s.close()

  def testBindAffectsIdentifier(self):
    """Checks that the bound port appears in bytes 4-5 of the reply."""
    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xf976))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(b"\xf9\x76", s.recv(32768)[4:6])
    s.close()

    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xace))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(b"\x0a\xce", s.recv(32768)[4:6])
    s.close()

  def testLinkLocalAddress(self):
    """Checks that sending to a link-local address requires a scope ID."""
    s = net_test.IPv6PingSocket()
    # Sending to a link-local address with no scope fails with EINVAL.
    self.assertRaisesErrno(errno.EINVAL,
                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    # Sending to link-local address with a scope succeeds. Note that Python
    # doesn't understand the "fe80::1%lo" format, even though it returns it.
    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    # No exceptions? Good.
    s.close()

  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    for mode in ["oif", "ucast_oif", None]:
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        s2 = net_test.IPv6PingSocket()
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          self.SelectInterface(s2, netid, mode)
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
          self.assertRaisesErrno(
              errno.EINVAL,
              s.connect, (dst, 55, 0, otherscopeid))

        # Try using both sendto and connect/send.
        # If we get a reply, we sent the packet out on the right interface.
        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        self.assertValidPingResponse(s, net_test.IPV6_PING)

        # IPV6_UNICAST_IF doesn't work on connected sockets.
        if mode != "ucast_oif":
          s2.connect((dst, 123, 0, scopeid))
          s2.send(net_test.IPV6_PING)
          self.assertValidPingResponse(s2, net_test.IPV6_PING)
        s2.close()
      s.close()

  def testMappedAddressFails(self):
    """Checks that sending to an IPv4-mapped IPv6 address gives EINVAL."""
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                           ("::ffff:192.0.2.1", 55))
    s.close()

  @unittest.skipUnless(False, "skipping: does not work yet")
  def testFlowLabel(self):
    """Checks flow label handling on IPv6 ping sockets (currently skipped)."""
    s = net_test.IPv6PingSocket()

    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    # the flow label in the packet is not set.
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.

    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    # that is not registered with the flow manager should return EINVAL...
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    # ... but this doesn't work yet.
    if False:  # pylint: disable=using-constant-test
      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                             (net_test.IPV6_ADDR, 93, 0xdead, 0))

    # After registering the flow label, it gets sent properly, appears in the
    # output packet, and is returned in the response.
    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
                                     net_test.IPV6_FLOWINFO_SEND))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    _, src = s.recvfrom(32768)
    _, _, flowlabel, _ = src
    self.assertEqual(0xdead, flowlabel & 0xfffff)
    s.close()

  def testIPv4Error(self):
    """Checks that a low-TTL IPv4 ping makes recv() fail with EHOSTUNREACH."""
    s = net_test.IPv4PingSocket()
    s.setsockopt(SOL_IP, IP_TTL, 2)
    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    s.close()

  def testIPv6Error(self):
    """Checks that a low-hop-limit IPv6 ping makes recv() fail with EHOSTUNREACH."""
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    s.close()

  def testIPv6MulticastPing(self):
    """Sends a ping to ff02::1 and expects two valid responses."""
    s = net_test.IPv6PingSocket()
    # Send a multicast ping and check we get at least one duplicate.
    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.close()

  def testIPv4LargePacket(self):
    """Pings 127.0.0.1 with 20000 bytes of extra payload."""
    s = net_test.IPv4PingSocket()
    data = net_test.IPV4_PING + 20000 * b"a"
    s.sendto(data, ("127.0.0.1", 987))
    self.assertValidPingResponse(s, data)
    s.close()

  def testIPv6LargePacket(self):
    """Sends a large IPv6 ping to ::1 without checking for a response."""
    s = net_test.IPv6PingSocket()
    s.bind(("::", 0xace))
    data = net_test.IPV6_PING + b"\x01" + 19994 * b"\x00" + b"aaaaa"
    s.sendto(data, ("::1", 953))
    s.close()

  def testIcmpSocketsNotInIcmp6(self):
    """Checks an IPv4 ping socket adds a row to icmp but not to icmp6."""
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
    s.close()

  def testIcmp6SocketsNotInIcmp(self):
    """Checks an IPv6 ping socket adds a row to icmp6 but not to icmp."""
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
    s.close()

  def testProcNetIcmp(self):
    """Checks the icmp socket stat row of a bound, connected socket."""
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
    s.close()

  def testProcNetIcmp6(self):
    """Checks the icmp6 socket stat rows across bind, connect, send and recv."""
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)

    # Check the row goes away when the socket is closed.
    s.close()
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

    # Try send, bind and connect to check the addresses and the state.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
    s.close()

    # Can't bind after sendto, apparently.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)

    # Check receive bytes.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.connect(("ff02::1", 0xdead))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    s.send(net_test.IPV6_PING)
    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0x300)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0)
    s.close()

  def testProcNetUdp6(self):
    """Checks the udp6 socket stat row of a bound, connected UDP socket."""
    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
    s.close()

  def testProcNetRaw6(self):
    """Checks the raw6 socket stat row of a bound, connected raw socket."""
    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
    s.close()

  def testIPv6MTU(self):
    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.

    Relevant kernel commits:
      upstream net-next:
        dcb94b8 ipv6: fix endianness error in icmpv6_err
    """
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.connect((net_test.IPV6_ADDR, 55))
    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * b"a"
    s.send(pkt)
    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)

    # Compare the offending packet with the one we sent. To do this we need to
    # calculate the ident of the packet we sent and blank out the checksum of
    # the one we received.
    ident = struct.pack("!H", s.getsockname()[1])
    pkt = pkt[:4] + ident + pkt[6:]
    data = data[:2] + b"\x00\x00" + pkt[4:]
    self.assertEqual(pkt, data)

    # Check the address that the packet was sent to.
    self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)

    # Check the cmsg data, including the link MTU.
    mtu = PingReplyThread.LINK_MTU
    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    msglist = [
        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
          csocket.Sockaddr((src, 0))))
    ]

    self.assertEqual(msglist, cmsg)
    s.close()


if __name__ == "__main__":
  unittest.main()