#!/usr/bin/python3
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions and base test classes for XFRM / ESP testing."""

from socket import *  # pylint: disable=wildcard-import
from scapy import all as scapy
import binascii
import struct

import csocket
import cstruct
import multinetwork_base
import net_test
import util
import xfrm

_ENCRYPTION_KEY_256 = binascii.unhexlify("308146eb3bd84b044573d60f5a5fd159"
                                         "57c7d4fe567a2120f35bae0f9869ec22")
_AUTHENTICATION_KEY_128 = binascii.unhexlify("af442892cdcd0ef650e9c299f9a8436a")

# Each algorithm constant is a tuple of (algorithm descriptor struct, key).
_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth((b"digest_null", 0, 0)), b"")
_ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)),
                   _AUTHENTICATION_KEY_128)

_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo((b"ecb(cipher_null)", 0)), b"")
_ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
                     _ENCRYPTION_KEY_256)

# Match all bits of the mark
MARK_MASK_ALL = 0xffffffff


def SetPolicySockopt(sock, family, opt_data):
  """Set the per-socket XFRM policy option for the given address family.

  Args:
    sock: The socket on which to set the option.
    family: AF_INET selects IPPROTO_IP / IP_XFRM_POLICY; any other value
        selects IPPROTO_IPV6 / IPV6_XFRM_POLICY.
    opt_data: The packed option data, or None, in which case an option length
        of 0 is passed.
  """
  optlen = len(opt_data) if opt_data is not None else 0
  if family == AF_INET:
    csocket.Setsockopt(sock, IPPROTO_IP, xfrm.IP_XFRM_POLICY, opt_data, optlen)
  else:
    csocket.Setsockopt(sock, IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, opt_data,
                       optlen)


def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs):
  """Create and apply an ESP policy to a socket.

  A socket may have only one policy per direction, so applying a policy will
  remove any policy that was previously applied in that direction.

  Args:
    sock: The socket that needs a policy
    family: AF_INET or AF_INET6
    direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
    spi: 32-bit SPI in host byte order
    reqid: 32-bit ID matched against SAs
    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
        to request a transport mode SA.
  """
  # Create a selector that matches all packets of the specified address family.
  selector = xfrm.EmptySelector(family)

  # Create an XFRM policy and template.
  policy = xfrm.UserPolicy(direction, selector)
  template = xfrm.UserTemplate(family, spi, reqid, tun_addrs)

  # Set the policy and template on our socket.
  opt_data = policy.Pack() + template.Pack()

  # The policy family might not match the socket family. For example, we might
  # have an IPv4 policy on a dual-stack socket.
  sockfamily = sock.getsockopt(SOL_SOCKET, net_test.SO_DOMAIN)
  SetPolicySockopt(sock, sockfamily, opt_data)


def _GetCryptParameters(crypt_alg):
  """Looks up encryption algorithm's block and IV lengths.

  Args:
    crypt_alg: the encryption algorithm constant
  Returns:
    A tuple of (block size, IV length). (0, 0) is returned for an algorithm
    that is not one of the constants known to this module.
  """
  if crypt_alg == _ALGO_CRYPT_NULL:
    return (4, 0)
  if crypt_alg == _ALGO_CBC_AES_256:
    return (16, 16)
  return (0, 0)


def GetEspPacketLength(mode, version, udp_encap, payload,
                       auth_alg, crypt_alg):
  """Calculates encrypted length of a UDP packet with the given payload.

  Args:
    mode: XFRM_MODE_TRANSPORT or XFRM_MODE_TUNNEL.
    version: IPPROTO_IP for IPv4, IPPROTO_IPV6 for IPv6. The inner header.
    udp_encap: whether UDP encap overhead should be accounted for. Since the
        outermost IP header is ignored (payload only), only add for udp
        encap'd packets.
    payload: UDP payload bytes.
    auth_alg: The xfrm_base authentication algorithm used in the SA.
    crypt_alg: The xfrm_base encryption algorithm used in the SA.

  Returns:
    The packet length.
  """
  # _GetCryptParameters returns (block size, IV length), in that order.
  crypt_blk_size, crypt_iv_len = _GetCryptParameters(crypt_alg)
  auth_trunc_len = auth_alg[0].trunc_len

  # Wrap in UDP payload
  payload_len = len(payload) + net_test.UDP_HDR_LEN

  # Size constants
  esp_hdr_len = len(xfrm.EspHdr)  # SPI + Seq number
  icv_len = auth_trunc_len // 8  # trunc_len is in bits.

  # Add inner IP header if tunnel mode
  if mode == xfrm.XFRM_MODE_TUNNEL:
    payload_len += net_test.GetIpHdrLength(version)

  # Add ESP trailer
  payload_len += 2  # Pad Length + Next Header fields

  # Align to block size of encryption algorithm
  payload_len += util.GetPadLength(crypt_blk_size, payload_len)

  # Add initialization vector, header length and ICV length
  payload_len += esp_hdr_len + crypt_iv_len + icv_len

  # Add encap as needed
  if udp_encap:
    payload_len += net_test.UDP_HDR_LEN

  return payload_len


def GetEspTrailer(length, nexthdr):
  """Builds the ESP trailer for a payload of the given length.

  Args:
    length: length in bytes of the data that the trailer will follow.
    nexthdr: value to place in the Next Header field.

  Returns:
    The trailer bytes: padding, then the 1-byte Pad Length and the 1-byte
    Next Header fields.
  """
  # ESP padding per RFC 4303 section 2.4.
  # For a null cipher with a block size of 1, padding is only necessary to
  # ensure that the 1-byte Pad Length and Next Header fields are right aligned
  # on a 4-byte boundary.
  esplen = length + 2  # Packet length plus Pad Length and Next Header.
  padlen = util.GetPadLength(4, esplen)
  # The pad bytes are consecutive integers starting from 0x01.
  padding = bytes(range(1, padlen + 1))
  return padding + struct.pack("BB", padlen, nexthdr)


def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
  """Apply null encryption to a packet.

  This performs ESP encapsulation on the given packet. The returned packet will
  be a tunnel mode packet if tun_addrs is provided.

  The input packet is assumed to be a UDP packet. The input packet *MUST* have
  its length and checksum fields in IP and UDP headers set appropriately. This
  can be done by "rebuilding" the scapy object. e.g.,
      ip6_packet = scapy.IPv6(bytes(ip6_packet))

  TODO: Support TCP

  Args:
    packet: a scapy.IPv6 or scapy.IP packet
    spi: security parameter index for ESP header in host byte order
    seq: sequence number for ESP header
    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
        to request a transport mode packet.

  Returns:
    The encrypted packet (scapy.IPv6 or scapy.IP)

  Raises:
    ValueError: if the packet has no UDP layer, or if its first layer is
        neither IPv4 nor IPv6.
  """
  # The top-level packet changes in tunnel mode, which would invalidate
  # the passed-in packet pointer. For consistency, this function now returns
  # a new packet and does not modify the user's original packet.
  packet = packet.copy()
  udp_layer = packet.getlayer(scapy.UDP)
  if not udp_layer:
    raise ValueError("Expected a UDP packet")
  # Build an ESP header.
  esp_packet = scapy.Raw(xfrm.EspHdr((spi, seq)).Pack())

  if tun_addrs:
    # Tunnel mode: the whole original packet becomes the ESP payload, carried
    # inside a new outer IP header built from the tunnel addresses.
    tsrc_addr, tdst_addr = tun_addrs
    outer_version = net_test.GetAddressVersion(tsrc_addr)
    ip_type = {4: scapy.IP, 6: scapy.IPv6}[outer_version]
    new_ip_layer = ip_type(src=tsrc_addr, dst=tdst_addr)
    inner_layer = packet
    esp_nexthdr = {scapy.IPv6: IPPROTO_IPV6,
                   scapy.IP: IPPROTO_IPIP}[type(packet)]
  else:
    # Transport mode: only the UDP layer is placed inside ESP.
    new_ip_layer = None
    inner_layer = udp_layer
    esp_nexthdr = IPPROTO_UDP

  trailer = GetEspTrailer(len(inner_layer), esp_nexthdr)

  # Assemble the packet.
  esp_packet.payload = scapy.Raw(inner_layer)
  packet = new_ip_layer if new_ip_layer else packet
  packet.payload = scapy.Raw(bytes(esp_packet) + trailer)

  # TODO: Can we simplify this and avoid the initial copy()?
  # Fix the IPv4/IPv6 headers.
  if type(packet) is scapy.IPv6:
    packet.nh = IPPROTO_ESP
    # Recompute plen.
    packet.plen = None
    packet = scapy.IPv6(bytes(packet))
  elif type(packet) is scapy.IP:
    packet.proto = IPPROTO_ESP
    # Recompute IPv4 len and checksum.
    packet.len = None
    packet.chksum = None
    packet = scapy.IP(bytes(packet))
  else:
    raise ValueError("First layer in packet should be IPv4 or IPv6: " +
                     repr(packet))
  return packet


def DecryptPacketWithNull(packet):
  """Apply null decryption to a packet.

  This performs ESP decapsulation on the given packet. The input packet is
  assumed to be a UDP packet. This function will remove the ESP header and
  trailer bytes from an ESP packet.

  Note that for a transport mode packet the payload of the passed-in packet
  object is replaced in place before the rebuilt packet is returned.

  TODO: Support TCP

  Args:
    packet: a scapy.IPv6 or scapy.IP packet

  Returns:
    A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr

  Raises:
    KeyError: if the ESP Next Header field is not IPPROTO_IPIP, IPPROTO_IPV6
        or IPPROTO_UDP.
    ValueError: if a transport mode packet's first layer is neither IPv4 nor
        IPv6.
  """
  esp_hdr, esp_data = cstruct.Read(bytes(packet.payload), xfrm.EspHdr)
  # Parse and strip ESP trailer.
  pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:])
  trailer_len = pad_len + 2  # Add the size of the pad_len and next_hdr fields.
  LayerType = {
      IPPROTO_IPIP: scapy.IP,
      IPPROTO_IPV6: scapy.IPv6,
      IPPROTO_UDP: scapy.UDP}[esp_nexthdr]
  next_layer = LayerType(esp_data[:-trailer_len])
  if esp_nexthdr in [IPPROTO_IPIP, IPPROTO_IPV6]:
    # Tunnel mode decap is simple. Return the inner packet.
    return next_layer, esp_hdr

  # Cut out the ESP header.
  packet.payload = next_layer
  # Fix the IPv4/IPv6 headers.
  if type(packet) is scapy.IPv6:
    packet.nh = IPPROTO_UDP
    packet.plen = None  # Recompute packet length.
    packet = scapy.IPv6(bytes(packet))
  elif type(packet) is scapy.IP:
    packet.proto = IPPROTO_UDP
    packet.len = None  # Recompute packet length.
    packet.chksum = None  # Recompute IPv4 checksum.
    packet = scapy.IP(bytes(packet))
  else:
    raise ValueError("First layer in packet should be IPv4 or IPv6: " +
                     repr(packet))
  return packet, esp_hdr


class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
  """Base test class for all XFRM-related testing."""

  def _isIcmpv6(self, payload):
    """Returns True if payload is an IPv6 packet carrying ICMPv6.

    ICMPv6 is recognised either directly after the IPv6 header or directly
    after a hop-by-hop options header.
    """
    if not isinstance(payload, scapy.IPv6):
      return False
    if payload.nh == IPPROTO_ICMPV6:
      return True
    return payload.nh == IPPROTO_HOPOPTS and payload.payload.nh == IPPROTO_ICMPV6

  def _ExpectEspPacketOn(self, netid, spi, seq, length, src_addr, dst_addr):
    """Read a packet from a netid and verify its properties.

    ICMPv6 packets read from the netid are ignored; exactly one other packet
    is expected.

    Args:
      netid: netid from which to read an ESP packet
      spi: SPI of the ESP packet in host byte order
      seq: sequence number of the ESP packet
      length: length of the packet's ESP payload or None to skip this check
      src_addr: source address of the packet or None to skip this check
      dst_addr: destination address of the packet or None to skip this check

    Returns:
      scapy.IP/IPv6: the read packet
    """
    packets = [packet for packet in self.ReadAllPacketsOn(netid)
               if not self._isIcmpv6(packet)]

    self.assertEqual(1, len(packets))
    packet = packets[0]
    if length is not None:
      self.assertEqual(length, len(packet.payload))
    if dst_addr is not None:
      self.assertEqual(dst_addr, packet.dst)
    if src_addr is not None:
      self.assertEqual(src_addr, packet.src)
    # extract the ESP header
    esp_hdr, _ = cstruct.Read(bytes(packet.payload), xfrm.EspHdr)
    self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
    return packet


# TODO: delete this when we're more diligent about deleting our SAs.
class XfrmLazyTest(XfrmBaseTest):
  """Base test class for XFRM tests that flushes XFRM state around each test.

  All SAs and policies are flushed in setUp and again in tearDown.
  """

  def setUp(self):
    # Start the super() lookup at this class so that any setUp later added to
    # XfrmBaseTest is not skipped.
    super(XfrmLazyTest, self).setUp()
    self.xfrm = xfrm.Xfrm()
    self.xfrm.FlushSaInfo()
    self.xfrm.FlushPolicyInfo()

  def tearDown(self):
    # Flush and close even if an earlier teardown step raises, so that state
    # does not leak into the next test.
    try:
      super(XfrmLazyTest, self).tearDown()
    finally:
      try:
        self.xfrm.FlushSaInfo()
        self.xfrm.FlushPolicyInfo()
      finally:
        self.xfrm.close()