1#!/usr/bin/python3 2# 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import 18from errno import * # pylint: disable=wildcard-import 19from socket import * # pylint: disable=wildcard-import 20 21import random 22import itertools 23import struct 24import unittest 25 26from net_test import LINUX_VERSION 27from scapy import all as scapy 28from tun_twister import TunTwister 29import csocket 30import iproute 31import multinetwork_base 32import net_test 33import packets 34import util 35import xfrm 36import xfrm_base 37 38_LOOPBACK_IFINDEX = 1 39_TEST_XFRM_IFNAME = "ipsec42" 40_TEST_XFRM_IF_ID = 42 41_TEST_SPI = 0x1234 42 43# Two kernel fixes have been added in 5.17 to allow XFRM_MIGRATE to work correctly 44# when (1) there are multiple tunnels with the same selectors; and (2) addresses 45# are updated to a different IP family. These two fixes were pulled into upstream 46# LTS releases 4.14.273, 4.19.236, 5.4.186, 5.10.107 and 5.15.30, from whence they 47# flowed into the Android Common Kernel (via standard LTS merges). 48# 49# Note 'xfrm: Check if_id in xfrm_migrate' did not end up in 4.14 LTS, 50# and is only present in ACK android-4.14-stable after 4.14.320 LTS merge. 51# See https://android-review.git.corp.google.com/c/kernel/common/+/2640243 52# 53# As such we require 4.14.321+, 4.19.236+, 5.4.186+, 5.10.107+, 5.15.30+ or 5.17+ 54# to have these fixes. 55def HasXfrmMigrateFixes(): 56 return net_test.KernelAtLeast([(4, 19, 236), (5, 4, 186), 57 (5, 10, 107), (5, 15, 30)]) or net_test.NonGXI(4, 14) 58 59 60# Does the kernel support CONFIG_XFRM_MIGRATE and include the kernel fixes? 61def SupportsXfrmMigrate(): 62 if not HasXfrmMigrateFixes(): 63 return False 64 65 # 5.10+ must have CONFIG_XFRM_MIGRATE enabled 66 if LINUX_VERSION >= (5, 10, 0): 67 return True 68 69 try: 70 x = xfrm.Xfrm() 71 wildcard_addr = net_test.GetWildcardAddress(6) 72 selector = xfrm.EmptySelector(AF_INET6) 73 74 # Expect migration to fail with EINVAL because it is trying to migrate a 75 # non-existent SA. 76 x.MigrateTunnel(xfrm.XFRM_POLICY_OUT, selector, wildcard_addr, wildcard_addr, 77 wildcard_addr, wildcard_addr, _TEST_SPI, 78 None, None, None, None, None, None) 79 print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled") 80 return True 81 except IOError as err: 82 if err.errno == ENOPROTOOPT: 83 return False 84 elif err.errno == EINVAL: 85 return True 86 else: 87 print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno) 88 return True 89 90SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate() 91 92# Parameters to setup tunnels as special networks 93_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService 94_BASE_TUNNEL_NETID = {4: 40, 6: 60} 95_BASE_VTI_OKEY = 2000000100 96_BASE_VTI_IKEY = 2000000200 97 98_TEST_OUT_SPI = _TEST_SPI 99_TEST_IN_SPI = _TEST_OUT_SPI 100 101_TEST_OKEY = 2000000100 102_TEST_IKEY = 2000000200 103 104_TEST_REMOTE_PORT = 1234 105 106_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6} 107 108 109def _GetLocalInnerAddress(version): 110 return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] 111 112 113def _GetRemoteInnerAddress(version): 114 return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version] 115 116 117def _GetRemoteOuterAddress(version): 118 return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] 119 120 121def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer, 122 src_port, dst_inner, dst_outer, 123 dst_port, spi, seq_num, ip_hdr_options=None): 124 if ip_hdr_options is None: 125 ip_hdr_options = {} 126 127 ip_hdr_options.update({'src': src_inner, 'dst': dst_inner}) 128 129 # Build and receive an ESP packet destined for the inner socket 130 IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] 131 input_pkt = ( 132 IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) / 133 net_test.UDP_PAYLOAD) 134 input_pkt = IpType(bytes(input_pkt)) # Compute length, checksum. 135 input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num, 136 (src_outer, dst_outer)) 137 138 return input_pkt 139 140 141def _CreateReceiveSock(version, port=0): 142 # Create a socket to receive packets. 143 read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) 144 read_sock.bind((net_test.GetWildcardAddress(version), port)) 145 # The second parameter of the tuple is the port number regardless of AF. 146 local_port = read_sock.getsockname()[1] 147 # Guard against the eventuality of the receive failing. 148 csocket.SetSocketTimeout(read_sock, 500) 149 150 return read_sock, local_port 151 152 153def _SendPacket(testInstance, netid, version, remote, remote_port): 154 # Send a packet out via the tunnel-backed network, bound for the port number 155 # of the input socket. 156 write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) 157 testInstance.SelectInterface(write_sock, netid, "mark") 158 write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port)) 159 local_port = write_sock.getsockname()[1] 160 write_sock.close() 161 162 return local_port 163 164 165def InjectTests(): 166 InjectParameterizedTests(XfrmTunnelTest) 167 InjectParameterizedTests(XfrmInterfaceTest) 168 InjectParameterizedTests(XfrmVtiTest) 169 InjectParameterizedMigrateTests(XfrmInterfaceMigrateTest) 170 171 172def InjectParameterizedTests(cls): 173 VERSIONS = (4, 6) 174 param_list = itertools.product(VERSIONS, VERSIONS) 175 176 def NameGenerator(*args): 177 return "IPv%d_in_IPv%d" % tuple(args) 178 179 util.InjectParameterizedTest(cls, param_list, NameGenerator) 180 181def InjectParameterizedMigrateTests(cls): 182 VERSIONS = (4, 6) 183 param_list = itertools.product(VERSIONS, VERSIONS, VERSIONS) 184 185 def NameGenerator(*args): 186 return "IPv%d_in_IPv%d_to_outer_IPv%d" % tuple(args) 187 188 util.InjectParameterizedTest(cls, param_list, NameGenerator) 189 190 191class XfrmTunnelTest(xfrm_base.XfrmLazyTest): 192 193 def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid, 194 netid, local_inner, remote_inner, local_outer, 195 remote_outer, write_sock): 196 197 write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) 198 self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, 199 local_outer, remote_outer) 200 201 def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid, 202 netid, local_inner, remote_inner, local_outer, 203 remote_outer, read_sock): 204 205 # The second parameter of the tuple is the port number regardless of AF. 206 local_port = read_sock.getsockname()[1] 207 208 # Build and receive an ESP packet destined for the inner socket 209 input_pkt = _GetNullAuthCryptTunnelModePkt( 210 inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT, 211 local_inner, local_outer, local_port, _TEST_IN_SPI, 1) 212 self.ReceivePacketOn(underlying_netid, input_pkt) 213 214 # Verify that the packet data and src are correct 215 data, src = read_sock.recvfrom(4096) 216 self.assertEqual(net_test.UDP_PAYLOAD, data) 217 self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) 218 219 def _TestTunnel(self, inner_version, outer_version, func, direction, 220 test_output_mark_unset): 221 """Test a unidirectional XFRM Tunnel with explicit selectors""" 222 # Select the underlying netid, which represents the external 223 # interface from/to which to route ESP packets. 224 u_netid = self.RandomNetid() 225 # Select a random netid that will originate traffic locally and 226 # which represents the netid on which the plaintext is sent 227 netid = self.RandomNetid(exclude=u_netid) 228 229 local_inner = self.MyAddress(inner_version, netid) 230 remote_inner = _GetRemoteInnerAddress(inner_version) 231 local_outer = self.MyAddress(outer_version, u_netid) 232 remote_outer = _GetRemoteOuterAddress(outer_version) 233 234 output_mark = u_netid 235 if test_output_mark_unset: 236 output_mark = None 237 self.SetDefaultNetwork(u_netid) 238 239 try: 240 # Create input/ouput SPs, SAs and sockets to simulate a more realistic 241 # environment. 242 self.xfrm.CreateTunnel( 243 xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner), 244 remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL, 245 xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL) 246 247 self.xfrm.CreateTunnel( 248 xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner), 249 local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, 250 xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL) 251 252 write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) 253 self.SelectInterface(write_sock, netid, "mark") 254 read_sock, _ = _CreateReceiveSock(inner_version) 255 256 sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock 257 func(inner_version, outer_version, u_netid, netid, local_inner, 258 remote_inner, local_outer, remote_outer, sock) 259 260 write_sock.close() 261 read_sock.close() 262 finally: 263 if test_output_mark_unset: 264 self.ClearDefaultNetwork() 265 266 def ParamTestTunnelInput(self, inner_version, outer_version): 267 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, 268 xfrm.XFRM_POLICY_IN, False) 269 270 def ParamTestTunnelOutput(self, inner_version, outer_version): 271 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 272 xfrm.XFRM_POLICY_OUT, False) 273 274 def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version): 275 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 276 xfrm.XFRM_POLICY_OUT, True) 277 278 279class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): 280 def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, 281 ikey, okey): 282 self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey) 283 self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey) 284 285 family = AF_INET if version == 4 else AF_INET6 286 self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), 287 local_addr) 288 self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), 289 remote_addr) 290 291 def testAddVti(self): 292 """Test the creation of a Virtual Tunnel Interface.""" 293 for version in [4, 6]: 294 netid = self.RandomNetid() 295 local_addr = self.MyAddress(version, netid) 296 self.iproute.CreateVirtualTunnelInterface( 297 dev_name=_TEST_XFRM_IFNAME, 298 local_addr=local_addr, 299 remote_addr=_GetRemoteOuterAddress(version), 300 o_key=_TEST_OKEY, 301 i_key=_TEST_IKEY) 302 self._VerifyVtiInfoData( 303 self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, 304 _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) 305 306 new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} 307 new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID 308 new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID 309 self.iproute.CreateVirtualTunnelInterface( 310 dev_name=_TEST_XFRM_IFNAME, 311 local_addr=local_addr, 312 remote_addr=new_remote_addr[version], 313 o_key=new_okey, 314 i_key=new_ikey, 315 is_update=True) 316 317 self._VerifyVtiInfoData( 318 self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, 319 new_remote_addr[version], new_ikey, new_okey) 320 321 if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 322 323 # Validate that the netlink interface matches the ioctl interface. 324 self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) 325 self.iproute.DeleteLink(_TEST_XFRM_IFNAME) 326 with self.assertRaises(IOError): 327 self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 328 329 def _QuietDeleteLink(self, ifname): 330 try: 331 self.iproute.DeleteLink(ifname) 332 except IOError: 333 # The link was not present. 334 pass 335 336 def tearDown(self): 337 super(XfrmAddDeleteVtiTest, self).tearDown() 338 self._QuietDeleteLink(_TEST_XFRM_IFNAME) 339 340 341class SaInfo(object): 342 343 def __init__(self, spi): 344 self.spi = spi 345 self.seq_num = 1 346 347 348class IpSecBaseInterface(object): 349 350 def __init__(self, iface, netid, underlying_netid, local, remote, version): 351 self.iface = iface 352 self.netid = netid 353 self.underlying_netid = underlying_netid 354 self.local, self.remote = local, remote 355 356 # XFRM interfaces technically do not have a version. This keeps track of 357 # the IP version of the local and remote addresses. 358 self.version = version 359 self.rx = self.tx = 0 360 self.addrs = {} 361 362 self.iproute = iproute.IPRoute() 363 self.xfrm = xfrm.Xfrm() 364 365 def Teardown(self): 366 self.TeardownXfrm() 367 self.TeardownInterface() 368 369 def TeardownInterface(self): 370 self.iproute.DeleteLink(self.iface) 371 372 def SetupXfrm(self, use_null_crypt): 373 rand_spi = random.randint(0, 0x7fffffff) 374 self.in_sa = SaInfo(rand_spi) 375 self.out_sa = SaInfo(rand_spi) 376 377 # Select algorithms: 378 if use_null_crypt: 379 auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL 380 else: 381 auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256 382 383 self.auth = auth 384 self.crypt = crypt 385 386 self._SetupXfrmByType(auth, crypt) 387 388 def Rekey(self, outer_family, new_out_sa, new_in_sa): 389 """Rekeys the Tunnel Interface 390 391 Creates new SAs and updates the outbound security policy to use new SAs. 392 393 Args: 394 outer_family: AF_INET or AF_INET6 395 new_out_sa: An SaInfo struct representing the new outbound SA's info 396 new_in_sa: An SaInfo struct representing the new inbound SA's info 397 """ 398 self._Rekey(outer_family, new_out_sa, new_in_sa) 399 400 # Update Interface object 401 self.out_sa = new_out_sa 402 self.in_sa = new_in_sa 403 404 def TeardownXfrm(self): 405 raise NotImplementedError("Subclasses should implement this") 406 407 def _SetupXfrmByType(self, auth_algo, crypt_algo): 408 raise NotImplementedError("Subclasses should implement this") 409 410 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 411 raise NotImplementedError("Subclasses should implement this") 412 413 414class VtiInterface(IpSecBaseInterface): 415 416 def __init__(self, iface, netid, underlying_netid, _, local, remote, version): 417 super(VtiInterface, self).__init__(iface, netid, underlying_netid, local, 418 remote, version) 419 420 self.ikey = _TEST_IKEY + netid 421 self.okey = _TEST_OKEY + netid 422 423 self.SetupInterface() 424 self.SetupXfrm(False) 425 426 def SetupInterface(self): 427 return self.iproute.CreateVirtualTunnelInterface( 428 self.iface, self.local, self.remote, self.ikey, self.okey) 429 430 def _SetupXfrmByType(self, auth_algo, crypt_algo): 431 # For the VTI, the selectors are wildcard since packets will only 432 # be selected if they have the appropriate mark, hence the inner 433 # addresses are wildcard. 434 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 435 self.out_sa.spi, crypt_algo, auth_algo, 436 xfrm.ExactMatchMark(self.okey), 437 self.underlying_netid, None, xfrm.MATCH_METHOD_ALL) 438 439 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 440 self.in_sa.spi, crypt_algo, auth_algo, 441 xfrm.ExactMatchMark(self.ikey), None, None, 442 xfrm.MATCH_METHOD_MARK) 443 444 def TeardownXfrm(self): 445 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, 446 self.out_sa.spi, self.okey, None) 447 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, 448 self.in_sa.spi, self.ikey, None) 449 450 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 451 # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly 452 # the same, but rekeys are asymmetric, and only update the outbound 453 # policy. 454 self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi, 455 xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, 456 xfrm_base._ALGO_AUTH_NULL, None, None, 457 xfrm.ExactMatchMark(self.okey), self.underlying_netid) 458 459 self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi, 460 xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, 461 xfrm_base._ALGO_AUTH_NULL, None, None, 462 xfrm.ExactMatchMark(self.ikey), None) 463 464 # Create new policies for IPv4 and IPv6. 465 for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: 466 # Add SPI-specific output policy to enforce using new outbound SPI 467 policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) 468 tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, 469 (self.local, self.remote)) 470 self.xfrm.UpdatePolicyInfo(policy, tmpl, xfrm.ExactMatchMark(self.okey), 471 0) 472 473 def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): 474 self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, 475 xfrm.ExactMatchMark(self.ikey)) 476 self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, 477 xfrm.ExactMatchMark(self.okey)) 478 479 480class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): 481 """Test the creation of an XFRM Interface.""" 482 483 def testAddXfrmInterface(self): 484 self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, 485 _LOOPBACK_IFINDEX) 486 if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 487 net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) 488 489 # Validate that the netlink interface matches the ioctl interface. 490 self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) 491 self.iproute.DeleteLink(_TEST_XFRM_IFNAME) 492 with self.assertRaises(IOError): 493 self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 494 495 496class XfrmInterface(IpSecBaseInterface): 497 498 def __init__(self, iface, netid, underlying_netid, ifindex, local, remote, 499 version, use_null_crypt=False): 500 super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local, 501 remote, version) 502 503 self.ifindex = ifindex 504 self.xfrm_if_id = netid 505 506 self.SetupInterface() 507 self.SetupXfrm(use_null_crypt) 508 509 def SetupInterface(self): 510 """Create an XFRM interface.""" 511 return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) 512 513 def _SetupXfrmByType(self, auth_algo, crypt_algo): 514 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 515 self.out_sa.spi, crypt_algo, auth_algo, None, 516 self.underlying_netid, self.xfrm_if_id, 517 xfrm.MATCH_METHOD_ALL) 518 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 519 self.in_sa.spi, crypt_algo, auth_algo, None, None, 520 self.xfrm_if_id, xfrm.MATCH_METHOD_IFID) 521 522 def TeardownXfrm(self): 523 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, 524 self.out_sa.spi, None, self.xfrm_if_id) 525 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, 526 self.in_sa.spi, None, self.xfrm_if_id) 527 528 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 529 # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly 530 # the same, but rekeys are asymmetric, and only update the outbound 531 # policy. 532 self.xfrm.AddSaInfo( 533 self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, 534 xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, 535 None, self.underlying_netid, xfrm_if_id=self.xfrm_if_id) 536 537 self.xfrm.AddSaInfo( 538 self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, 539 xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, 540 None, None, xfrm_if_id=self.xfrm_if_id) 541 542 # Create new policies for IPv4 and IPv6. 543 for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: 544 # Add SPI-specific output policy to enforce using new outbound SPI 545 policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) 546 tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, 547 (self.local, self.remote)) 548 self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id) 549 550 def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): 551 self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None, 552 self.xfrm_if_id) 553 self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None, 554 self.xfrm_if_id) 555 556 def Migrate(self, new_underlying_netid, new_local, new_remote): 557 self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 558 new_remote, new_local, self.in_sa.spi, 559 self.crypt, self.auth, None, None, 560 new_underlying_netid, self.xfrm_if_id) 561 562 self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 563 new_local, new_remote, self.out_sa.spi, 564 self.crypt, self.auth, None, None, 565 new_underlying_netid, self.xfrm_if_id) 566 567 self.local = new_local 568 self.remote = new_remote 569 self.version = net_test.GetAddressVersion(new_local) 570 self.underlying_netid = new_underlying_netid 571 572 573class XfrmTunnelBase(xfrm_base.XfrmBaseTest): 574 575 @classmethod 576 def setUpClass(cls): 577 xfrm_base.XfrmBaseTest.setUpClass() 578 # Tunnel interfaces use marks extensively, so configure realistic packet 579 # marking rules to make the test representative, make PMTUD work, etc. 580 cls.SetInboundMarks(True) 581 cls.SetMarkReflectSysctls(1) 582 583 # Group by tunnel version to ensure that we test at least one IPv4 and one 584 # IPv6 tunnel 585 cls.tunnelsV4 = {} 586 cls.tunnelsV6 = {} 587 588 for i, underlying_netid in enumerate(cls.tuns): 589 for version in 4, 6: 590 netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i 591 iface = "ipsec%s" % netid 592 local = cls.MyAddress(version, underlying_netid) 593 if version == 4: 594 remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2) 595 else: 596 remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2) 597 598 ifindex = cls.ifindices[underlying_netid] 599 tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, 600 local, remote, version) 601 cls._SetInboundMarking(netid, iface, True) 602 cls._SetupTunnelNetwork(tunnel, True) 603 604 if version == 4: 605 cls.tunnelsV4[netid] = tunnel 606 else: 607 cls.tunnelsV6[netid] = tunnel 608 609 @classmethod 610 def tearDownClass(cls): 611 # The sysctls are restored by MultinetworkBaseTest.tearDownClass. 612 cls.SetInboundMarks(False) 613 for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()): 614 cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) 615 cls._SetupTunnelNetwork(tunnel, False) 616 tunnel.Teardown() 617 xfrm_base.XfrmBaseTest.tearDownClass() 618 619 def randomTunnel(self, outer_version): 620 version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 621 return random.choice(list(version_dict.values())) 622 623 def setUp(self): 624 multinetwork_base.MultiNetworkBaseTest.setUp(self) 625 self.iproute = iproute.IPRoute() 626 self.xfrm = xfrm.Xfrm() 627 628 def tearDown(self): 629 multinetwork_base.MultiNetworkBaseTest.tearDown(self) 630 631 def _SwapInterfaceAddress(self, ifname, old_addr, new_addr): 632 """Exchange two addresses on a given interface. 633 634 Args: 635 ifname: Name of the interface 636 old_addr: An address to be removed from the interface 637 new_addr: An address to be added to an interface 638 """ 639 version = 6 if ":" in new_addr else 4 640 ifindex = net_test.GetInterfaceIndex(ifname) 641 self.iproute.AddAddress(new_addr, 642 net_test.AddressLengthBits(version), ifindex) 643 self.iproute.DelAddress(old_addr, 644 net_test.AddressLengthBits(version), ifindex) 645 646 @classmethod 647 def _GetLocalAddress(cls, version, netid): 648 if version == 4: 649 return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET) 650 else: 651 return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1" 652 653 @classmethod 654 def UidRangeForTunnelNetId(cls, netid): 655 if netid < _TUNNEL_NETID_OFFSET: 656 raise ValueError("Tunnel netid outside tunnel range") 657 netid -= _TUNNEL_NETID_OFFSET 658 return (500 + 50 * netid, 500 + 50 * (netid + 1) - 1) 659 660 @classmethod 661 def _SetupTunnelNetwork(cls, tunnel, is_add): 662 """Setup rules and routes for a tunnel Network. 663 664 Takes an interface and depending on the boolean 665 value of is_add, either adds or removes the rules 666 and routes for a tunnel interface to behave like an 667 Android Network for purposes of testing. 668 669 Args: 670 tunnel: A VtiInterface or XfrmInterface, the tunnel to set up. 671 is_add: Boolean that causes this method to perform setup if True or 672 teardown if False 673 """ 674 if is_add: 675 # Disable router solicitations to avoid occasional spurious packets 676 # arriving on the underlying network; there are two possible behaviors 677 # when that occurred: either only the RA packet is read, and when it 678 # is echoed back to the tunnel, it causes the test to fail by not 679 # receiving # the UDP_PAYLOAD; or, two packets may arrive on the 680 # underlying # network which fails the assertion that only one ESP packet 681 # is received. 682 cls.SetSysctl( 683 "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0) 684 net_test.SetInterfaceUp(tunnel.iface) 685 686 for version in [4, 6]: 687 ifindex = net_test.GetInterfaceIndex(tunnel.iface) 688 table = tunnel.netid 689 690 # Set up routing rules. 691 start, end = cls.UidRangeForTunnelNetId(tunnel.netid) 692 cls.iproute.UidRangeRule(version, is_add, start, end, table, 693 cls.PRIORITY_UID) 694 cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF) 695 cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK, 696 table, cls.PRIORITY_FWMARK) 697 698 # Configure IP addresses. 699 addr = cls._GetLocalAddress(version, tunnel.netid) 700 prefixlen = net_test.AddressLengthBits(version) 701 tunnel.addrs[version] = addr 702 if is_add: 703 cls.iproute.AddAddress(addr, prefixlen, ifindex) 704 cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) 705 else: 706 cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) 707 cls.iproute.DelAddress(addr, prefixlen, ifindex) 708 709 def assertReceivedPacket(self, tunnel, sa_info): 710 tunnel.rx += 1 711 self.assertEqual((tunnel.rx, tunnel.tx), 712 self.iproute.GetRxTxPackets(tunnel.iface)) 713 sa_info.seq_num += 1 714 715 def assertSentPacket(self, tunnel, sa_info): 716 tunnel.tx += 1 717 self.assertEqual((tunnel.rx, tunnel.tx), 718 self.iproute.GetRxTxPackets(tunnel.iface)) 719 sa_info.seq_num += 1 720 721 def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner, 722 sa_info=None, expect_fail=False): 723 """Test null-crypt input path over an IPsec interface.""" 724 if sa_info is None: 725 sa_info = tunnel.in_sa 726 read_sock, local_port = _CreateReceiveSock(inner_version) 727 728 input_pkt = _GetNullAuthCryptTunnelModePkt( 729 inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT, 730 local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num) 731 self.ReceivePacketOn(tunnel.underlying_netid, input_pkt) 732 733 try: 734 if expect_fail: 735 self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096) 736 else: 737 # Verify that the packet data and src are correct 738 data, src = read_sock.recvfrom(4096) 739 self.assertReceivedPacket(tunnel, sa_info) 740 self.assertEqual(net_test.UDP_PAYLOAD, data) 741 self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) 742 finally: 743 read_sock.close() 744 745 def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, 746 remote_inner, sa_info=None): 747 """Test null-crypt output path over an IPsec interface.""" 748 if sa_info is None: 749 sa_info = tunnel.out_sa 750 local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 751 _TEST_REMOTE_PORT) 752 753 # Read a tunneled IP packet on the underlying (outbound) network 754 # verifying that it is an ESP packet. 755 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, 756 sa_info.seq_num, None, tunnel.local, 757 tunnel.remote) 758 759 # Get and update the IP headers on the inner payload so that we can do a simple 760 # comparison of byte data. Unfortunately, due to the scapy version this runs on, 761 # we cannot parse past the ESP header to the inner IP header, and thus have to 762 # workaround in this manner 763 if inner_version == 4: 764 ip_hdr_options = { 765 'id': scapy.IP(bytes(pkt.payload)[8:]).id, 766 'flags': scapy.IP(bytes(pkt.payload)[8:]).flags 767 } 768 else: 769 ip_hdr_options = {'fl': scapy.IPv6(bytes(pkt.payload)[8:]).fl} 770 771 expected = _GetNullAuthCryptTunnelModePkt( 772 inner_version, local_inner, tunnel.local, local_port, remote_inner, 773 tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num, 774 ip_hdr_options) 775 776 # Check outer header manually (Avoids having to overwrite outer header's 777 # id, flags or flow label) 778 self.assertSentPacket(tunnel, sa_info) 779 self.assertEqual(expected.src, pkt.src) 780 self.assertEqual(expected.dst, pkt.dst) 781 self.assertEqual(len(expected), len(pkt)) 782 783 # Check everything else 784 self.assertEqual(bytes(expected.payload), bytes(pkt.payload)) 785 786 def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, 787 remote_inner): 788 """Test both input and output paths over an encrypted IPsec interface. 789 790 This tests specifically makes sure that the both encryption and decryption 791 work together, as opposed to the _CheckTunnel(Input|Output) where the 792 input and output paths are tested separately, and using null encryption. 793 """ 794 src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 795 _TEST_REMOTE_PORT) 796 797 # Make sure it appeared on the underlying interface 798 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi, 799 tunnel.out_sa.seq_num, None, tunnel.local, 800 tunnel.remote) 801 802 # Check that packet is not sent in plaintext 803 self.assertTrue(bytes(net_test.UDP_PAYLOAD) not in bytes(pkt)) 804 805 # Check src/dst 806 self.assertEqual(tunnel.local, pkt.src) 807 self.assertEqual(tunnel.remote, pkt.dst) 808 809 # Check that the interface statistics recorded the outbound packet 810 self.assertSentPacket(tunnel, tunnel.out_sa) 811 812 try: 813 # Swap the interface addresses to pretend we are the remote 814 self._SwapInterfaceAddress( 815 tunnel.iface, new_addr=remote_inner, old_addr=local_inner) 816 817 # Swap the packet's IP headers and write it back to the underlying 818 # network. 819 pkt = TunTwister.TwistPacket(pkt) 820 read_sock, local_port = _CreateReceiveSock(inner_version, 821 _TEST_REMOTE_PORT) 822 self.ReceivePacketOn(tunnel.underlying_netid, pkt) 823 824 # Verify that the packet data and src are correct 825 data, src = read_sock.recvfrom(4096) 826 self.assertEqual(net_test.UDP_PAYLOAD, data) 827 self.assertEqual((local_inner, src_port), src[:2]) 828 829 # Check that the interface statistics recorded the inbound packet 830 self.assertReceivedPacket(tunnel, tunnel.in_sa) 831 832 read_sock.close() 833 finally: 834 # Swap the interface addresses to pretend we are the remote 835 self._SwapInterfaceAddress( 836 tunnel.iface, new_addr=local_inner, old_addr=remote_inner) 837 838 def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner, 839 sa_info=None): 840 """Test ICMP error path over an IPsec interface.""" 841 if sa_info is None: 842 sa_info = tunnel.out_sa 843 # Now attempt to provoke an ICMP error. 844 # TODO: deduplicate with multinetwork_test.py. 845 dst_prefix, intermediate = { 846 4: ("172.19.", "172.16.9.12"), 847 6: ("2001:db8::", "2001:db8::1") 848 }[tunnel.version] 849 850 local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 851 _TEST_REMOTE_PORT) 852 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, 853 sa_info.seq_num, None, tunnel.local, 854 tunnel.remote) 855 self.assertSentPacket(tunnel, sa_info) 856 857 myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid) 858 _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr, 859 pkt) 860 self.ReceivePacketOn(tunnel.underlying_netid, toobig) 861 862 # Check that the packet too big reduced the MTU. 863 routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) 864 self.assertEqual(1, len(routes)) 865 rtmsg, attributes = routes[0] 866 self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) 867 self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) 868 869 # Clear PMTU information so that future tests don't have to worry about it. 870 self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) 871 872 def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner, 873 remote_inner): 874 """Test combined encryption path with ICMP errors over an IPsec tunnel""" 875 self._CheckTunnelEncryption(tunnel, inner_version, local_inner, 876 remote_inner) 877 self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner) 878 self._CheckTunnelEncryption(tunnel, inner_version, local_inner, 879 remote_inner) 880 881 def _RebuildTunnel(self, tunnel, use_null_crypt): 882 # Some tests require that the out_seq_num and in_seq_num are the same 883 # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1 884 # 885 # Until we get better scapy support, the only way we can build an 886 # encrypted packet is to send it out, and read the packet from the wire. 887 # We then generally use this as the "inbound" encrypted packet, injecting 888 # it into the interface for which it is expected on. 889 # 890 # As such, this is required to ensure that encrypted packets (which we 891 # currently have no way to easily modify) are not considered replay 892 # attacks by the inbound SA. (eg: received 3 packets, seq_num_in = 3, 893 # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay 894 # attack) 895 tunnel.TeardownXfrm() 896 tunnel.SetupXfrm(use_null_crypt) 897 898 def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): 899 """Bootstrap method to setup and run tests for the given parameters.""" 900 tunnel = self.randomTunnel(outer_version) 901 902 try: 903 self._RebuildTunnel(tunnel, use_null_crypt) 904 905 local_inner = tunnel.addrs[inner_version] 906 remote_inner = _GetRemoteInnerAddress(inner_version) 907 908 for i in range(2): 909 func(tunnel, inner_version, local_inner, remote_inner) 910 finally: 911 if use_null_crypt: 912 tunnel.TeardownXfrm() 913 tunnel.SetupXfrm(False) 914 915 def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner): 916 old_out_sa = tunnel.out_sa 917 old_in_sa = tunnel.in_sa 918 919 # Check to make sure that both directions work before rekey 920 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 921 old_in_sa) 922 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 923 old_out_sa) 924 925 # Rekey 926 outer_family = net_test.GetAddressFamily(tunnel.version) 927 928 # Create new SA 929 # Distinguish the new SAs with new SPIs. 930 new_out_sa = SaInfo(old_out_sa.spi + 1) 931 new_in_sa = SaInfo(old_in_sa.spi + 1) 932 933 # Perform Rekey 934 tunnel.Rekey(outer_family, new_out_sa, new_in_sa) 935 936 # Expect that the old SPI still works for inbound packets 937 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 938 old_in_sa) 939 940 # Test both paths with new SPIs, expect outbound to use new SPI 941 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 942 new_in_sa) 943 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 944 new_out_sa) 945 946 # Delete old SAs 947 tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi) 948 949 # Test both paths with new SPIs; should still work 950 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 951 new_in_sa) 952 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 953 new_out_sa) 954 955 # Expect failure upon trying to receive a packet with the deleted SPI 956 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 957 old_in_sa, True) 958 959 def _TestTunnelRekey(self, inner_version, outer_version): 960 """Test packet input and output over a Virtual Tunnel Interface.""" 961 tunnel = self.randomTunnel(outer_version) 962 963 try: 964 # Always use null_crypt, so we can check input and output separately 965 tunnel.TeardownXfrm() 966 tunnel.SetupXfrm(True) 967 968 local_inner = tunnel.addrs[inner_version] 969 remote_inner = _GetRemoteInnerAddress(inner_version) 970 971 self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner) 972 finally: 973 tunnel.TeardownXfrm() 974 tunnel.SetupXfrm(False) 975 976 977class XfrmVtiTest(XfrmTunnelBase): 978 979 INTERFACE_CLASS = VtiInterface 980 981 def ParamTestVtiInput(self, inner_version, outer_version): 982 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 983 984 def ParamTestVtiOutput(self, inner_version, outer_version): 985 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 986 True) 987 988 def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): 989 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 990 False) 991 992 def ParamTestVtiIcmp(self, inner_version, outer_version): 993 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 994 995 def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): 996 self._TestTunnel(inner_version, outer_version, 997 self._CheckTunnelEncryptionWithIcmp, False) 998 999 def ParamTestVtiRekey(self, inner_version, outer_version): 1000 self._TestTunnelRekey(inner_version, outer_version) 1001 1002 1003class XfrmInterfaceTest(XfrmTunnelBase): 1004 1005 INTERFACE_CLASS = XfrmInterface 1006 1007 def ParamTestXfrmIntfInput(self, inner_version, outer_version): 1008 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 1009 1010 def ParamTestXfrmIntfOutput(self, inner_version, outer_version): 1011 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 1012 True) 1013 1014 def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): 1015 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 1016 False) 1017 1018 def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): 1019 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 1020 1021 def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): 1022 self._TestTunnel(inner_version, outer_version, 1023 self._CheckTunnelEncryptionWithIcmp, False) 1024 1025 def ParamTestXfrmIntfRekey(self, inner_version, outer_version): 1026 self._TestTunnelRekey(inner_version, outer_version) 1027 1028############################################################################## 1029# 1030# Test for presence of CONFIG_XFRM_MIGRATE and kernel patches 1031# 1032# xfrm: Check if_id in xfrm_migrate 1033# Upstream commit: c1aca3080e382886e2e58e809787441984a2f89b 1034# 1035# xfrm: Fix xfrm migrate issues when address family changes 1036# Upstream commit: e03c3bba351f99ad932e8f06baa9da1afc418e02 1037# 1038# Those two upstream 5.17 fixes above were pulled in to LTS in kernel versions 1039# 4.14.273, 4.19.236, 5.4.186, 5.10.107, 5.15.30. 1040# 1041# Note: the 'Check if_id in xfrm_migrate' fix did not land in 4.14 LTS, 1042# and instead landed in android-4.14-stable after 4.14.320 LTS merge. 1043# 1044@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, 1045 "XFRM migration unsupported or fixes not included") 1046class XfrmInterfaceMigrateTest(XfrmTunnelBase): 1047 INTERFACE_CLASS = XfrmInterface 1048 1049 def setUpTunnel(self, outer_version, use_null_crypt): 1050 underlying_netid = self.RandomNetid() 1051 netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET 1052 iface = "ipsec%s" % netid 1053 ifindex = self.ifindices[underlying_netid] 1054 1055 local = self.MyAddress(outer_version, underlying_netid) 1056 remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR 1057 1058 tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex, 1059 local, remote, outer_version, use_null_crypt) 1060 self._SetInboundMarking(netid, iface, True) 1061 self._SetupTunnelNetwork(tunnel, True) 1062 1063 return tunnel 1064 1065 def tearDownTunnel(self, tunnel): 1066 self._SetInboundMarking(tunnel.netid, tunnel.iface, False) 1067 self._SetupTunnelNetwork(tunnel, False) 1068 tunnel.Teardown() 1069 1070 def _TestTunnel(self, inner_version, outer_version, new_outer_version, func, 1071 use_null_crypt): 1072 tunnel = self.randomTunnel(outer_version) 1073 1074 old_underlying_netid = tunnel.underlying_netid 1075 old_local = tunnel.local 1076 old_remote = tunnel.remote 1077 1078 1079 try: 1080 self._RebuildTunnel(tunnel, use_null_crypt) 1081 1082 # Verify functionality before migration 1083 local_inner = tunnel.addrs[inner_version] 1084 remote_inner = _GetRemoteInnerAddress(inner_version) 1085 func(tunnel, inner_version, local_inner, remote_inner) 1086 1087 # Migrate tunnel 1088 new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid) 1089 new_version = new_outer_version 1090 new_local = self.MyAddress(new_version, new_underlying_netid) 1091 new_remote = net_test.IPV4_ADDR2 if new_version == 4 else net_test.IPV6_ADDR2 1092 1093 tunnel.Migrate(new_underlying_netid, new_local, new_remote) 1094 1095 # Verify functionality after migration 1096 func(tunnel, inner_version, local_inner, remote_inner) 1097 finally: 1098 # Reset the tunnel to the original configuration 1099 tunnel.TeardownXfrm() 1100 1101 self.local = old_local 1102 self.remote = old_remote 1103 self.underlying_netid = old_underlying_netid 1104 tunnel.SetupXfrm(False) 1105 1106 1107 def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version, 1108 new_outer_version): 1109 self._TestTunnel(inner_version, outer_version, new_outer_version, 1110 self._CheckTunnelInput, True) 1111 1112 def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version, 1113 new_outer_version): 1114 self._TestTunnel(inner_version, outer_version, new_outer_version, 1115 self._CheckTunnelOutput, True) 1116 1117 def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version, 1118 new_outer_version): 1119 self._TestTunnel(inner_version, outer_version, new_outer_version, 1120 self._CheckTunnelEncryption, False) 1121 1122 def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version, 1123 new_outer_version): 1124 self._TestTunnel(inner_version, outer_version, new_outer_version, 1125 self._CheckTunnelIcmp, False) 1126 1127 def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version, 1128 new_outer_version): 1129 self._TestTunnel(inner_version, outer_version, new_outer_version, 1130 self._CheckTunnelEncryptionWithIcmp, False) 1131 1132 def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version, 1133 new_outer_version): 1134 self._TestTunnel(inner_version, outer_version, new_outer_version, 1135 self._CheckTunnelRekey, True) 1136 1137if __name__ == "__main__": 1138 InjectTests() 1139 unittest.main() 1140