1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! NFCC and RF emulator.
16
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use std::future::Future;
21 use std::net::{Ipv4Addr, SocketAddrV4};
22 use std::pin::{pin, Pin};
23 use std::task::Context;
24 use std::task::Poll;
25 use tokio::io::AsyncReadExt;
26 use tokio::io::AsyncWriteExt;
27 use tokio::net::{tcp, TcpListener, TcpStream};
28 use tokio::select;
29 use tokio::sync::mpsc;
30
31 pub mod controller;
32 pub mod packets;
33
34 use controller::Controller;
35 use packets::{nci, rf};
36
37 const MAX_DEVICES: usize = 128;
38 type Id = u16;
39
40 /// Read RF Control and Data packets received on the RF transport.
41 /// Performs recombination of the segmented packets.
42 pub struct RfReader {
43 socket: tcp::OwnedReadHalf,
44 }
45
46 /// Write RF Control and Data packets received to the RF transport.
47 /// Performs segmentation of the packets.
48 pub struct RfWriter {
49 socket: tcp::OwnedWriteHalf,
50 }
51
52 impl RfReader {
53 /// Create a new RF reader from the TCP socket half.
new(socket: tcp::OwnedReadHalf) -> Self54 pub fn new(socket: tcp::OwnedReadHalf) -> Self {
55 RfReader { socket }
56 }
57
58 /// Read a single RF packet from the reader.
59 /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>60 pub async fn read(&mut self) -> Result<Vec<u8>> {
61 const HEADER_SIZE: usize = 2;
62 let mut header_bytes = [0; HEADER_SIZE];
63
64 // Read the header bytes.
65 self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
66 let packet_length = u16::from_le_bytes(header_bytes) as usize;
67
68 // Read the packet data.
69 let mut packet_bytes = vec![0; packet_length];
70 self.socket.read_exact(&mut packet_bytes).await?;
71
72 Ok(packet_bytes)
73 }
74 }
75
76 impl RfWriter {
77 /// Create a new RF writer from the TCP socket half.
new(socket: tcp::OwnedWriteHalf) -> Self78 pub fn new(socket: tcp::OwnedWriteHalf) -> Self {
79 RfWriter { socket }
80 }
81
82 /// Write a single RF packet to the writer.
83 /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>84 async fn write(&mut self, packet: &[u8]) -> Result<()> {
85 let packet_length: u16 = packet.len().try_into()?;
86 let header_bytes = packet_length.to_le_bytes();
87
88 // Write the header bytes.
89 self.socket.write_all(&header_bytes).await?;
90
91 // Write the packet data.
92 self.socket.write_all(packet).await?;
93
94 Ok(())
95 }
96 }
97
98 /// Represent a generic NFC device interacting on the RF transport.
99 /// Devices communicate together through the RF mpsc channel.
100 /// NFCCs are an instance of Device.
101 pub struct Device {
102 // Unique identifier associated with the device.
103 // The identifier is assured never to be reused in the lifetime of
104 // the emulator.
105 id: u16,
106 // Async task running the controller main loop.
107 task: Pin<Box<dyn Future<Output = Result<()>>>>,
108 // Channel for injecting RF data packets into the controller instance.
109 rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
110 }
111
112 impl Device {
nci( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device113 fn nci(
114 id: Id,
115 socket: TcpStream,
116 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
117 ) -> Device {
118 let (rf_tx, rf_rx) = mpsc::unbounded_channel();
119 Device {
120 id,
121 rf_tx,
122 task: Box::pin(async move {
123 let (nci_rx, nci_tx) = socket.into_split();
124 Controller::run(
125 id,
126 pin!(nci::Reader::new(nci_rx).into_stream()),
127 nci::Writer::new(nci_tx),
128 rf_rx,
129 controller_rf_tx,
130 )
131 .await
132 }),
133 }
134 }
135
rf( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device136 fn rf(
137 id: Id,
138 socket: TcpStream,
139 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
140 ) -> Device {
141 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
142 Device {
143 id,
144 rf_tx,
145 task: Box::pin(async move {
146 let (socket_rx, socket_tx) = socket.into_split();
147 let mut rf_reader = RfReader::new(socket_rx);
148 let mut rf_writer = RfWriter::new(socket_tx);
149
150 let result: Result<((), ())> = futures::future::try_join(
151 async {
152 loop {
153 // Replace the sender identifier in the packet
154 // with the assigned number for the RF connection.
155 // TODO: currently the generated API does not allow
156 // modifying the parsed fields so the change needs to be
157 // applied to the unparsed packet.
158 let mut packet_bytes = rf_reader.read().await?;
159 packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
160
161 // Parse the input packet.
162 let packet = rf::RfPacket::parse(&packet_bytes)?;
163
164 // Forward the packet to other devices.
165 controller_rf_tx.send(packet)?;
166 }
167 },
168 async {
169 loop {
170 // Forward the packet to the socket connection.
171 use pdl_runtime::Packet;
172 let packet = rf_rx
173 .recv()
174 .await
175 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
176 rf_writer.write(&packet.encode_to_vec()?).await?;
177 }
178 },
179 )
180 .await;
181
182 result?;
183 Ok(())
184 }),
185 }
186 }
187 }
188
189 struct Scene {
190 next_id: u16,
191 waker: Option<std::task::Waker>,
192 devices: [Option<Device>; MAX_DEVICES],
193 }
194
195 impl Default for Scene {
default() -> Self196 fn default() -> Self {
197 const NONE: Option<Device> = None;
198 Scene { next_id: 0, waker: None, devices: [NONE; MAX_DEVICES] }
199 }
200 }
201
202 impl Scene {
new() -> Scene203 fn new() -> Scene {
204 Default::default()
205 }
206
wake(&mut self)207 fn wake(&mut self) {
208 if let Some(waker) = self.waker.take() {
209 waker.wake()
210 }
211 }
212
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>213 fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
214 for n in 0..MAX_DEVICES {
215 if self.devices[n].is_none() {
216 self.devices[n] = Some(builder(self.next_id));
217 self.next_id += 1;
218 self.wake();
219 return Ok(n as Id);
220 }
221 }
222 Err(anyhow::anyhow!("max number of connections reached"))
223 }
224
disconnect(&mut self, n: usize)225 fn disconnect(&mut self, n: usize) {
226 let id = self.devices[n].as_ref().unwrap().id;
227 self.devices[n] = None;
228 for other_n in 0..MAX_DEVICES {
229 let Some(ref device) = self.devices[other_n] else { continue };
230 assert!(n != other_n);
231 device
232 .rf_tx
233 .send(
234 rf::DeactivateNotificationBuilder {
235 type_: rf::DeactivateType::Discovery,
236 reason: rf::DeactivateReason::RfLinkLoss,
237 sender: id,
238 receiver: device.id,
239 technology: rf::Technology::NfcA,
240 protocol: rf::Protocol::Undetermined,
241 }
242 .into(),
243 )
244 .expect("failed to send deactive notification")
245 }
246 }
247
send(&self, packet: &rf::RfPacket) -> Result<()>248 fn send(&self, packet: &rf::RfPacket) -> Result<()> {
249 for n in 0..MAX_DEVICES {
250 let Some(ref device) = self.devices[n] else { continue };
251 if packet.get_sender() != device.id
252 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
253 {
254 device.rf_tx.send(packet.to_owned())?;
255 }
256 }
257
258 Ok(())
259 }
260 }
261
262 impl Future for Scene {
263 type Output = ();
264
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>265 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
266 for n in 0..MAX_DEVICES {
267 let dropped = match self.devices[n] {
268 Some(ref mut device) => match device.task.as_mut().poll(cx) {
269 Poll::Ready(Ok(_)) => unreachable!(),
270 Poll::Ready(Err(err)) => {
271 warn!("dropping device {}: {}", n, err);
272 true
273 }
274 Poll::Pending => false,
275 },
276 None => false,
277 };
278 if dropped {
279 self.disconnect(n)
280 }
281 }
282 self.waker = Some(cx.waker().clone());
283 Poll::Pending
284 }
285 }
286
287 #[derive(FromArgs, Debug)]
288 /// Nfc emulator.
289 struct Opt {
290 #[argh(option, default = "7000")]
291 /// configure the TCP port for the NCI server.
292 nci_port: u16,
293 #[argh(option, default = "7001")]
294 /// configure the TCP port for the RF server.
295 rf_port: u16,
296 }
297
run() -> Result<()>298 async fn run() -> Result<()> {
299 env_logger::init_from_env(
300 env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
301 );
302
303 let opt: Opt = argh::from_env();
304 let nci_listener =
305 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.nci_port)).await?;
306 let rf_listener =
307 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.rf_port)).await?;
308 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
309 let mut scene = Scene::new();
310 info!("Listening for NCI connections at address 127.0.0.1:{}", opt.nci_port);
311 info!("Listening for RF connections at address 127.0.0.1:{}", opt.rf_port);
312 loop {
313 select! {
314 result = nci_listener.accept() => {
315 let (socket, addr) = result?;
316 info!("Incoming NCI connection from {}", addr);
317 match scene.add_device(|id| Device::nci(id, socket, rf_tx.clone())) {
318 Ok(id) => info!("Accepted NCI connection from {} in slot {}", addr, id),
319 Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
320 }
321 },
322 result = rf_listener.accept() => {
323 let (socket, addr) = result?;
324 info!("Incoming RF connection from {}", addr);
325 match scene.add_device(|id| Device::rf(id, socket, rf_tx.clone())) {
326 Ok(id) => info!("Accepted RF connection from {} in slot {}", addr, id),
327 Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
328 }
329 },
330 _ = &mut scene => (),
331 result = rf_rx.recv() => {
332 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
333 scene.send(&packet)?
334 }
335 }
336 }
337 }
338
339 #[tokio::main]
main() -> Result<()>340 async fn main() -> Result<()> {
341 run().await
342 }
343