#!/usr/bin/python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


RTMGRP_NEIGH = 4

NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80


# TODO: Support IPv4.
class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
  """Tests neighbour cache state transitions and their netlink notifications.

  Each test picks one of the test networks at random, drives the neighbour
  entry of that network's router through NUD states, and checks both the
  state reported by a neighbour dump and the notifications received on a
  netlink socket subscribed to RTMGRP_NEIGH.
  """

  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code will
  # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
  # fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
  # that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS

  # The kernel default for unicast solicit is 3, but it needs to be larger
  # when testing reconfiguration during probing.
  UCAST_SOLICIT_DEFAULT = 3
  UCAST_SOLICIT_LARGE = 10

  @classmethod
  def setUpClass(cls):
    """Sets the neighbour timer sysctls on every test interface."""
    super(NeighbourTest, cls).setUpClass()
    for netid in cls.tuns:
      iface = cls.GetInterfaceName(netid)
      # This can't be set in an RA.
      for proto in ["ipv4", "ipv6"]:
        cls.SetSysctl(
            "/proc/sys/net/%s/neigh/%s/delay_first_probe_time" % (proto, iface),
            cls.DELAY_TIME_MS // 1000)
        cls.SetSysctl(
            "/proc/sys/net/%s/neigh/%s/retrans_time_ms" % (proto, iface),
            cls.RETRANS_TIME_MS)

  def setUp(self):
    """Resets router neighbour entries and opens the notification socket."""
    super(NeighbourTest, self).setUp()

    for netid in self.tuns:
      # Clear the ND cache entries for all routers, so each test starts with
      # the IPv6 default router in state STALE.
      addr = self._RouterAddress(netid, 6)
      ifindex = self.ifindices[netid]
      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)

      # Configure IPv6 by sending an RA.
      self.SendRA(netid,
                  retranstimer=self.RETRANS_TIME_MS,
                  reachabletime=self.BASE_REACHABLE_TIME_MS)

    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
    self.sock.bind((0, RTMGRP_NEIGH))
    net_test.SetNonBlocking(self.sock)

    self.netid = random.choice(list(self.tuns.keys()))
    self.ifindex = self.ifindices[self.netid]

    # MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries.
    # Temporarily change those entries to NUD_STALE so we can test them.
    self.ChangeRouterNudState(4, NUD_STALE)

  def SetUnicastSolicit(self, proto, iface, value):
    """Writes the ucast_solicit neighbour sysctl.

    Args:
      proto: "ipv4" or "ipv6"; selects the sysctl tree.
      iface: Name of the interface whose setting is changed.
      value: The new ucast_solicit value.
    """
    self.SetSysctl(
        "/proc/sys/net/%s/neigh/%s/ucast_solicit" % (proto, iface), value)

  def tearDown(self):
    """Restores per-test state and closes the notification socket."""
    try:
      super(NeighbourTest, self).tearDown()
      # It is already reset to default by TearDownClass,
      # but here we need to set it to default after each testcase.
      iface = self.GetInterfaceName(self.netid)
      for proto in ["ipv4", "ipv6"]:
        self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)

      # Change router ARP entries back to NUD_PERMANENT,
      # so as not to affect other tests.
      self.ChangeRouterNudState(4, NUD_PERMANENT)
    finally:
      # Close the socket even if the cleanup above raised, so that a failing
      # tearDown does not leak the netlink socket.
      self.sock.close()
      self.sock = None

  def ChangeRouterNudState(self, version, state):
    """Forces the router's neighbour entry into a state and verifies it.

    Args:
      version: IP version (4 or 6) of the router address to update.
      state: The NUD_* state to set.
    """
    router = self._RouterAddress(self.netid, version)
    macaddr = self.RouterMacAddress(self.netid)
    self.iproute.UpdateNeighbour(version, router, macaddr, self.ifindex, state)
    self.ExpectNeighbourNotification(router, state)
    self.assertNeighbourState(state, router)

  def GetNeighbour(self, addr, ifindex):
    """Looks up a neighbour entry by address.

    Args:
      addr: The neighbour's IP address, as a string.
      ifindex: Index of the interface whose neighbours are dumped.

    Returns:
      The (msg, attrs) pair from the dump whose NDA_DST equals addr, or None
      if no dumped entry matches.
    """
    version = csocket.AddressVersion(addr)
    for msg, args in self.iproute.DumpNeighbours(version, ifindex):
      if args["NDA_DST"] == addr:
        return msg, args
    return None

  def GetNdEntry(self, addr):
    """Returns GetNeighbour's result for addr on the interface under test."""
    return self.GetNeighbour(addr, self.ifindex)

  def CheckNoNdEvents(self):
    """Asserts that peeking at the notification socket fails with EAGAIN."""
    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)

  def assertNeighbourState(self, state, addr):
    """Asserts that the neighbour entry for addr exists and is in state."""
    entry = self.GetNdEntry(addr)
    # Fail with a readable message instead of a TypeError on None[0].
    self.assertIsNotNone(entry, "No neighbour entry found for %s" % addr)
    self.assertEqual(state, entry[0].state)

  def assertNeighbourAttr(self, addr, name, value):
    """Asserts that attribute name of addr's neighbour entry equals value."""
    entry = self.GetNdEntry(addr)
    # Fail with a readable message instead of a TypeError on None[1].
    self.assertIsNotNone(entry, "No neighbour entry found for %s" % addr)
    self.assertEqual(value, entry[1][name])

  def ExpectNeighbourNotification(self, addr, state, attrs=None):
    """Reads the next notification from the socket and checks its contents.

    Args:
      addr: Expected NDA_DST of the notification.
      state: Expected NUD_* state of the notification.
      attrs: Optional dict of attribute names to expected values; each one is
        compared with the corresponding attribute in the notification.
    """
    msg = self.sock.recv(4096)
    msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
    self.assertEqual(addr, actual_attrs["NDA_DST"])
    self.assertEqual(state, msg.state)
    if attrs:
      for name in attrs:
        self.assertEqual(attrs[name], actual_attrs[name])

  def ExpectProbe(self, is_unicast, addr):
    """Expects a neighbour probe for addr on the network under test.

    For IPv6 this is a neighbour solicitation, sent either to addr itself
    (unicast) or to its solicited-node multicast address. For IPv4 only a
    unicast ARP request is supported.

    Args:
      is_unicast: Whether the probe is expected to be unicast.
      addr: The address being probed.

    Raises:
      NotImplementedError: If a non-unicast IPv4 probe is requested.
    """
    version = csocket.AddressVersion(addr)
    llsrc = self.MyMacAddress(self.netid)
    if version == 6:
      if is_unicast:
        src = self.MyLinkLocalAddress(self.netid)
        dst = addr
      else:
        # The solicited-node multicast address embeds the last three bytes of
        # the target address.
        solicited = inet_pton(AF_INET6, addr)
        last3bytes = tuple([net_test.ByteToHex(b) for b in solicited[-3:]])
        dst = "ff02::1:ff%s:%s%s" % last3bytes
        src = self.MyAddress(6, self.netid)
      expected = (
          scapy.IPv6(src=src, dst=dst) /
          scapy.ICMPv6ND_NS(tgt=addr) /
          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
      )
      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
      self.ExpectPacketOn(self.netid, msg, expected)
    else:  # version == 4
      if is_unicast:
        src = self._MyIPv4Address(self.netid)
        dst = addr
      else:
        raise NotImplementedError("This test does not support broadcast ARP")
      expected = scapy.ARP(psrc=src, pdst=dst, hwsrc=llsrc, op=1)
      msg = "Unicast ARP probe"
      self.ExpectPacketOn(self.netid, msg, expected)

  def ExpectUnicastProbe(self, addr):
    """Expects a unicast probe for addr."""
    self.ExpectProbe(True, addr)

  def ExpectMulticastNS(self, addr):
    """Expects a multicast neighbour solicitation for addr."""
    self.ExpectProbe(False, addr)

  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
                                  S=1, O=0, R=1):
    """Injects a neighbour advertisement for addr on the network under test.

    Args:
      addr: The target address being advertised.
      mac: Link-layer address used as the Ethernet source and in the
        target link-layer address option.
      srcaddr: IPv6 source address; defaults to addr.
      dstaddr: IPv6 destination address; defaults to our link-local address.
      S: Value of the NA's S flag.
      O: Value of the NA's O flag.
      R: Value of the NA's R flag.

    Raises:
      NotImplementedError: If addr is not an IPv6 address.
    """
    version = csocket.AddressVersion(addr)
    if srcaddr is None:
      srcaddr = addr
    if dstaddr is None:
      dstaddr = self.MyLinkLocalAddress(self.netid)
    if version == 6:
      packet = (
          scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
          scapy.IPv6(src=srcaddr, dst=dstaddr) /
          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
          scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
      )
      self.ReceiveEtherPacketOn(self.netid, packet)
    else:
      raise NotImplementedError

  def SendDnsRequest(self, addr):
    """Sends a UDP payload to port 53 of addr via the network under test.

    The routing mode used to select the network is chosen at random.

    Args:
      addr: Destination IP address.

    Returns:
      The connected socket; the caller is responsible for closing it.
    """
    version = csocket.AddressVersion(addr)
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(version, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((addr, 53))
    s.send(net_test.UDP_PAYLOAD)
    return s

  def MonitorSleepMs(self, interval, addr):
    """Sleeps for interval ms, printing addr's neighbour entry as it goes.

    Debugging aid: sleeps in slices of at most 100 ms and prints the entry
    after each slice.

    Args:
      interval: Total time to sleep, in milliseconds.
      addr: Address whose neighbour entry is printed.
    """
    slept = 0
    while slept < interval:
      sleep_ms = min(100, interval - slept)
      time.sleep(sleep_ms / 1000.0)
      slept += sleep_ms
      print(self.GetNdEntry(addr))

  def MonitorSleep(self, intervalseconds, addr):
    """Like MonitorSleepMs, but takes the interval in seconds."""
    self.MonitorSleepMs(intervalseconds * 1000, addr)

  def SleepMs(self, ms):
    """Sleeps for ms milliseconds."""
    time.sleep(ms / 1000.0)

  def testNotifications(self):
    """Tests neighbour notifications.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    s = self.SendDnsRequest(net_test.IPV6_ADDR)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that the
    # kernel has notified us.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    s.close()
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})

  def testRepeatedProbes(self):
    """Tests that forcing PROBE repeatedly probes and recovers each time."""
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    routermac = self.RouterMacAddress(self.netid)
    self.assertNeighbourState(NUD_STALE, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    def ForceProbe(addr, mac):
      # Administratively move the entry to PROBE, expect a unicast probe, then
      # answer it and expect the entry to become REACHABLE.
      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectNeighbourNotification(addr, NUD_PROBE)
      self.SleepMs(1)  # TODO: Why is this necessary?
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectUnicastProbe(addr)
      self.ReceiveUnicastAdvertisement(addr, mac)
      self.assertNeighbourState(NUD_REACHABLE, addr)
      self.ExpectNeighbourNotification(addr, NUD_REACHABLE)

    for _ in range(5):
      ForceProbe(router6, routermac)

  def testIsRouterFlag(self):
    """Tests that an NA with R=0 still takes a FAILED router to REACHABLE."""
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router6)

    # Get into FAILED.
    ifindex = self.ifindices[self.netid]
    self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
    self.ExpectNeighbourNotification(router6, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router6)

    time.sleep(1)

    # Send another packet and expect a multicast NS.
    self.SendDnsRequest(net_test.IPV6_ADDR).close()
    self.ExpectMulticastNS(router6)

    # Receive a unicast NA with the R flag set to 0.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
                                     srcaddr=self._RouterAddress(self.netid, 6),
                                     dstaddr=self.MyAddress(6, self.netid),
                                     S=1, O=0, R=0)

    # Expect that this takes us to REACHABLE.
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    self.assertNeighbourState(NUD_REACHABLE, router6)

  def DoReconfigureDuringProbing(self, version):
    """Checks that lowering ucast_solicit while probing leads to FAILED.

    Args:
      version: IP version (4 or 6) of the router entry to exercise.
    """
    if version == 6:
      proto = "ipv6"
      ip_addr = net_test.IPV6_ADDR
    else:
      proto = "ipv4"
      ip_addr = net_test.IPV4_ADDR
    router = self._RouterAddress(self.netid, version)
    self.assertNeighbourState(NUD_STALE, router)

    iface = self.GetInterfaceName(self.netid)
    # Raise the unicast solicit count.
    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_LARGE)

    # Send a packet and check that we go into DELAY.
    self.SendDnsRequest(ip_addr).close()
    self.assertNeighbourState(NUD_DELAY, router)

    # Probe 4 times with no response.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router)
    self.ExpectUnicastProbe(router)

    for _ in range(3):
      self.SleepMs(self.RETRANS_TIME_MS)
      self.ExpectUnicastProbe(router)

    # Reconfigure to 3 while probing; the state should change to NUD_FAILED.
    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectNeighbourNotification(router, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router)

  # Check neighbor state after re-config ARP probe times.
  def testReconfigureDuringProbing(self):
    """Runs the reconfigure-during-probing check for IPv4 and then IPv6."""
    self.DoReconfigureDuringProbing(4)
    self.DoReconfigureDuringProbing(6)


if __name__ == "__main__":
  unittest.main()