1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17import logging
18import serial
19from . import tag
20from binascii import hexlify
21from serial.tools.list_ports import comports
22from mobly import logger as mobly_logger
23
# PN532 command codes, ordered by opcode. Each one is placed as the first data
# byte of a host-to-reader frame (see PN532.construct_frame).
GET_FIRMWARE_VERSION = 0x02
WRITE_REGISTER = 0x08
SAM_CONFIGURATION = 0x14
RF_CONFIGURATION = 0x32
IN_DATA_EXCHANGE = 0x40
IN_COMMUNICATE_THRU = 0x42
IN_LIST_PASSIVE_TARGET = 0x4A

# Twenty zero bytes that are prepended to the first frames sent to the reader.
# NOTE(review): presumably this gives the chip time to wake up before the real
# frame arrives -- confirm against the PN532 user manual.
LONG_PREAMBLE = bytearray(20)
32
33
def crc16a(data):
    """Computes the 16-bit CRC_A checksum (initial value 0x6363) of data.

    Args:
        data: Iterable of integer byte values (bytes, bytearray, list of ints).

    Returns:
        A two-byte bytes object holding the checksum, low byte first.
    """
    crc = 0x6363
    for value in data:
        # Fold the next byte into the low half of the running checksum.
        mixed = value ^ (crc & 0x00FF)
        mixed = (mixed ^ (mixed << 4)) & 0xFF
        crc = (crc >> 8) ^ (mixed << 8) ^ (mixed << 3) ^ (mixed >> 4)
        crc &= 0xFFFF
    return crc.to_bytes(2, "little")
41
42
def with_crc16a(data):
    """Returns data as bytes with its two-byte crc16a() checksum appended.

    Args:
        data: Byte values to protect (bytes, bytearray, or list of ints).

    Returns:
        A bytes object: the payload followed by the checksum, low byte first.
    """
    payload = bytes(data)
    return payload + crc16a(data)
45
46
class PN532:
    """Driver for a PN532 NFC reader attached through a serial port.

    Commands are wrapped into frames by construct_frame(), written with
    send_frame(), and the reader's reply is parsed by get_device_response().
    """

    # Frame exchanged as acknowledgement in both directions.
    _ACK_FRAME = bytes.fromhex("0000ff00ff00")
    # Preamble and start code that open every frame.
    _FRAME_START = bytes.fromhex("0000ff")
    # Frame identifier bytes: host -> reader and reader -> host.
    _TFI_TO_READER = 0xD4
    _TFI_FROM_READER = 0xD5
    # Read timeout (seconds) the serial port is left with between exchanges.
    _IDLE_TIMEOUT = 0.1

    def __init__(self, path=""):
        """Initializes device on path, or first available serial port if none is provided.

        Args:
            path: Serial device path. When empty or None, the first port
                reported by comports() is used.

        Raises:
            IndexError: No serial port is present on the host.
            RuntimeError: The device does not identify as a PN532 or does not
                answer one of the configuration commands.
        """
        ports = comports()
        if not ports:
            raise IndexError(
                "Could not find device on serial port, make sure reader is plugged in."
            )
        if not path:
            path = ports[0].device
        self.log = mobly_logger.PrefixLoggerAdapter(
            logging.getLogger(),
            {
                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
                    f"[PN532|{path}]"
                )
            },
        )
        self.log.debug("Serial port: %s", path)
        self.device = serial.Serial(path, 115200, timeout=self._IDLE_TIMEOUT)
        try:
            self._configure(path)
        except Exception:
            # Do not leave the port open when bring-up fails.
            self.device.close()
            raise

    def _configure(self, path):
        """Wakes the reader, checks its identity and applies the base configuration."""
        self.device.flush()
        self.device.write(LONG_PREAMBLE + self._ACK_FRAME)
        self.device.flushInput()
        if not self.verify_firmware_version():
            raise RuntimeError(
                f"Could not verify PN532 firmware on serial path {path}"
            )
        rsp = self.send_frame(
            LONG_PREAMBLE + self.construct_frame([SAM_CONFIGURATION, 0x01, 0x00]),
            1,
        )
        if not rsp:
            raise RuntimeError("No response for SAM configuration.")

        # Disable retries
        self.device.flushInput()
        rsp = self.send_frame(
            self.construct_frame(
                [
                    RF_CONFIGURATION,
                    0x05,
                    0x00,  # MxRtyATR
                    0x00,  # MxRtyPSL
                    0x00,  # MxRtyPassiveActivation
                ]
            ),
            1,
        )
        if not rsp:
            raise RuntimeError("No response for RF configuration.")

    def verify_firmware_version(self):
        """Verifies we are talking to a PN532.

        Returns:
            True when the reply has the expected response code and length and
            its second byte is 0x32, False otherwise.

        Raises:
            RuntimeError: The device sent no reply.
        """
        self.log.debug("Checking firmware version")
        rsp = self.send_frame(
            LONG_PREAMBLE + self.construct_frame([GET_FIRMWARE_VERSION])
        )

        if not rsp:
            raise RuntimeError("No response for GetFirmwareVersion")

        # A response code is always the command code plus one.
        if rsp[0] != GET_FIRMWARE_VERSION + 1 or len(rsp) != 5:
            self.log.error("Got unexpected response for GetFirmwareVersion")
            return False

        return rsp[1] == 0x32

    def poll_a(self):
        """Attempts to detect target for NFC type A.

        Returns:
            A tag.TypeATag describing the detected target, or None when no
            target answered or the target data is truncated. Targets that
            report no ATS get an empty bytearray for it.

        Raises:
            RuntimeError: The reader sent no reply.
        """
        self.log.debug("Polling A")
        rsp = self.send_frame(
            self.construct_frame([IN_LIST_PASSIVE_TARGET, 0x01, 0x00])
        )
        if not rsp:
            raise RuntimeError("No response for send poll_a frame.")

        if rsp[0] != IN_LIST_PASSIVE_TARGET + 1:
            self.log.error("Got unexpected command code in response")

        # Layout after the response code:
        #   target count, target id, SENS_RES (2), SEL_RES, NFCID length,
        #   NFCID bytes, then optionally ATS length + ATS bytes.
        if len(rsp) < 2:
            self.log.error("Response is missing the target count")
            return None
        if rsp[1] == 0:
            return None
        if len(rsp) < 7:
            self.log.error("Truncated target data in poll_a response")
            return None

        target_id = rsp[2]
        sense_res = rsp[3:5]
        sel_res = rsp[5]
        self.log.debug("Got tag, SEL_RES is %02x", sel_res)

        nfcid_end = 7 + rsp[6]
        nfcid = rsp[7:nfcid_end]

        # The ATS is optional; its length byte counts itself.
        ats_field = rsp[nfcid_end:]
        ats = ats_field[1 : ats_field[0]] if ats_field else bytearray()

        return tag.TypeATag(self, target_id, sense_res, sel_res, nfcid, ats)

    def poll_b(self):
        """Attempts to detect target for NFC type B.

        The reply is only checked for presence; nothing is returned.

        Raises:
            RuntimeError: The reader sent no reply.
        """
        self.log.debug("Polling B")
        rsp = self.send_frame(
            self.construct_frame([IN_LIST_PASSIVE_TARGET, 0x01, 0x03, 0x00])
        )
        if not rsp:
            raise RuntimeError("No response for send poll_b frame.")

    def send_broadcast(self, broadcast):
        """Emits broadcast frame with CRC. This should be called after poll_a().

        Args:
            broadcast: Bytes to transmit; the crc16a checksum is appended.

        Raises:
            RuntimeError: The reader sent no reply to the transmit command.
        """
        self.log.debug("Sending broadcast %s", hexlify(broadcast).decode())

        # Adjust bit framing so all bytes are transmitted
        self.send_frame(self.construct_frame([WRITE_REGISTER, 0x63, 0x3D, 0x00]))
        rsp = self.send_frame(
            self.construct_frame([IN_COMMUNICATE_THRU] + list(with_crc16a(broadcast)))
        )
        if not rsp:
            raise RuntimeError("No response for send broadcast.")

    def transceive(self, data):
        """Sends data to device and returns response.

        Args:
            data: Iterable of byte values placed after the command code.

        Returns:
            The reply payload (bytearray) with the response code and status
            byte removed, or None when there was no reply, the status byte is
            missing, or the status byte reports an error.
        """
        self.log.debug("Transceive")
        rsp = self.send_frame(self.construct_frame([IN_DATA_EXCHANGE] + list(data)), 5)

        if not rsp:
            return None

        if rsp[0] != IN_DATA_EXCHANGE + 1:
            self.log.error("Got unexpected command code in response")

        if len(rsp) < 2:
            self.log.error("Response is missing the status byte")
            return None

        if rsp[1] != 0:
            self.log.error("Got error exchanging data")
            return None

        return rsp[2:]

    def mute(self):
        """Turns off device's RF antenna."""
        self.log.debug("Muting")
        self.send_frame(self.construct_frame([RF_CONFIGURATION, 0x01, 0x02]))

    def construct_frame(self, data):
        """Constructs a data frame to be sent to the PN532.

        Args:
            data: Iterable of byte values: command code followed by parameters.

        Returns:
            A bytearray holding preamble, start code, length, length checksum,
            TFI, the data bytes, data checksum and postamble.

        Raises:
            ValueError: The data does not fit in a frame with a one-byte
                length field, or contains a value outside 0..255.
        """
        payload = list(data)
        length = len(payload) + 1  # The length field also counts the TFI byte.
        if length > 0xFF:
            raise ValueError(
                f"Frame data too long: {len(payload)} bytes, at most 254 allowed"
            )

        # Both checksums are two's complements, so that field + checksum is
        # zero modulo 256. Masking after the negation keeps a zero checksum at
        # 0x00 rather than overflowing to 0x100.
        frame = bytearray(
            [0x00, 0x00, 0xFF, length, -length & 0xFF, self._TFI_TO_READER]
        )
        frame.extend(payload)
        frame.append(-(self._TFI_TO_READER + sum(payload)) & 0xFF)  # Data checksum
        frame.append(0x00)  # Postamble

        self.log.debug("Constructed frame %s", hexlify(frame).decode())

        return frame

    def send_frame(self, frame, timeout=0.1):
        """Writes a frame to the device and returns the response.

        Args:
            frame: Complete frame bytes, as built by construct_frame().
            timeout: Read timeout in seconds used while waiting for the reply.

        Returns:
            Whatever get_device_response() returns for this exchange.
        """
        self.device.write(frame)
        return self.get_device_response(timeout)

    def get_device_response(self, timeout=0.5):
        """Confirms we get an ACK frame from device, reads response frame, and writes ACK.

        Args:
            timeout: Read timeout in seconds applied while reading the reply.
                The port is set back to 0.1 seconds before returning.

        Returns:
            The response data (bytearray) without the TFI byte, or None when
            nothing was received, the frame is truncated, or its length
            checksum or TFI byte is wrong. A data checksum or postamble
            mismatch is only logged.
        """
        self.device.timeout = timeout
        try:
            data_packet = self._read_response()
        finally:
            # Restore the idle timeout on every path, including failures.
            self.device.timeout = self._IDLE_TIMEOUT

        if data_packet is None:
            return None

        self.device.write(self._ACK_FRAME)  # send ACK frame, there is no response.
        return data_packet

    def _read_response(self):
        """Reads the ACK and the response frame; returns its data or None."""
        ack = bytearray(self.device.read(6))
        if not ack:
            self.log.error("Did not get response from PN532")
            return None
        if ack != self._ACK_FRAME:
            self.log.error("Did not get ACK frame, got %s", hexlify(ack).decode())

        header = bytearray(self.device.read(6))
        if not header:
            return None
        if len(header) < 6:
            # A read that timed out part-way cannot be parsed.
            self.log.error(
                "Truncated frame header, got %s", hexlify(header).decode()
            )
            return None

        if header[0:3] != self._FRAME_START:
            self.log.error(
                "Unexpected start to frame, got %s", hexlify(header[0:3]).decode()
            )

        data_len = header[3]
        length_checksum = header[4]
        if (length_checksum + data_len) & 0xFF != 0:
            self.log.error("Frame failed length checksum")
            return None
        if data_len < 1:
            self.log.error("Frame length leaves no room for the TFI byte")
            return None

        tfi = header[5]
        if tfi != self._TFI_FROM_READER:
            self.log.error("Unexpected TFI byte when performing read, got %02x", tfi)
            return None

        # subtract one since length includes TFI byte.
        data_packet = bytearray(self.device.read(data_len - 1))
        if len(data_packet) != data_len - 1:
            self.log.error(
                "Truncated frame data, expected %d bytes but got %d",
                data_len - 1,
                len(data_packet),
            )
            return None

        data_checksum = self.device.read(1)
        if not data_checksum:
            self.log.error("Frame is missing the data checksum")
            return None
        if (tfi + sum(data_packet) + data_checksum[0]) & 0xFF != 0:
            self.log.error("Frame failed data checksum")

        postamble = self.device.read(1)
        if not postamble:
            self.log.error("Missing postamble byte when performing read")
        elif postamble[0] != 0x00:
            self.log.error(
                "Unexpected postamble byte when performing read, got %02x",
                postamble[0],
            )

        self.log.debug(
            "Received frame %s%s",
            hexlify(header).decode(),
            hexlify(data_packet).decode(),
        )

        return data_packet
292