#  Copyright (C) 2024 The Android Open Source Project
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# Lint as: python3
"""Serial driver for a PN532 NFC reader."""

import logging
import serial
from . import tag
from binascii import hexlify
from serial.tools.list_ports import comports
from mobly import logger as mobly_logger

# Command codes; each is sent as the first data byte of a host frame and the
# device answers with (code + 1) as the first byte of its response.
GET_FIRMWARE_VERSION = 0x02
SAM_CONFIGURATION = 0x14
RF_CONFIGURATION = 0x32
IN_DATA_EXCHANGE = 0x40
IN_COMMUNICATE_THRU = 0x42
IN_LIST_PASSIVE_TARGET = 0x4A
WRITE_REGISTER = 0x08

# 20 zero bytes prepended to the frames sent during initialization.
# NOTE(review): presumably gives the chip time to wake up - confirm against
# the PN532 manual.
LONG_PREAMBLE = bytearray(20)

# ACK frame exchanged in both directions between host and device.
_ACK_FRAME = bytes.fromhex("0000ff00ff00")

# Serial read timeout (seconds) restored after each response is read; matches
# the timeout the port is opened with.
_DEFAULT_READ_TIMEOUT = 0.1


def crc16a(data):
    """Returns the 16-bit CRC of data as two bytes, low byte first.

    The register starts at 0x6363 (presumably the ISO/IEC 14443-3 CRC_A used
    for NFC type A frames - confirm against the spec).

    Args:
      data: Iterable of ints in range(256), e.g. bytes or bytearray.
    """
    w_crc = 0x6363
    for byte in data:
        byte = byte ^ (w_crc & 0x00FF)
        byte = (byte ^ (byte << 4)) & 0xFF
        w_crc = ((w_crc >> 8) ^ (byte << 8) ^ (byte << 3) ^ (byte >> 4)) & 0xFFFF
    return bytes([w_crc & 0xFF, (w_crc >> 8) & 0xFF])


def with_crc16a(data):
    """Returns data as bytes with its crc16a() checksum appended."""
    return bytes(data) + crc16a(data)


class PN532:
    """Driver for a PN532 reader attached to a serial port."""

    def __init__(self, path=None):
        """Initializes device on path, or first available serial port if none is provided.

        Args:
          path: Serial device path. When empty or None, the first port reported
            by comports() is used.

        Raises:
          IndexError: If no serial port is present at all.
          RuntimeError: If the firmware check fails or the device does not
            answer one of the configuration commands.
        """
        ports = comports()
        if not ports:
            raise IndexError(
                "Could not find device on serial port, make sure reader is plugged in."
            )
        if not path:
            path = ports[0].device
        self.log = mobly_logger.PrefixLoggerAdapter(
            logging.getLogger(),
            {
                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
                    f"[PN532|{path}]"
                )
            },
        )
        self.log.debug("Serial port: %s", path)
        self.device = serial.Serial(path, 115200, timeout=_DEFAULT_READ_TIMEOUT)

        try:
            self._initialize_device(path)
        except Exception:
            # Do not leak the open port when the reader cannot be set up.
            self.device.close()
            raise

    def _initialize_device(self, path):
        """Runs the start-up sequence: firmware check, SAM and RF configuration."""
        self.device.flush()
        # Send an ACK frame behind a long preamble, then discard whatever the
        # device had queued before we started talking to it.
        self.device.write(LONG_PREAMBLE + _ACK_FRAME)
        self.device.flushInput()
        if not self.verify_firmware_version():
            raise RuntimeError("Could not verify PN532 firmware on serial path " + path)
        rsp = self.send_frame(
            LONG_PREAMBLE + self.construct_frame([SAM_CONFIGURATION, 0x01, 0x00]),
            1,
        )
        if not rsp:
            raise RuntimeError("No response for SAM configuration.")

        # Disable retries
        self.device.flushInput()
        rsp = self.send_frame(
            self.construct_frame(
                [
                    RF_CONFIGURATION,
                    0x05,
                    0x00,  # MxRtyATR
                    0x00,  # MxRtyPSL
                    0x00,  # MxRtyPassiveActivation
                ]
            ),
            1,
        )
        if not rsp:
            raise RuntimeError("No response for RF configuration.")

    def verify_firmware_version(self):
        """Verifies we are talking to a PN532.

        Returns:
          True when the response has the expected command code, is 5 bytes long
          and its second byte is 0x32; False otherwise.

        Raises:
          RuntimeError: If the device does not respond.
        """
        self.log.debug("Checking firmware version")
        rsp = self.send_frame(
            LONG_PREAMBLE + self.construct_frame([GET_FIRMWARE_VERSION])
        )

        if not rsp:
            raise RuntimeError("No response for GetFirmwareVersion")

        if rsp[0] != GET_FIRMWARE_VERSION + 1 or len(rsp) != 5:
            self.log.error("Got unexpected response for GetFirmwareVersion")
            return False

        return rsp[1] == 0x32

    def poll_a(self):
        """Attempts to detect target for NFC type A.

        Returns:
          A tag.TypeATag for the detected target, or None when no target was
          found or the response is too short to parse. The ATS passed to the
          tag is empty when the response carries none.

        Raises:
          RuntimeError: If the device does not respond.
        """
        self.log.debug("Polling A")
        rsp = self.send_frame(
            self.construct_frame([IN_LIST_PASSIVE_TARGET, 0x01, 0x00])
        )
        if not rsp:
            raise RuntimeError("No response for send poll_a frame.")

        if rsp[0] != IN_LIST_PASSIVE_TARGET + 1:
            self.log.error("Got unexpected command code in response")
        del rsp[0]

        # First remaining byte is the number of targets found.
        if not rsp or rsp[0] == 0:
            return None

        # Fixed part: target count, target id, SENS_RES (2), SEL_RES, NFCID length.
        if len(rsp) < 6:
            self.log.error(
                "Truncated response for poll_a, got %s", hexlify(rsp).decode()
            )
            return None

        target_id = rsp[1]
        sense_res = rsp[2:4]
        sel_res = rsp[4]
        self.log.debug("Got tag, SEL_RES is %02x", sel_res)

        nfcid_len = rsp[5]
        nfcid_end = 6 + nfcid_len
        nfcid = rsp[6:nfcid_end]

        # The ATS is optional; when present its length byte counts itself, so
        # the payload is ats_len - 1 bytes long.
        remainder = rsp[nfcid_end:]
        ats = bytearray()
        if remainder:
            ats_len = remainder[0]
            ats = remainder[1 : max(ats_len, 1)]

        return tag.TypeATag(self, target_id, sense_res, sel_res, nfcid, ats)

    def poll_b(self):
        """Attempts to detect target for NFC type B.

        The response is only checked for presence; it is not parsed and nothing
        is returned.

        Raises:
          RuntimeError: If the device does not respond.
        """
        self.log.debug("Polling B")
        rsp = self.send_frame(
            self.construct_frame([IN_LIST_PASSIVE_TARGET, 0x01, 0x03, 0x00])
        )
        if not rsp:
            raise RuntimeError("No response for send poll_b frame.")

    def send_broadcast(self, broadcast):
        """Emits broadcast frame with CRC. This should be called after poll_a().

        Args:
          broadcast: Bytes-like payload; its crc16a() checksum is appended
            before sending.

        Raises:
          RuntimeError: If the device does not respond to the broadcast command.
        """
        self.log.debug("Sending broadcast %s", hexlify(broadcast).decode())

        # Adjust bit framing so all bytes are transmitted
        self.send_frame(self.construct_frame([WRITE_REGISTER, 0x63, 0x3D, 0x00]))
        rsp = self.send_frame(
            self.construct_frame([IN_COMMUNICATE_THRU] + list(with_crc16a(broadcast)))
        )
        if not rsp:
            raise RuntimeError("No response for send broadcast.")

    def transceive(self, data):
        """Sends data to device and returns response.

        Args:
          data: Iterable of byte values to send.

        Returns:
          The response payload (command code and status byte stripped) as a
          bytearray, or None when there was no response, the status byte is
          missing, or the status byte is non-zero.
        """
        self.log.debug("Transceive")
        rsp = self.send_frame(self.construct_frame([IN_DATA_EXCHANGE] + list(data)), 5)

        if not rsp:
            return None

        if rsp[0] != IN_DATA_EXCHANGE + 1:
            self.log.error("Got unexpected command code in response")
        del rsp[0]

        if not rsp:
            self.log.error("Response is missing the status byte")
            return None

        if rsp[0] != 0:
            self.log.error("Got error exchanging data")
            return None
        del rsp[0]

        return rsp

    def mute(self):
        """Turns off device's RF antenna."""
        self.log.debug("Muting")
        self.send_frame(self.construct_frame([RF_CONFIGURATION, 0x01, 0x02]))

    def construct_frame(self, data):
        """Construct a data frame to be sent to the PN532.

        Layout: 00 00 FF | LEN | LCS | D4 | data... | DCS | 00, where LEN counts
        the D4 byte plus the data, LEN + LCS == 0 (mod 256) and
        D4 + sum(data) + DCS == 0 (mod 256).

        Args:
          data: Sequence of ints in range(256); the first one is the command code.

        Returns:
          The frame as a bytearray.

        Raises:
          ValueError: If data is too long for the one-byte length field or
            contains a value outside range(256).
        """
        length = len(data) + 1  # +1 for the D4 byte
        if length > 0xFF:
            raise ValueError(
                "Frame data too long: %d bytes, at most 254 are supported" % len(data)
            )

        # Preamble, start code, length, length checksum, TFI
        frame = bytearray([0x00, 0x00, 0xFF, length, -length & 0xFF, 0xD4])
        frame.extend(data)
        # Two's-complement checksum. Masking after negation keeps it a single
        # byte even when the sum is already a multiple of 256.
        frame.append(-(0xD4 + sum(data)) & 0xFF)  # Data checksum
        frame.append(0x00)  # Postamble

        self.log.debug("Constructed frame %s", hexlify(frame).decode())

        return frame

    def send_frame(self, frame, timeout=0.1):
        """Writes a frame to the device and returns the response.

        Args:
          frame: Bytes to write, e.g. the result of construct_frame().
          timeout: Serial read timeout in seconds used while waiting for the
            response.

        Returns:
          Whatever get_device_response() returns: the response data as a
          bytearray, or None on failure.
        """
        self.device.write(frame)
        return self.get_device_response(timeout)

    def get_device_response(self, timeout=0.5):
        """Confirms we get an ACK frame from device, reads response frame, and writes ACK.

        Args:
          timeout: Serial read timeout in seconds applied while reading. The
            port's timeout is set back to 0.1 afterwards.

        Returns:
          The response data (everything between the TFI byte and the data
          checksum) as a bytearray, or None when nothing was received or the
          frame is truncated, fails the length checksum, or has the wrong TFI.
        """
        self.device.timeout = timeout
        try:
            received = self._read_response_frame()
        finally:
            self.device.timeout = _DEFAULT_READ_TIMEOUT

        if received is None:
            return None
        header, data_packet = received

        # send ACK frame, there is no response.
        self.device.write(_ACK_FRAME)

        self.log.debug(
            "Received frame %s%s",
            hexlify(header).decode(),
            hexlify(data_packet).decode(),
        )

        return data_packet

    def _read_response_frame(self):
        """Reads the device's ACK and response frame from the serial port.

        Returns:
          A (header, data) tuple of bytearrays, or None when the frame is
          missing or unusable.
        """
        ack = bytearray(self.device.read(6))
        if not ack:
            self.log.error("Did not get response from PN532")
            return None
        if ack != _ACK_FRAME:
            self.log.error("Did not get ACK frame, got %s", hexlify(ack).decode())

        # Preamble, start code (2 bytes), length, length checksum, TFI.
        header = bytearray(self.device.read(6))
        if not header:
            return None
        if len(header) < 6:
            self.log.error(
                "Truncated frame header, got %s", hexlify(header).decode()
            )
            return None

        if header[0:3] != b"\x00\x00\xff":
            self.log.error(
                "Unexpected start to frame, got %s", hexlify(header[0:3]).decode()
            )

        data_len = header[3]
        length_checksum = header[4]
        if (length_checksum + data_len) & 0xFF != 0:
            self.log.error("Frame failed length checksum")
            return None
        if data_len < 1:
            # The length must at least cover the TFI byte.
            self.log.error("Frame length does not cover the TFI byte")
            return None

        tfi = header[5]
        if tfi != 0xD5:
            self.log.error(
                "Unexpected TFI byte when performing read, got %02x", tfi
            )
            return None

        # subtract one since length includes TFI byte.
        data_packet = bytearray(self.device.read(data_len - 1))
        if len(data_packet) != data_len - 1:
            self.log.error(
                "Truncated frame data, expected %d bytes, got %d",
                data_len - 1,
                len(data_packet),
            )
            return None

        data_checksum = bytearray(self.device.read(1))
        if not data_checksum:
            self.log.error("Timed out waiting for data checksum")
            return None
        if (tfi + sum(data_packet) + data_checksum[0]) & 0xFF != 0:
            self.log.error("Frame failed data checksum")

        # A bad or missing postamble is only reported; the data is still
        # returned.
        postamble = bytearray(self.device.read(1))
        if not postamble:
            self.log.error("Timed out waiting for postamble byte")
        elif postamble[0] != 0x00:
            self.log.error(
                "Unexpected postamble byte when performing read, got %02x",
                postamble[0],
            )

        return header, data_packet