1#!/usr/bin/python3 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import errno 18import random 19from socket import * # pylint: disable=wildcard-import 20import time 21import unittest 22 23from scapy import all as scapy 24 25import csocket 26import iproute 27import multinetwork_base 28import packets 29import net_test 30 31# Setsockopt values. 32IPV6_ADDR_PREFERENCES = 72 33IPV6_PREFER_SRC_PUBLIC = 0x0002 34 35# The retrans timer is also the DAD timeout. We set this to a value that's not 36# so short that DAD will complete before we attempt to use the network, but 37# short enough that we don't have to wait too long for DAD to complete. 38RETRANS_TIMER = 150 39 40 41class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): 42 """Test for IPv6 source address selection. 43 44 Relevant kernel commits: 45 upstream net-next: 46 7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 47 c58da4c net: ipv6: allow explicitly choosing optimistic addresses 48 9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface. 49 c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr(). 50 c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr(). 51 3985e8a ipv6: sysctl to restrict candidate source addresses 52 53 android-3.10: 54 2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 55 0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic 56 0633924 ipv6: sysctl to restrict candidate source addresses 57 """ 58 59 def SetIPv6Sysctl(self, ifname, sysctl, value): 60 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value) 61 62 def SetDAD(self, ifname, value): 63 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value) 64 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value) 65 66 def SetOptimisticDAD(self, ifname, value): 67 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value) 68 69 def SetUseTempaddrs(self, ifname, value): 70 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value) 71 72 def SetUseOptimistic(self, ifname, value): 73 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value) 74 75 def SetForwarding(self, value): 76 self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value) 77 78 def GetSourceIP(self, netid, mode="mark"): 79 s = self.BuildSocket(6, net_test.UDPSocket, netid, mode) 80 # Because why not...testing for temporary addresses is a separate thing. 81 s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC) 82 83 s.connect((net_test.IPV6_ADDR, 123)) 84 src_addr = s.getsockname()[0] 85 self.assertTrue(src_addr) 86 s.close() 87 return src_addr 88 89 def assertAddressNotPresent(self, address): 90 self.assertRaises(IOError, self.iproute.GetAddress, address) 91 92 def assertAddressHasExpectedAttributes( 93 self, address, expected_ifindex, expected_flags): 94 ifa_msg = self.iproute.GetAddress(address)[0] 95 self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) 96 self.assertEqual(64, ifa_msg.prefixlen) 97 self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) 98 self.assertEqual(expected_ifindex, ifa_msg.index) 99 self.assertEqual(expected_flags, ifa_msg.flags & expected_flags) 100 101 def AddressIsTentative(self, address): 102 ifa_msg = self.iproute.GetAddress(address)[0] 103 return ifa_msg.flags & iproute.IFA_F_TENTATIVE 104 105 def BindToAddress(self, address): 106 s = net_test.UDPSocket(AF_INET6) 107 try: 108 s.bind((address, 0, 0, 0)) 109 finally: 110 s.close() 111 112 def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR): 113 pktinfo = multinetwork_base.MakePktInfo(6, address, 0) 114 cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)] 115 s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark") 116 try: 117 return csocket.Sendmsg(s, (dest, 53), b"Hello", cmsgs, 0) 118 finally: 119 s.close() 120 121 def assertAddressUsable(self, address, netid): 122 self.BindToAddress(address) 123 self.SendWithSourceAddress(address, netid) 124 # No exceptions? Good. 125 126 def assertAddressNotUsable(self, address, netid): 127 self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address) 128 self.assertRaisesErrno(errno.EINVAL, 129 self.SendWithSourceAddress, address, netid) 130 131 def assertAddressSelected(self, address, netid): 132 self.assertEqual(address, self.GetSourceIP(netid)) 133 134 def assertAddressNotSelected(self, address, netid): 135 self.assertNotEqual(address, self.GetSourceIP(netid)) 136 137 def WaitForDad(self, address): 138 for _ in range(20): 139 if not self.AddressIsTentative(address): 140 return 141 time.sleep(0.1) 142 raise AssertionError(f"{address} did not complete DAD after 2 seconds") 143 144 def WaitForDadFailure(self, address): 145 # Address should be either deleted or set IFA_F_DADFAILED flag after DAD failure 146 for _ in range(20): 147 try: 148 ifa_msg = self.iproute.GetAddress(address)[0] 149 except OSError: 150 return 151 if ifa_msg.flags & iproute.IFA_F_DADFAILED: 152 return 153 time.sleep(0.1) 154 raise AssertionError(f"{address} did not complete DAD failure after 2 seconds") 155 156 157class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): 158 159 def setUp(self): 160 # [0] Make sure DAD, optimistic DAD, and the use_optimistic option 161 # are all consistently disabled at the outset. 162 for netid in self.tuns: 163 ifname = self.GetInterfaceName(netid) 164 self.SetDAD(ifname, 0) 165 self.SetOptimisticDAD(ifname, 0) 166 self.SetUseTempaddrs(ifname, 0) 167 self.SetUseOptimistic(ifname, 0) 168 self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0) 169 170 # [1] Pick an interface on which to test. 171 self.test_netid = random.choice(list(self.tuns.keys())) 172 self.test_ip = self.MyAddress(6, self.test_netid) 173 self.test_ifindex = self.ifindices[self.test_netid] 174 self.test_ifname = self.GetInterfaceName(self.test_netid) 175 self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True) 176 177 # [2] Delete the test interface's IPv6 address. 178 self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex) 179 self.assertAddressNotPresent(self.test_ip) 180 181 self.assertAddressNotUsable(self.test_ip, self.test_netid) 182 # Verify that the link-local address is not tentative. 183 # Even though we disable DAD above, without this change occasionally the 184 # test fails. This might be due to us disabling DAD only after the 185 # link-local address is generated. 186 self.WaitForDad(self.test_lladdr) 187 188 # Disable forwarding, because optimistic addresses don't work when 189 # forwarding is on. Forwarding will be re-enabled when the sysctls are 190 # restored by MultiNetworkBaseTest.tearDownClass. 191 # TODO: Fix this and remove this hack. 192 self.SetForwarding("0") 193 194 195class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest): 196 197 def testRfc6724Behaviour(self): 198 # [3] Get an IPv6 address back, in DAD start-up. 199 self.SetDAD(self.test_ifname, 1) # Enable DAD 200 # Send a RA to start SLAAC and subsequent DAD. 201 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 202 # Get flags and prove tentative-ness. 203 self.assertAddressHasExpectedAttributes( 204 self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE) 205 206 # Even though the interface has an IPv6 address, its tentative nature 207 # prevents it from being selected. 208 self.assertAddressNotUsable(self.test_ip, self.test_netid) 209 self.assertAddressNotSelected(self.test_ip, self.test_netid) 210 211 # Busy wait for DAD to complete (should be less than 1 second). 212 self.WaitForDad(self.test_ip) 213 214 # The test_ip should have completed DAD by now, and should be the 215 # chosen source address, eligible to bind to, etc. 216 self.assertAddressUsable(self.test_ip, self.test_netid) 217 self.assertAddressSelected(self.test_ip, self.test_netid) 218 219 220class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest): 221 222 def testRfc6724Behaviour(self): 223 # [3] Get an IPv6 address back, in optimistic DAD start-up. 224 self.SetDAD(self.test_ifname, 1) # Enable DAD 225 self.SetOptimisticDAD(self.test_ifname, 1) 226 # Send a RA to start SLAAC and subsequent DAD. 227 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 228 # Get flags and prove optimism. 229 self.assertAddressHasExpectedAttributes( 230 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 231 232 # Optimistic addresses are usable but are not selected. 233 self.assertAddressUsable(self.test_ip, self.test_netid) 234 self.assertAddressNotSelected(self.test_ip, self.test_netid) 235 236 # Busy wait for DAD to complete (should be less than 1 second). 237 self.WaitForDad(self.test_ip) 238 239 # The test_ip should have completed DAD by now, and should be the 240 # chosen source address. 241 self.assertAddressUsable(self.test_ip, self.test_netid) 242 self.assertAddressSelected(self.test_ip, self.test_netid) 243 244 245class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest): 246 247 def testModifiedRfc6724Behaviour(self): 248 # [3] Get an IPv6 address back, in optimistic DAD start-up. 249 self.SetDAD(self.test_ifname, 1) # Enable DAD 250 self.SetOptimisticDAD(self.test_ifname, 1) 251 self.SetUseOptimistic(self.test_ifname, 1) 252 # Send a RA to start SLAAC and subsequent DAD. 253 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 254 # Get flags and prove optimistism. 255 self.assertAddressHasExpectedAttributes( 256 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 257 258 # The interface has an IPv6 address and, despite its optimistic nature, 259 # the use_optimistic option allows it to be selected. 260 self.assertAddressUsable(self.test_ip, self.test_netid) 261 self.assertAddressSelected(self.test_ip, self.test_netid) 262 263 264class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 265 266 def testModifiedRfc6724Behaviour(self): 267 # [3] Add a valid IPv6 address to this interface and verify it is 268 # selected as the source address. 269 preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe" 270 self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) 271 self.assertAddressHasExpectedAttributes( 272 preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) 273 self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid)) 274 275 # [4] Get another IPv6 address, in optimistic DAD start-up. 276 self.SetDAD(self.test_ifname, 1) # Enable DAD 277 self.SetOptimisticDAD(self.test_ifname, 1) 278 self.SetUseOptimistic(self.test_ifname, 1) 279 # Send a RA to start SLAAC and subsequent DAD. 280 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 281 # Get flags and prove optimism. 282 self.assertAddressHasExpectedAttributes( 283 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 284 285 # Since the interface has another IPv6 address, the optimistic address 286 # is not selected--the other, valid address is chosen. 287 self.assertAddressUsable(self.test_ip, self.test_netid) 288 self.assertAddressNotSelected(self.test_ip, self.test_netid) 289 self.assertAddressSelected(preferred_ip, self.test_netid) 290 291 292class DadFailureTest(MultiInterfaceSourceAddressSelectionTest): 293 294 def testDadFailure(self): 295 # [3] Get an IPv6 address back, in optimistic DAD start-up. 296 self.SetDAD(self.test_ifname, 1) # Enable DAD 297 self.SetOptimisticDAD(self.test_ifname, 1) 298 self.SetUseOptimistic(self.test_ifname, 1) 299 # Send a RA to start SLAAC and subsequent DAD. 300 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 301 time.sleep(0.1) # Give the kernel time to notice our RA 302 # Prove optimism and usability. 303 self.assertAddressHasExpectedAttributes( 304 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 305 self.assertAddressUsable(self.test_ip, self.test_netid) 306 self.assertAddressSelected(self.test_ip, self.test_netid) 307 308 # Send a NA for the optimistic address, indicating address conflict 309 # ("DAD defense"). 310 conflict_macaddr = "02:00:0b:ad:d0:0d" 311 dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") / 312 scapy.IPv6(src=self.test_ip, dst="ff02::1") / 313 scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) / 314 scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)) 315 self.ReceiveEtherPacketOn(self.test_netid, dad_defense) 316 self.WaitForDadFailure(self.test_ip) 317 318 # The address should have failed DAD, and therefore no longer be usable. 319 self.assertAddressNotUsable(self.test_ip, self.test_netid) 320 self.assertAddressNotSelected(self.test_ip, self.test_netid) 321 322 # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address. 323 324 325class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 326 327 def testSendToOnlinkDestination(self): 328 # [3] Get an IPv6 address back, in optimistic DAD start-up. 329 self.SetDAD(self.test_ifname, 1) # Enable DAD 330 self.SetOptimisticDAD(self.test_ifname, 1) 331 self.SetUseOptimistic(self.test_ifname, 1) 332 # Send a RA to start SLAAC and subsequent DAD. 333 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 334 # Prove optimism and usability. 335 self.assertAddressHasExpectedAttributes( 336 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 337 self.assertAddressUsable(self.test_ip, self.test_netid) 338 self.assertAddressSelected(self.test_ip, self.test_netid) 339 340 # [4] Send to an on-link destination and observe a Neighbor Solicitation 341 # packet with a source address that is NOT the optimistic address. 342 # In this setup, the only usable address is the link-local address. 343 onlink_dest = self.GetRandomDestination( 344 self.OnlinkPrefix(6, self.test_netid)) 345 self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest) 346 347 expected_ns = packets.NS( 348 self.test_lladdr, 349 onlink_dest, 350 self.MyMacAddress(self.test_netid))[1] 351 self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns) 352 353 354# TODO(ek): add tests listening for netlink events. 355 356 357class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 358 359 def testChoosesNonInterfaceSourceAddress(self): 360 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0) 361 src_ip = self.GetSourceIP(self.test_netid) 362 self.assertFalse(src_ip in [self.test_ip, self.test_lladdr]) 363 self.assertTrue(src_ip in 364 [self.MyAddress(6, netid) 365 for netid in self.tuns if netid != self.test_netid]) 366 367 368class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 369 370 def testChoosesOnlyInterfaceSourceAddress(self): 371 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1) 372 # self.test_ifname does not have a global IPv6 address, so the only 373 # candidate is the existing link-local address. 374 self.assertAddressSelected(self.test_lladdr, self.test_netid) 375 376 377if __name__ == "__main__": 378 unittest.main() 379