1#!/usr/bin/python3
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
18from errno import *  # pylint: disable=wildcard-import
19from socket import *  # pylint: disable=wildcard-import
20
21import random
22import itertools
23import struct
24import unittest
25
26from net_test import LINUX_VERSION
27from scapy import all as scapy
28from tun_twister import TunTwister
29import csocket
30import iproute
31import multinetwork_base
32import net_test
33import packets
34import util
35import xfrm
36import xfrm_base
37
# Interface index passed as the underlying link when the standalone XFRM
# interface test creates its interface.
_LOOPBACK_IFINDEX = 1
# Interface name shared by the standalone VTI and XFRM interface add/delete
# tests.
_TEST_XFRM_IFNAME = "ipsec42"
# XFRM interface ID for the standalone XFRM interface test; also used as the
# key delta when the VTI test updates its interface.
_TEST_XFRM_IF_ID = 42
# SPI used by the XFRM_MIGRATE probe and (via _TEST_OUT_SPI/_TEST_IN_SPI) by the
# explicit-selector tunnel tests.
_TEST_SPI = 0x1234
42
43# Two kernel fixes have been added in 5.17 to allow XFRM_MIGRATE to work correctly
44# when (1) there are multiple tunnels with the same selectors; and (2) addresses
45# are updated to a different IP family. These two fixes were pulled into upstream
46# LTS releases 4.14.273, 4.19.236, 5.4.186, 5.10.107 and 5.15.30, from whence they
47# flowed into the Android Common Kernel (via standard LTS merges).
48#
49# Note 'xfrm: Check if_id in xfrm_migrate' did not end up in 4.14 LTS,
50# and is only present in ACK android-4.14-stable after 4.14.320 LTS merge.
51# See https://android-review.git.corp.google.com/c/kernel/common/+/2640243
52#
53# As such we require 4.14.321+, 4.19.236+, 5.4.186+, 5.10.107+, 5.15.30+ or 5.17+
54# to have these fixes.
def HasXfrmMigrateFixes():
  """Returns a truthy value if the kernel is expected to have the migrate fixes.

  The result is net_test.KernelAtLeast() applied to the minimum LTS versions
  for 4.19, 5.4, 5.10 and 5.15, falling back to net_test.NonGXI(4, 14).
  """
  lts_minimums = [
      (4, 19, 236),
      (5, 4, 186),
      (5, 10, 107),
      (5, 15, 30),
  ]
  return net_test.KernelAtLeast(lts_minimums) or net_test.NonGXI(4, 14)
58
59
60# Does the kernel support CONFIG_XFRM_MIGRATE and include the kernel fixes?
def SupportsXfrmMigrate():
  """Returns True if XFRM_MIGRATE appears usable on the running kernel.

  Kernels without the migrate fixes are reported as unsupported. On 5.10+ the
  feature is assumed to be present. On older kernels a migration of a
  non-existent SA is attempted and the resulting errno decides the answer:
  ENOPROTOOPT means unsupported, anything else is treated as supported.
  """
  if not HasXfrmMigrateFixes():
    return False

  # 5.10+ must have CONFIG_XFRM_MIGRATE enabled
  if LINUX_VERSION >= (5, 10, 0):
    return True

  try:
    netlink = xfrm.Xfrm()
    any_addr = net_test.GetWildcardAddress(6)
    empty_sel = xfrm.EmptySelector(AF_INET6)

    # Expect migration to fail with EINVAL because it is trying to migrate a
    # non-existent SA.
    netlink.MigrateTunnel(xfrm.XFRM_POLICY_OUT, empty_sel, any_addr, any_addr,
                          any_addr, any_addr, _TEST_SPI,
                          None, None, None, None, None, None)
    print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled")
    return True
  except IOError as err:
    if err.errno == ENOPROTOOPT:
      return False
    if err.errno != EINVAL:
      print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno)
    return True
89
# Evaluated once, when this module is imported; see SupportsXfrmMigrate().
SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate()

# Parameters to setup tunnels as special networks
_TUNNEL_NETID_OFFSET = 0xFC00  # Matches reserved netid range for IpSecService
# Per-IP-version base; a tunnel's netid is base + _TUNNEL_NETID_OFFSET + index.
_BASE_TUNNEL_NETID = {4: 40, 6: 60}
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200

# The outbound and inbound test SAs share the same SPI value.
_TEST_OUT_SPI = _TEST_SPI
_TEST_IN_SPI = _TEST_OUT_SPI

# Base VTI output/input keys (same values as _BASE_VTI_OKEY/_BASE_VTI_IKEY);
# VtiInterface adds the tunnel netid to these.
_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200

# UDP port of the simulated remote peer: source port of injected inbound
# packets and destination port of packets sent out through a tunnel.
_TEST_REMOTE_PORT = 1234

# Maps an IP version (4 or 6) to the matching scapy IP header class.
_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
107
108
109def _GetLocalInnerAddress(version):
110  return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
111
112
113def _GetRemoteInnerAddress(version):
114  return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]
115
116
def _GetRemoteOuterAddress(version):
  """Returns net_test.IPV4_ADDR or net_test.IPV6_ADDR for version 4 or 6.

  Raises:
    KeyError: if version is neither 4 nor 6.
  """
  remote_outer_by_version = {
      4: net_test.IPV4_ADDR,
      6: net_test.IPV6_ADDR,
  }
  return remote_outer_by_version[version]
119
120
def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
                                   src_port, dst_inner, dst_outer,
                                   dst_port, spi, seq_num, ip_hdr_options=None):
  """Builds an inner UDP packet and wraps it with EncryptPacketWithNull().

  Args:
    inner_version: IP version (4 or 6) of the inner packet.
    src_inner: Source address of the inner IP header.
    src_outer: Source address handed to EncryptPacketWithNull().
    src_port: UDP source port of the inner packet.
    dst_inner: Destination address of the inner IP header.
    dst_outer: Destination address handed to EncryptPacketWithNull().
    dst_port: UDP destination port of the inner packet.
    spi: SPI handed to EncryptPacketWithNull().
    seq_num: Sequence number handed to EncryptPacketWithNull().
    ip_hdr_options: Optional dict of extra keyword arguments for the inner
      scapy IP header. Any 'src'/'dst' entries are overridden. The dict is
      not modified.

  Returns:
    The result of xfrm_base.EncryptPacketWithNull() for the inner packet.
  """
  # Work on a private copy so the caller's dict is never modified.
  inner_hdr_fields = dict(ip_hdr_options) if ip_hdr_options else {}
  inner_hdr_fields.update({'src': src_inner, 'dst': dst_inner})

  # Build and receive an ESP packet destined for the inner socket
  IpType = _SCAPY_IP_TYPE[inner_version]
  input_pkt = (
      IpType(**inner_hdr_fields) / scapy.UDP(sport=src_port, dport=dst_port) /
      net_test.UDP_PAYLOAD)
  input_pkt = IpType(bytes(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
                                              (src_outer, dst_outer))

  return input_pkt
139
140
def _CreateReceiveSock(version, port=0):
  """Creates a UDP socket bound to the wildcard address for receiving packets.

  Args:
    version: IP version (4 or 6) selecting the socket's address family.
    port: Port to bind to; the default of 0 is passed to bind() as-is.

  Returns:
    A (socket, port) tuple, where port is taken from getsockname().
  """
  family = net_test.GetAddressFamily(version)
  wildcard = net_test.GetWildcardAddress(version)

  read_sock = socket(family, SOCK_DGRAM, 0)
  read_sock.bind((wildcard, port))

  # Guard against the eventuality of the receive failing.
  csocket.SetSocketTimeout(read_sock, 500)

  # The second parameter of the tuple is the port number regardless of AF.
  bound_port = read_sock.getsockname()[1]
  return read_sock, bound_port
151
152
def _SendPacket(testInstance, netid, version, remote, remote_port):
  """Sends one UDP_PAYLOAD datagram to (remote, remote_port) via netid.

  Args:
    testInstance: Test object providing SelectInterface().
    netid: Network ID to select on the socket using the "mark" mode.
    version: IP version (4 or 6) selecting the socket's address family.
    remote: Destination address.
    remote_port: Destination UDP port.

  Returns:
    The local port the sending socket was using, from getsockname().
  """
  # Send a packet out via the tunnel-backed network, bound for the port number
  # of the input socket.
  write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
  try:
    testInstance.SelectInterface(write_sock, netid, "mark")
    write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
    return write_sock.getsockname()[1]
  finally:
    # Close the socket even if selecting the interface or sending fails.
    write_sock.close()
163
164
def InjectTests():
  """Injects the parameterized test cases into every tunnel test class."""
  for test_class in (XfrmTunnelTest, XfrmInterfaceTest, XfrmVtiTest):
    InjectParameterizedTests(test_class)
  InjectParameterizedMigrateTests(XfrmInterfaceMigrateTest)
170
171
def InjectParameterizedTests(cls):
  """Parameterizes cls over every (inner, outer) IP version pair.

  Args:
    cls: The test class handed to util.InjectParameterizedTest().
  """
  ip_versions = (4, 6)

  def NameGenerator(*args):
    return "IPv%d_in_IPv%d" % tuple(args)

  util.InjectParameterizedTest(
      cls, itertools.product(ip_versions, repeat=2), NameGenerator)
180
def InjectParameterizedMigrateTests(cls):
  """Parameterizes cls over every triple of IP versions.

  The generated name labels the three versions as inner, outer and the outer
  version migrated to.

  Args:
    cls: The test class handed to util.InjectParameterizedTest().
  """
  ip_versions = (4, 6)

  def NameGenerator(*args):
    return "IPv%d_in_IPv%d_to_outer_IPv%d" % tuple(args)

  util.InjectParameterizedTest(
      cls, itertools.product(ip_versions, repeat=3), NameGenerator)
189
190
class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
  """Tests unidirectional tunnel-mode SAs and policies with explicit selectors.

  The ParamTest* methods take (inner_version, outer_version) and are
  parameterized through InjectParameterizedTests().
  """

  def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
                         netid, local_inner, remote_inner, local_outer,
                         remote_outer, write_sock):
    """Sends a datagram and expects an ESP packet on the underlying network.

    Only underlying_netid, remote_inner, local_outer, remote_outer and
    write_sock are used; the other parameters keep the signature identical to
    _CheckTunnelInput so that _TestTunnel can call either.
    """
    write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
                            local_outer, remote_outer)

  def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
                        netid, local_inner, remote_inner, local_outer,
                        remote_outer, read_sock):
    """Injects a null-encrypted ESP packet and checks it reaches read_sock."""
    # The second parameter of the tuple is the port number regardless of AF.
    local_port = read_sock.getsockname()[1]

    # Build and receive an ESP packet destined for the inner socket
    input_pkt = _GetNullAuthCryptTunnelModePkt(
        inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
        local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
    self.ReceivePacketOn(underlying_netid, input_pkt)

    # Verify that the packet data and src are correct
    data, src = read_sock.recvfrom(4096)
    self.assertEqual(net_test.UDP_PAYLOAD, data)
    self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])

  def _TestTunnel(self, inner_version, outer_version, func, direction,
                  test_output_mark_unset):
    """Test a unidirectional XFRM Tunnel with explicit selectors.

    Args:
      inner_version: IP version (4 or 6) of the tunneled traffic.
      outer_version: IP version (4 or 6) of the ESP packets.
      func: _CheckTunnelInput or _CheckTunnelOutput.
      direction: xfrm.XFRM_POLICY_OUT to hand func the sending socket; any
        other value hands it the receiving socket.
      test_output_mark_unset: If True, no output mark is set on the outbound
        SA and the underlying network is made the default network instead.
    """
    # Select the underlying netid, which represents the external
    # interface from/to which to route ESP packets.
    u_netid = self.RandomNetid()
    # Select a random netid that will originate traffic locally and
    # which represents the netid on which the plaintext is sent
    netid = self.RandomNetid(exclude=u_netid)

    local_inner = self.MyAddress(inner_version, netid)
    remote_inner = _GetRemoteInnerAddress(inner_version)
    local_outer = self.MyAddress(outer_version, u_netid)
    remote_outer = _GetRemoteOuterAddress(outer_version)

    output_mark = u_netid
    if test_output_mark_unset:
      output_mark = None
      self.SetDefaultNetwork(u_netid)

    # Tracked outside the try block so that the finally clause can close
    # whichever sockets were actually opened, even when a check fails.
    write_sock = None
    read_sock = None
    try:
      # Create input/output SPs, SAs and sockets to simulate a more realistic
      # environment.
      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner),
          remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL,
          xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL)

      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner),
          local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
          xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL)

      write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
      self.SelectInterface(write_sock, netid, "mark")
      read_sock, _ = _CreateReceiveSock(inner_version)

      sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
      func(inner_version, outer_version, u_netid, netid, local_inner,
           remote_inner, local_outer, remote_outer, sock)
    finally:
      for opened_sock in (write_sock, read_sock):
        if opened_sock is not None:
          opened_sock.close()
      if test_output_mark_unset:
        self.ClearDefaultNetwork()

  def ParamTestTunnelInput(self, inner_version, outer_version):
    """Checks the input path of the tunnel."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     xfrm.XFRM_POLICY_IN, False)

  def ParamTestTunnelOutput(self, inner_version, outer_version):
    """Checks the output path with an output mark set on the outbound SA."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, False)

  def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version):
    """Checks the output path without an output mark on the outbound SA."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, True)
277
278
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
  """Tests creating, updating and deleting a Virtual Tunnel Interface."""

  def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
                         ikey, okey):
    """Asserts that the interface's VTI attributes match the expected values.

    Args:
      vti_info_data: Mapping holding the IFLA_VTI_* attributes.
      version: IP version (4 or 6) used to decode the packed addresses.
      local_addr: Expected local address, as a string.
      remote_addr: Expected remote address, as a string.
      ikey: Expected input key.
      okey: Expected output key.
    """
    self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
    self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)

    family = AF_INET6 if version != 4 else AF_INET
    actual_local = inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"])
    self.assertEqual(actual_local, local_addr)
    actual_remote = inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"])
    self.assertEqual(actual_remote, remote_addr)

  def testAddVti(self):
    """Test the creation of a Virtual Tunnel Interface."""
    updated_remotes = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
    updated_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
    updated_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID

    for version in (4, 6):
      netid = self.RandomNetid()
      local_addr = self.MyAddress(version, netid)
      remote_addr = _GetRemoteOuterAddress(version)

      # Create the interface and check its initial attributes.
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=remote_addr,
          o_key=_TEST_OKEY,
          i_key=_TEST_IKEY)
      info = self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME)
      self._VerifyVtiInfoData(info, version, local_addr, remote_addr,
                              _TEST_IKEY, _TEST_OKEY)

      # Update the remote address and both keys, then check them again.
      updated_remote = updated_remotes[version]
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=updated_remote,
          o_key=updated_okey,
          i_key=updated_ikey,
          is_update=True)
      info = self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME)
      self._VerifyVtiInfoData(info, version, local_addr, updated_remote,
                              updated_ikey, updated_okey)

      # Validate that the netlink interface matches the ioctl interface.
      if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
      self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)

      # After deletion the interface must no longer be found.
      self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
      with self.assertRaises(IOError):
        self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)

  def _QuietDeleteLink(self, ifname):
    """Deletes the link named ifname, ignoring the IOError if that fails."""
    try:
      self.iproute.DeleteLink(ifname)
    except IOError:
      # The link was not present.
      pass

  def tearDown(self):
    """Runs the base tearDown, then removes any leftover test interface."""
    super().tearDown()
    self._QuietDeleteLink(_TEST_XFRM_IFNAME)
339
340
class SaInfo:
  """Holds the SPI and current sequence number of one security association."""

  def __init__(self, spi):
    # The sequence number always starts at 1.
    self.spi, self.seq_num = spi, 1
346
347
class IpSecBaseInterface(object):
  """Common state and lifecycle for the IPsec tunnel interfaces under test.

  Subclasses must implement TeardownXfrm(), _SetupXfrmByType() and _Rekey().
  """

  def __init__(self, iface, netid, underlying_netid, local, remote, version):
    """Records the tunnel parameters and opens netlink helpers.

    Args:
      iface: Name of the tunnel interface.
      netid: Network ID assigned to the tunnel.
      underlying_netid: Network ID of the network carrying the ESP packets.
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the local and remote addresses.
    """
    self.iface = iface
    self.netid = netid
    self.underlying_netid = underlying_netid
    self.local, self.remote = local, remote

    # XFRM interfaces technically do not have a version. This keeps track of
    # the IP version of the local and remote addresses.
    self.version = version
    # Packet counters expected on the interface.
    self.rx = self.tx = 0
    # Inner addresses keyed by IP version; filled in by the test setup.
    self.addrs = {}

    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()

  def Teardown(self):
    """Removes the XFRM state first, then the interface itself."""
    self.TeardownXfrm()
    self.TeardownInterface()

  def TeardownInterface(self):
    """Deletes the tunnel interface."""
    self.iproute.DeleteLink(self.iface)

  def SetupXfrm(self, use_null_crypt):
    """Picks a random SPI and algorithms, then creates the XFRM state.

    Args:
      use_null_crypt: If True use null authentication and encryption,
        otherwise HMAC-SHA1 and AES-CBC-256.
    """
    # SPI 0 is reserved for local use and 1-255 are reserved by IANA
    # (RFC 4303, section 2.1), so only pick from 256 upwards.
    rand_spi = random.randint(0x100, 0x7fffffff)
    # Both directions use the same SPI value.
    self.in_sa = SaInfo(rand_spi)
    self.out_sa = SaInfo(rand_spi)

    # Select algorithms:
    if use_null_crypt:
      auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
    else:
      auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256

    self.auth = auth
    self.crypt = crypt

    self._SetupXfrmByType(auth, crypt)

  def Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Rekeys the Tunnel Interface

    Creates new SAs and updates the outbound security policy to use new SAs.

    Args:
      outer_family: AF_INET or AF_INET6
      new_out_sa: An SaInfo struct representing the new outbound SA's info
      new_in_sa: An SaInfo struct representing the new inbound SA's info
    """
    self._Rekey(outer_family, new_out_sa, new_in_sa)

    # Update Interface object
    self.out_sa = new_out_sa
    self.in_sa = new_in_sa

  def TeardownXfrm(self):
    """Removes the XFRM state of this tunnel; implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the XFRM state of this tunnel; implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Performs the type-specific part of Rekey(); implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")
412
413
class VtiInterface(IpSecBaseInterface):
  """A VTI-backed tunnel whose XFRM state is matched using marks."""

  def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
    """Creates the VTI and its XFRM state.

    The fourth positional argument is ignored; it keeps the signature parallel
    to XfrmInterface, which receives an ifindex in that position.
    """
    super().__init__(iface, netid, underlying_netid, local, remote, version)

    # Keys are derived from the netid.
    self.okey = _TEST_OKEY + netid
    self.ikey = _TEST_IKEY + netid

    self.SetupInterface()
    self.SetupXfrm(False)

  def SetupInterface(self):
    """Creates the Virtual Tunnel Interface and returns the call's result."""
    return self.iproute.CreateVirtualTunnelInterface(
        self.iface, self.local, self.remote, self.ikey, self.okey)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the outbound and inbound tunnels, matched by okey/ikey marks."""
    # For the VTI, the selectors are wildcard since packets will only
    # be selected if they have the appropriate mark, hence the inner
    # addresses are wildcard.
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, self.out_sa.spi,
        crypt_algo, auth_algo, out_mark, self.underlying_netid, None,
        xfrm.MATCH_METHOD_ALL)

    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_IN, None, self.remote, self.local, self.in_sa.spi,
        crypt_algo, auth_algo, in_mark, None, None, xfrm.MATCH_METHOD_MARK)

  def TeardownXfrm(self):
    """Deletes the outbound and inbound tunnels."""
    self.xfrm.DeleteTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.remote, self.out_sa.spi, self.okey,
        None)
    self.xfrm.DeleteTunnel(
        xfrm.XFRM_POLICY_IN, None, self.local, self.in_sa.spi, self.ikey,
        None)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Adds new null-algorithm SAs and points the output policies at them."""
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    null_crypt = xfrm_base._ALGO_CRYPT_NULL
    null_auth = xfrm_base._ALGO_AUTH_NULL

    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, xfrm.ExactMatchMark(self.okey),
        self.underlying_netid)

    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, xfrm.ExactMatchMark(self.ikey),
        None)

    # Create new policies for IPv4 and IPv6.
    for family in (AF_INET, AF_INET6):
      # Add SPI-specific output policy to enforce using new outbound SPI
      policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT,
                               xfrm.EmptySelector(family))
      tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                               (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(policy, tmpl,
                                 xfrm.ExactMatchMark(self.okey), 0)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the SAs identified by the old SPIs; outer_family is unused."""
    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, in_mark)
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, out_mark)
478
479
class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
  """Test the creation of an XFRM Interface."""

  def testAddXfrmInterface(self):
    """Creates an XFRM interface, checks its index, and deletes it again."""
    self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
                                     _LOOPBACK_IFINDEX)
    try:
      if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
      net_test.SetInterfaceUp(_TEST_XFRM_IFNAME)

      # Validate that the netlink interface matches the ioctl interface.
      self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
    finally:
      # Always remove the interface, so a failed check above does not leave it
      # behind for tests that reuse the same interface name.
      self.iproute.DeleteLink(_TEST_XFRM_IFNAME)

    with self.assertRaises(IOError):
      self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
494
495
class XfrmInterface(IpSecBaseInterface):
  """An XFRM-interface-backed tunnel whose state is matched by if_id."""

  def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
               version, use_null_crypt=False):
    """Creates the XFRM interface and its XFRM state.

    Args:
      iface: Name of the XFRM interface.
      netid: Network ID of the tunnel; also used as the XFRM interface ID.
      underlying_netid: Network ID of the network carrying the ESP packets.
      ifindex: Interface index passed to CreateXfrmInterface().
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the local and remote addresses.
      use_null_crypt: Passed through to SetupXfrm().
    """
    super().__init__(iface, netid, underlying_netid, local, remote, version)

    self.xfrm_if_id = netid
    self.ifindex = ifindex

    self.SetupInterface()
    self.SetupXfrm(use_null_crypt)

  def SetupInterface(self):
    """Create an XFRM interface."""
    return self.iproute.CreateXfrmInterface(
        self.iface, self.netid, self.ifindex)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the outbound and inbound tunnels bound to this if_id."""
    if_id = self.xfrm_if_id
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, self.out_sa.spi,
        crypt_algo, auth_algo, None, self.underlying_netid, if_id,
        xfrm.MATCH_METHOD_ALL)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_IN, None, self.remote, self.local, self.in_sa.spi,
        crypt_algo, auth_algo, None, None, if_id, xfrm.MATCH_METHOD_IFID)

  def TeardownXfrm(self):
    """Deletes the outbound and inbound tunnels."""
    if_id = self.xfrm_if_id
    self.xfrm.DeleteTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.remote, self.out_sa.spi, None, if_id)
    self.xfrm.DeleteTunnel(
        xfrm.XFRM_POLICY_IN, None, self.local, self.in_sa.spi, None, if_id)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Adds new null-algorithm SAs and points the output policies at them."""
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    null_crypt = xfrm_base._ALGO_CRYPT_NULL
    null_auth = xfrm_base._ALGO_AUTH_NULL

    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, self.underlying_netid,
        xfrm_if_id=self.xfrm_if_id)

    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, None,
        xfrm_if_id=self.xfrm_if_id)

    # Create new policies for IPv4 and IPv6.
    for family in (AF_INET, AF_INET6):
      # Add SPI-specific output policy to enforce using new outbound SPI
      policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT,
                               xfrm.EmptySelector(family))
      tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                               (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the SAs identified by the old SPIs; outer_family is unused."""
    if_id = self.xfrm_if_id
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None, if_id)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None, if_id)

  def Migrate(self, new_underlying_netid, new_local, new_remote):
    """Migrates both tunnel directions to new outer addresses.

    Args:
      new_underlying_netid: Network ID of the new underlying network.
      new_local: New local outer address.
      new_remote: New remote outer address.
    """
    if_id = self.xfrm_if_id

    self.xfrm.MigrateTunnel(
        xfrm.XFRM_POLICY_IN, None, self.remote, self.local, new_remote,
        new_local, self.in_sa.spi, self.crypt, self.auth, None, None,
        new_underlying_netid, if_id)

    self.xfrm.MigrateTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, new_local,
        new_remote, self.out_sa.spi, self.crypt, self.auth, None, None,
        new_underlying_netid, if_id)

    # Record the new endpoints only after both migrations went through.
    self.local, self.remote = new_local, new_remote
    self.version = net_test.GetAddressVersion(new_local)
    self.underlying_netid = new_underlying_netid
571
572
573class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
574
575  @classmethod
576  def setUpClass(cls):
577    xfrm_base.XfrmBaseTest.setUpClass()
578    # Tunnel interfaces use marks extensively, so configure realistic packet
579    # marking rules to make the test representative, make PMTUD work, etc.
580    cls.SetInboundMarks(True)
581    cls.SetMarkReflectSysctls(1)
582
583    # Group by tunnel version to ensure that we test at least one IPv4 and one
584    # IPv6 tunnel
585    cls.tunnelsV4 = {}
586    cls.tunnelsV6 = {}
587
588    for i, underlying_netid in enumerate(cls.tuns):
589      for version in 4, 6:
590        netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
591        iface = "ipsec%s" % netid
592        local = cls.MyAddress(version, underlying_netid)
593        if version == 4:
594          remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
595        else:
596          remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
597
598        ifindex = cls.ifindices[underlying_netid]
599        tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
600                                   local, remote, version)
601        cls._SetInboundMarking(netid, iface, True)
602        cls._SetupTunnelNetwork(tunnel, True)
603
604        if version == 4:
605          cls.tunnelsV4[netid] = tunnel
606        else:
607          cls.tunnelsV6[netid] = tunnel
608
609  @classmethod
610  def tearDownClass(cls):
611    # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
612    cls.SetInboundMarks(False)
613    for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
614      cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
615      cls._SetupTunnelNetwork(tunnel, False)
616      tunnel.Teardown()
617    xfrm_base.XfrmBaseTest.tearDownClass()
618
619  def randomTunnel(self, outer_version):
620    version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
621    return random.choice(list(version_dict.values()))
622
  def setUp(self):
    """Runs MultiNetworkBaseTest.setUp and opens fresh netlink helpers."""
    # The MultiNetworkBaseTest implementation is called explicitly, not via
    # super().
    # NOTE(review): presumably this skips XfrmBaseTest's per-test setup on
    # purpose, since the tunnels are created once per class -- confirm against
    # xfrm_base.
    multinetwork_base.MultiNetworkBaseTest.setUp(self)
    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()
627
  def tearDown(self):
    """Runs only MultiNetworkBaseTest.tearDown."""
    # The MultiNetworkBaseTest implementation is called explicitly, not via
    # super().
    # NOTE(review): presumably this skips XfrmBaseTest's per-test cleanup so
    # that the class-level tunnels survive between tests -- confirm against
    # xfrm_base.
    multinetwork_base.MultiNetworkBaseTest.tearDown(self)
630
631  def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
632    """Exchange two addresses on a given interface.
633
634    Args:
635      ifname: Name of the interface
636      old_addr: An address to be removed from the interface
637      new_addr: An address to be added to an interface
638    """
639    version = 6 if ":" in new_addr else 4
640    ifindex = net_test.GetInterfaceIndex(ifname)
641    self.iproute.AddAddress(new_addr,
642                            net_test.AddressLengthBits(version), ifindex)
643    self.iproute.DelAddress(old_addr,
644                            net_test.AddressLengthBits(version), ifindex)
645
646  @classmethod
647  def _GetLocalAddress(cls, version, netid):
648    if version == 4:
649      return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
650    else:
651      return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
652
653  @classmethod
654  def UidRangeForTunnelNetId(cls, netid):
655    if netid < _TUNNEL_NETID_OFFSET:
656      raise ValueError("Tunnel netid outside tunnel range")
657    netid -= _TUNNEL_NETID_OFFSET
658    return (500 + 50 * netid, 500 + 50 * (netid + 1) - 1)
659
660  @classmethod
661  def _SetupTunnelNetwork(cls, tunnel, is_add):
662    """Setup rules and routes for a tunnel Network.
663
664    Takes an interface and depending on the boolean
665    value of is_add, either adds or removes the rules
666    and routes for a tunnel interface to behave like an
667    Android Network for purposes of testing.
668
669    Args:
670      tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
671      is_add: Boolean that causes this method to perform setup if True or
672        teardown if False
673    """
674    if is_add:
675      # Disable router solicitations to avoid occasional spurious packets
676      # arriving on the underlying network; there are two possible behaviors
677      # when that occurred: either only the RA packet is read, and when it
678      # is echoed back to the tunnel, it causes the test to fail by not
679      # receiving # the UDP_PAYLOAD; or, two packets may arrive on the
680      # underlying # network which fails the assertion that only one ESP packet
681      # is received.
682      cls.SetSysctl(
683          "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
684      net_test.SetInterfaceUp(tunnel.iface)
685
686    for version in [4, 6]:
687      ifindex = net_test.GetInterfaceIndex(tunnel.iface)
688      table = tunnel.netid
689
690      # Set up routing rules.
691      start, end = cls.UidRangeForTunnelNetId(tunnel.netid)
692      cls.iproute.UidRangeRule(version, is_add, start, end, table,
693                                cls.PRIORITY_UID)
694      cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
695      cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
696                              table, cls.PRIORITY_FWMARK)
697
698      # Configure IP addresses.
699      addr = cls._GetLocalAddress(version, tunnel.netid)
700      prefixlen = net_test.AddressLengthBits(version)
701      tunnel.addrs[version] = addr
702      if is_add:
703        cls.iproute.AddAddress(addr, prefixlen, ifindex)
704        cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
705      else:
706        cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
707        cls.iproute.DelAddress(addr, prefixlen, ifindex)
708
709  def assertReceivedPacket(self, tunnel, sa_info):
710    tunnel.rx += 1
711    self.assertEqual((tunnel.rx, tunnel.tx),
712                      self.iproute.GetRxTxPackets(tunnel.iface))
713    sa_info.seq_num += 1
714
715  def assertSentPacket(self, tunnel, sa_info):
716    tunnel.tx += 1
717    self.assertEqual((tunnel.rx, tunnel.tx),
718                      self.iproute.GetRxTxPackets(tunnel.iface))
719    sa_info.seq_num += 1
720
721  def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
722                        sa_info=None, expect_fail=False):
723    """Test null-crypt input path over an IPsec interface."""
724    if sa_info is None:
725      sa_info = tunnel.in_sa
726    read_sock, local_port = _CreateReceiveSock(inner_version)
727
728    input_pkt = _GetNullAuthCryptTunnelModePkt(
729        inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
730        local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
731    self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
732
733    try:
734      if expect_fail:
735        self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096)
736      else:
737        # Verify that the packet data and src are correct
738        data, src = read_sock.recvfrom(4096)
739        self.assertReceivedPacket(tunnel, sa_info)
740        self.assertEqual(net_test.UDP_PAYLOAD, data)
741        self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
742    finally:
743      read_sock.close()
744
745  def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
746                         remote_inner, sa_info=None):
747    """Test null-crypt output path over an IPsec interface."""
748    if sa_info is None:
749      sa_info = tunnel.out_sa
750    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
751                             _TEST_REMOTE_PORT)
752
753    # Read a tunneled IP packet on the underlying (outbound) network
754    # verifying that it is an ESP packet.
755    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
756                                  sa_info.seq_num, None, tunnel.local,
757                                  tunnel.remote)
758
759    # Get and update the IP headers on the inner payload so that we can do a simple
760    # comparison of byte data. Unfortunately, due to the scapy version this runs on,
761    # we cannot parse past the ESP header to the inner IP header, and thus have to
762    # workaround in this manner
763    if inner_version == 4:
764      ip_hdr_options = {
765        'id': scapy.IP(bytes(pkt.payload)[8:]).id,
766        'flags': scapy.IP(bytes(pkt.payload)[8:]).flags
767      }
768    else:
769      ip_hdr_options = {'fl': scapy.IPv6(bytes(pkt.payload)[8:]).fl}
770
771    expected = _GetNullAuthCryptTunnelModePkt(
772        inner_version, local_inner, tunnel.local, local_port, remote_inner,
773        tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
774        ip_hdr_options)
775
776    # Check outer header manually (Avoids having to overwrite outer header's
777    # id, flags or flow label)
778    self.assertSentPacket(tunnel, sa_info)
779    self.assertEqual(expected.src, pkt.src)
780    self.assertEqual(expected.dst, pkt.dst)
781    self.assertEqual(len(expected), len(pkt))
782
783    # Check everything else
784    self.assertEqual(bytes(expected.payload), bytes(pkt.payload))
785
786  def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
787                             remote_inner):
788    """Test both input and output paths over an encrypted IPsec interface.
789
790    This tests specifically makes sure that the both encryption and decryption
791    work together, as opposed to the _CheckTunnel(Input|Output) where the
792    input and output paths are tested separately, and using null encryption.
793    """
794    src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
795                           _TEST_REMOTE_PORT)
796
797    # Make sure it appeared on the underlying interface
798    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
799                                  tunnel.out_sa.seq_num, None, tunnel.local,
800                                  tunnel.remote)
801
802    # Check that packet is not sent in plaintext
803    self.assertTrue(bytes(net_test.UDP_PAYLOAD) not in bytes(pkt))
804
805    # Check src/dst
806    self.assertEqual(tunnel.local, pkt.src)
807    self.assertEqual(tunnel.remote, pkt.dst)
808
809    # Check that the interface statistics recorded the outbound packet
810    self.assertSentPacket(tunnel, tunnel.out_sa)
811
812    try:
813      # Swap the interface addresses to pretend we are the remote
814      self._SwapInterfaceAddress(
815          tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
816
817      # Swap the packet's IP headers and write it back to the underlying
818      # network.
819      pkt = TunTwister.TwistPacket(pkt)
820      read_sock, local_port = _CreateReceiveSock(inner_version,
821                                                 _TEST_REMOTE_PORT)
822      self.ReceivePacketOn(tunnel.underlying_netid, pkt)
823
824      # Verify that the packet data and src are correct
825      data, src = read_sock.recvfrom(4096)
826      self.assertEqual(net_test.UDP_PAYLOAD, data)
827      self.assertEqual((local_inner, src_port), src[:2])
828
829      # Check that the interface statistics recorded the inbound packet
830      self.assertReceivedPacket(tunnel, tunnel.in_sa)
831
832      read_sock.close()
833    finally:
834      # Swap the interface addresses to pretend we are the remote
835      self._SwapInterfaceAddress(
836          tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
837
838  def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
839                       sa_info=None):
840    """Test ICMP error path over an IPsec interface."""
841    if sa_info is None:
842      sa_info = tunnel.out_sa
843    # Now attempt to provoke an ICMP error.
844    # TODO: deduplicate with multinetwork_test.py.
845    dst_prefix, intermediate = {
846        4: ("172.19.", "172.16.9.12"),
847        6: ("2001:db8::", "2001:db8::1")
848    }[tunnel.version]
849
850    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
851                             _TEST_REMOTE_PORT)
852    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
853                                  sa_info.seq_num, None, tunnel.local,
854                                  tunnel.remote)
855    self.assertSentPacket(tunnel, sa_info)
856
857    myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
858    _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
859                                         pkt)
860    self.ReceivePacketOn(tunnel.underlying_netid, toobig)
861
862    # Check that the packet too big reduced the MTU.
863    routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
864    self.assertEqual(1, len(routes))
865    rtmsg, attributes = routes[0]
866    self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
867    self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
868
869    # Clear PMTU information so that future tests don't have to worry about it.
870    self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
871
872  def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
873                                     remote_inner):
874    """Test combined encryption path with ICMP errors over an IPsec tunnel"""
875    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
876                                remote_inner)
877    self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
878    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
879                                remote_inner)
880
881  def  _RebuildTunnel(self, tunnel, use_null_crypt):
882    # Some tests require that the out_seq_num and in_seq_num are the same
883    # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1
884    #
885    # Until we get better scapy support, the only way we can build an
886    # encrypted packet is to send it out, and read the packet from the wire.
887    # We then generally use this as the "inbound" encrypted packet, injecting
888    # it into the interface for which it is expected on.
889    #
890    # As such, this is required to ensure that encrypted packets (which we
891    # currently have no way to easily modify) are not considered replay
892    # attacks by the inbound SA.  (eg: received 3 packets, seq_num_in = 3,
893    # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay
894    # attack)
895    tunnel.TeardownXfrm()
896    tunnel.SetupXfrm(use_null_crypt)
897
898  def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
899    """Bootstrap method to setup and run tests for the given parameters."""
900    tunnel = self.randomTunnel(outer_version)
901
902    try:
903      self._RebuildTunnel(tunnel, use_null_crypt)
904
905      local_inner = tunnel.addrs[inner_version]
906      remote_inner = _GetRemoteInnerAddress(inner_version)
907
908      for i in range(2):
909        func(tunnel, inner_version, local_inner, remote_inner)
910    finally:
911      if use_null_crypt:
912        tunnel.TeardownXfrm()
913        tunnel.SetupXfrm(False)
914
915  def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner):
916    old_out_sa = tunnel.out_sa
917    old_in_sa = tunnel.in_sa
918
919    # Check to make sure that both directions work before rekey
920    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
921                           old_in_sa)
922    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
923                            old_out_sa)
924
925    # Rekey
926    outer_family = net_test.GetAddressFamily(tunnel.version)
927
928    # Create new SA
929    # Distinguish the new SAs with new SPIs.
930    new_out_sa = SaInfo(old_out_sa.spi + 1)
931    new_in_sa = SaInfo(old_in_sa.spi + 1)
932
933    # Perform Rekey
934    tunnel.Rekey(outer_family, new_out_sa, new_in_sa)
935
936    # Expect that the old SPI still works for inbound packets
937    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
938                           old_in_sa)
939
940    # Test both paths with new SPIs, expect outbound to use new SPI
941    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
942                           new_in_sa)
943    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
944                            new_out_sa)
945
946    # Delete old SAs
947    tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi)
948
949    # Test both paths with new SPIs; should still work
950    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
951                           new_in_sa)
952    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
953                            new_out_sa)
954
955    # Expect failure upon trying to receive a packet with the deleted SPI
956    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
957                           old_in_sa, True)
958
959  def _TestTunnelRekey(self, inner_version, outer_version):
960    """Test packet input and output over a Virtual Tunnel Interface."""
961    tunnel = self.randomTunnel(outer_version)
962
963    try:
964      # Always use null_crypt, so we can check input and output separately
965      tunnel.TeardownXfrm()
966      tunnel.SetupXfrm(True)
967
968      local_inner = tunnel.addrs[inner_version]
969      remote_inner = _GetRemoteInnerAddress(inner_version)
970
971      self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner)
972    finally:
973      tunnel.TeardownXfrm()
974      tunnel.SetupXfrm(False)
975
976
class XfrmVtiTest(XfrmTunnelBase):
  """Tunnel checks from XfrmTunnelBase, run with VtiInterface tunnels."""

  INTERFACE_CLASS = VtiInterface

  def ParamTestVtiInput(self, inner_version, outer_version):
    """Input path check, using null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelInput, use_null_crypt=True)

  def ParamTestVtiOutput(self, inner_version, outer_version):
    """Output path check, using null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelOutput, use_null_crypt=True)

  def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
    """Combined output/input check with real encryption."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestVtiIcmp(self, inner_version, outer_version):
    """ICMP error path check."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelIcmp, use_null_crypt=False)

  def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
    """Encrypted round trips with an ICMP error in between."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  def ParamTestVtiRekey(self, inner_version, outer_version):
    """SA rekey check."""
    self._TestTunnelRekey(inner_version, outer_version)
1001
1002
class XfrmInterfaceTest(XfrmTunnelBase):
  """Tunnel checks from XfrmTunnelBase, run with XfrmInterface tunnels."""

  INTERFACE_CLASS = XfrmInterface

  def ParamTestXfrmIntfInput(self, inner_version, outer_version):
    """Input path check, using null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelInput, use_null_crypt=True)

  def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
    """Output path check, using null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelOutput, use_null_crypt=True)

  def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
    """Combined output/input check with real encryption."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
    """ICMP error path check."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelIcmp, use_null_crypt=False)

  def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
    """Encrypted round trips with an ICMP error in between."""
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  def ParamTestXfrmIntfRekey(self, inner_version, outer_version):
    """SA rekey check."""
    self._TestTunnelRekey(inner_version, outer_version)
1027
1028##############################################################################
1029#
1030# Test for presence of CONFIG_XFRM_MIGRATE and kernel patches
1031#
1032#   xfrm: Check if_id in xfrm_migrate
1033#   Upstream commit: c1aca3080e382886e2e58e809787441984a2f89b
1034#
1035#   xfrm: Fix xfrm migrate issues when address family changes
1036#   Upstream commit: e03c3bba351f99ad932e8f06baa9da1afc418e02
1037#
1038# Those two upstream 5.17 fixes above were pulled in to LTS in kernel versions
1039# 4.14.273, 4.19.236, 5.4.186, 5.10.107, 5.15.30.
1040#
1041# Note: the 'Check if_id in xfrm_migrate' fix did not land in 4.14 LTS,
1042# and instead landed in android-4.14-stable after 4.14.320 LTS merge.
1043#
@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE,
                     "XFRM migration unsupported or fixes not included")
class XfrmInterfaceMigrateTest(XfrmTunnelBase):
  """Tunnel checks run before and after migrating an XfrmInterface tunnel.

  Skipped entirely unless SUPPORTS_XFRM_MIGRATE is true.
  """
  INTERFACE_CLASS = XfrmInterface

  def setUpTunnel(self, outer_version, use_null_crypt):
    """Create an XfrmInterface tunnel on a randomly chosen underlying network.

    Args:
      outer_version: IP version of the outer (underlying) addresses; 4 selects
        net_test.IPV4_ADDR as the remote, anything else net_test.IPV6_ADDR.
      use_null_crypt: Passed through to the XfrmInterface constructor.

    Returns:
      The new XfrmInterface, after inbound marking and the tunnel network
      have been set up for it.
    """
    underlying_netid = self.RandomNetid()
    netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET
    iface = "ipsec%s" % netid
    ifindex = self.ifindices[underlying_netid]

    local = self.MyAddress(outer_version, underlying_netid)
    remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR

    tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex,
                           local, remote, outer_version, use_null_crypt)
    self._SetInboundMarking(netid, iface, True)
    self._SetupTunnelNetwork(tunnel, True)

    return tunnel

  def tearDownTunnel(self, tunnel):
    """Undo setUpTunnel: remove marking and network setup, then the tunnel."""
    self._SetInboundMarking(tunnel.netid, tunnel.iface, False)
    self._SetupTunnelNetwork(tunnel, False)
    tunnel.Teardown()

  def _TestTunnel(self, inner_version, outer_version, new_outer_version, func,
                  use_null_crypt):
    """Run a check on a tunnel, migrate the tunnel, and run the check again.

    Args:
      inner_version: IP version of the inner packets; also selects the local
        inner address from tunnel.addrs.
      outer_version: IP version passed to randomTunnel to pick the tunnel.
      new_outer_version: IP version of the addresses the tunnel is migrated
        to; may differ from outer_version.
      func: Check to run; called as
        func(tunnel, inner_version, local_inner, remote_inner).
      use_null_crypt: Passed to _RebuildTunnel.
    """
    tunnel = self.randomTunnel(outer_version)

    # Remember the pre-migration endpoints so they can be restored afterwards.
    old_underlying_netid = tunnel.underlying_netid
    old_local = tunnel.local
    old_remote = tunnel.remote

    try:
      self._RebuildTunnel(tunnel, use_null_crypt)

      # Verify functionality before migration
      local_inner = tunnel.addrs[inner_version]
      remote_inner = _GetRemoteInnerAddress(inner_version)
      func(tunnel, inner_version, local_inner, remote_inner)

      # Migrate tunnel
      new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid)
      new_local = self.MyAddress(new_outer_version, new_underlying_netid)
      new_remote = (net_test.IPV4_ADDR2 if new_outer_version == 4
                    else net_test.IPV6_ADDR2)

      tunnel.Migrate(new_underlying_netid, new_local, new_remote)

      # Verify functionality after migration
      func(tunnel, inner_version, local_inner, remote_inner)
    finally:
      # Reset the tunnel to the original configuration. The XFRM state is
      # torn down first, while the tunnel object still describes whatever is
      # currently installed; only then are the saved endpoints put back.
      tunnel.TeardownXfrm()

      # These must be restored on the tunnel object itself: setting them on
      # the test case would leave the tunnel pointing at the migrated
      # endpoints when SetupXfrm runs and for every later test that picks it.
      tunnel.local = old_local
      tunnel.remote = old_remote
      tunnel.underlying_netid = old_underlying_netid
      tunnel.SetupXfrm(False)

  def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version,
                                    new_outer_version):
    """Input path check around a migration, using null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelInput, True)

  def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version,
                                     new_outer_version):
    """Output path check around a migration, using null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelOutput, True)

  def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version,
                                             new_outer_version):
    """Encrypted round-trip check around a migration."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelEncryption, False)

  def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version,
                                   new_outer_version):
    """ICMP error path check around a migration."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelIcmp, False)

  def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version,
                                                 outer_version,
                                                 new_outer_version):
    """Encrypted round trips with an ICMP error, around a migration."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelEncryptionWithIcmp, False)

  def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version,
                                    new_outer_version):
    """SA rekey check around a migration, using null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelRekey, True)
1136
# Script entry point. InjectTests() is defined elsewhere in this file;
# presumably it generates the concrete test methods from the ParamTest*
# templates (confirm against its definition). It has to run before
# unittest.main() collects and runs the tests.
if __name__ == "__main__":
  InjectTests()
  unittest.main()
1140