1#!/usr/bin/python3
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import time
21import unittest
22
23from scapy import all as scapy
24
25import csocket
26import iproute
27import multinetwork_base
28import packets
29import net_test
30
# Option name and flag that GetSourceIP hands to setsockopt(IPPROTO_IPV6, ...).
# NOTE(review): presumably these mirror the kernel's <linux/in6.h>
# definitions -- confirm against the headers if they are ever changed.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# Retransmission timer passed to SendRA() as retranstimer. It is also the DAD
# timeout, so it must be long enough that DAD is still running when a test
# first looks at the new address, but short enough that waiting for DAD to
# finish does not slow the tests down.
RETRANS_TIMER = 150
39
40
class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Test for IPv6 source address selection.

  This base class only provides helpers: per-interface sysctl setters,
  netlink-based address state probes, and assertions about whether an
  address can be bound to, sent from, or is chosen as a source address.

  Relevant kernel commits:
    upstream net-next:
      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
      3985e8a ipv6: sysctl to restrict candidate source addresses

    android-3.10:
      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
      0633924 ipv6: sysctl to restrict candidate source addresses
  """

  def SetIPv6Sysctl(self, ifname, sysctl, value):
    """Writes value to /proc/sys/net/ipv6/conf/<ifname>/<sysctl>."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)

  def SetDAD(self, ifname, value):
    """Sets both accept_dad and dad_transmits on ifname to value."""
    self.SetIPv6Sysctl(ifname, "accept_dad", value)
    self.SetIPv6Sysctl(ifname, "dad_transmits", value)

  def SetOptimisticDAD(self, ifname, value):
    """Sets the optimistic_dad sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "optimistic_dad", value)

  def SetUseTempaddrs(self, ifname, value):
    """Sets the use_tempaddr sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_tempaddr", value)

  def SetUseOptimistic(self, ifname, value):
    """Sets the use_optimistic sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_optimistic", value)

  def SetForwarding(self, value):
    """Sets the IPv6 forwarding sysctl under conf/all."""
    self.SetIPv6Sysctl("all", "forwarding", value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the local address of a UDP socket connected via netid.

    Args:
      netid: Network to build the socket on (passed to BuildSocket).
      mode: Socket selection mode passed to BuildSocket.

    Returns:
      The first element of getsockname() after connecting the socket to
      net_test.IPV6_ADDR, port 123.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Because why not...testing for temporary addresses is a separate thing.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Always release the socket, even if one of the calls above raised.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via netlink raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts family, prefix length, scope, ifindex and flags of address.

    The prefix length must be 64 and the scope RT_SCOPE_UNIVERSE. All bits
    in expected_flags must be set; additional flags are tolerated.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns the IFA_F_TENTATIVE bit of address's flags (0 if unset)."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a throwaway IPv6 UDP socket to address, port 0, then closes it."""
    s = net_test.UDPSocket(AF_INET6)
    try:
      # (host, port, flowinfo, scope_id)
      s.bind((address, 0, 0, 0))
    finally:
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends b"Hello" to (dest, 53) with address in an IPV6_PKTINFO cmsg.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), b"Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts that address can be bound to and used as a pktinfo source."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind fails with EADDRNOTAVAIL and sending with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts that GetSourceIP(netid) returns address."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts that GetSourceIP(netid) returns something other than address."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls every 0.1s, up to 20 times, until address is not tentative.

    Raises:
      AssertionError: The address was still tentative on every poll.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    raise AssertionError(f"{address} did not complete DAD after 2 seconds")

  def WaitForDadFailure(self, address):
    """Polls every 0.1s, up to 20 times, until address has failed DAD.

    Raises:
      AssertionError: The address neither disappeared nor got flagged.
    """
    # Address should be either deleted or set IFA_F_DADFAILED flag after DAD failure
    for _ in range(20):
      try:
        ifa_msg = self.iproute.GetAddress(address)[0]
      except OSError:
        # The lookup failed; treated as "address has been deleted".
        return
      if ifa_msg.flags & iproute.IFA_F_DADFAILED:
        return
      time.sleep(0.1)
    raise AssertionError(f"{address} did not complete DAD failure after 2 seconds")
155
156
class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture: picks a random test interface and strips its IPv6 address."""

  def setUp(self):
    """Resets the sysctls and removes the test interface's global address."""
    # [0]  Start from a known state on every interface: DAD, optimistic DAD,
    # temporary addresses, use_optimistic and use_oif_addrs_only all off.
    per_iface_setters = (self.SetDAD, self.SetOptimisticDAD,
                         self.SetUseTempaddrs, self.SetUseOptimistic)
    for netid in self.tuns:
      iface = self.GetInterfaceName(netid)
      for turn_off in per_iface_setters:
        turn_off(iface, 0)
      self.SetIPv6Sysctl(iface, "use_oif_addrs_only", 0)

    # [1]  Choose the interface under test at random and cache its details.
    chosen = random.choice(list(self.tuns))
    self.test_netid = chosen
    self.test_ip = self.MyAddress(6, chosen)
    self.test_ifindex = self.ifindices[chosen]
    self.test_ifname = self.GetInterfaceName(chosen)
    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)

    # [2]  Remove the test interface's IPv6 address and check that it is
    # really gone and can no longer be used.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)
    self.assertAddressNotUsable(self.test_ip, self.test_netid)

    # Make sure the link-local address has left the tentative state. DAD is
    # disabled above, yet the test occasionally fails without this wait,
    # possibly because the link-local address was generated before DAD was
    # turned off.
    self.WaitForDad(self.test_lladdr)

    # Optimistic addresses don't work while forwarding is on, so turn it off.
    # Forwarding will be re-enabled when the sysctls are restored by
    # MultiNetworkBaseTest.tearDownClass.
    # TODO: Fix this and remove this hack.
    self.SetForwarding("0")
193
194
class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """A tentative address is unusable and unselected until DAD completes."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    address = self.test_ip

    # [3]  Re-enable DAD and send an RA, which starts SLAAC and then DAD, so
    # the interface gets its IPv6 address back in the tentative state.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The flags must show that the address is still tentative.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # The address exists, but while it is tentative it can neither be used
    # nor be picked as a source address.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD has finished (should be less than 1 second).
    self.WaitForDad(address)

    # With DAD complete the address can be bound to, sent from, and is the
    # one chosen as source address.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
218
219
class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """An optimistic address is usable but not selected until DAD completes."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    address = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-enable DAD in optimistic mode and send an RA, which starts
    # SLAAC and then DAD, so the address comes back as optimistic.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The flags must show that the address is optimistic.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # An optimistic address may be used explicitly, but source address
    # selection does not pick it.
    self.assertAddressUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD has finished (should be less than 1 second).
    self.WaitForDad(address)

    # With DAD complete the address stays usable and is now the chosen
    # source address.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
243
244
class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_optimistic set, an optimistic address is selected right away."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    address = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-enable DAD in optimistic mode, allow optimistic addresses as
    # source candidates, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The flags must show that the address is optimistic.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is only optimistic, use_optimistic makes it
    # eligible for selection as well as explicit use.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)
262
263
class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """A fully valid address wins over an optimistic one on the same interface."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex

    # [3]  Give the interface a valid IPv6 address and check that it becomes
    # the selected source address.
    preferred_ip = self.OnlinkPrefix(6, netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertAddressSelected(preferred_ip, netid)

    # [4]  Now add a second address in optimistic DAD start-up: enable DAD,
    # optimistic DAD and use_optimistic, then send an RA to start SLAAC.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The flags must show that the new address is optimistic.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address is usable, but because a valid address is also
    # available, the valid one is selected instead.
    self.assertAddressUsable(self.test_ip, netid)
    self.assertAddressNotSelected(self.test_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)
290
291
class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """An optimistic address stops being usable once it fails DAD."""

  def testDadFailure(self):
    netid = self.test_netid
    address = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-enable DAD in optimistic mode, allow optimistic addresses as
    # source candidates, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    time.sleep(0.1)  # Give the kernel time to notice our RA

    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # Build a Neighbor Advertisement for the optimistic address coming from a
    # different MAC address, i.e. an address conflict ("DAD defense").
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    ether_layer = scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01")
    ip_layer = scapy.IPv6(src=address, dst="ff02::1")
    na_layer = scapy.ICMPv6ND_NA(tgt=address, R=0, S=0, O=1)
    lladdr_option = scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)
    dad_defense = ether_layer / ip_layer / na_layer / lladdr_option

    # Deliver it on the test interface and wait for DAD to fail.
    self.ReceiveEtherPacketOn(netid, dad_defense)
    self.WaitForDadFailure(address)

    # Having failed DAD, the address must no longer be usable or selected.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # TODO(ek): verify that an RTM_DELADDR is issued for the DAD-failed
    # address.
323
324
class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Neighbor Solicitations must not be sourced from an optimistic address."""

  def testSendToOnlinkDestination(self):
    netid = self.test_netid
    address = self.test_ip
    ifname = self.test_ifname

    # [3]  Re-enable DAD in optimistic mode, allow optimistic addresses as
    # source candidates, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # [4]  Send from the optimistic address to a random on-link destination.
    # The resulting Neighbor Solicitation must NOT use the optimistic address
    # as its source; in this setup, the only other usable address is the
    # link-local one.
    onlink_prefix = self.OnlinkPrefix(6, netid)
    onlink_dest = self.GetRandomDestination(onlink_prefix)
    self.SendWithSourceAddress(address, netid, onlink_dest)

    # Element [1] of the packets.NS() result is what ExpectPacketOn compares
    # against -- presumably the packet of a (description, packet) pair.
    ns_result = packets.NS(
        self.test_lladdr, onlink_dest, self.MyMacAddress(netid))
    expected_ns = ns_result[1]
    self.ExpectPacketOn(netid, "link-local NS", expected_ns)
352
353
354# TODO(ek): add tests listening for netlink events.
355
356
class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_oif_addrs_only off, another interface's address may be used."""

  def testChoosesNonInterfaceSourceAddress(self):
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    src_ip = self.GetSourceIP(self.test_netid)

    # The fixture removed the test interface's global address, so the source
    # must be neither that address nor the interface's link-local address.
    # assertNotIn/assertIn report the offending values on failure.
    self.assertNotIn(src_ip, [self.test_ip, self.test_lladdr])

    # Instead it must be the address of one of the other test networks.
    other_addresses = [self.MyAddress(6, netid)
                       for netid in self.tuns if netid != self.test_netid]
    self.assertIn(src_ip, other_addresses)
366
367
class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_oif_addrs_only on, only the interface's own addresses count."""

  def testChoosesOnlyInterfaceSourceAddress(self):
    ifname = self.test_ifname
    self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 1)

    # The test interface has no global IPv6 address, which leaves its
    # existing link-local address as the only candidate.
    expected_source = self.test_lladdr
    self.assertAddressSelected(expected_source, self.test_netid)
375
376
# Run all test cases defined in this file when it is executed as a script.
if __name__ == "__main__":
  unittest.main()
379