1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! NFCC and RF emulator.
16 
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use std::future::Future;
21 use std::net::{Ipv4Addr, SocketAddrV4};
22 use std::pin::{pin, Pin};
23 use std::task::Context;
24 use std::task::Poll;
25 use tokio::io::AsyncReadExt;
26 use tokio::io::AsyncWriteExt;
27 use tokio::net::{tcp, TcpListener, TcpStream};
28 use tokio::select;
29 use tokio::sync::mpsc;
30 
31 pub mod controller;
32 pub mod packets;
33 
34 use controller::Controller;
35 use packets::{nci, rf};
36 
37 const MAX_DEVICES: usize = 128;
38 type Id = u16;
39 
40 /// Read RF Control and Data packets received on the RF transport.
41 /// Performs recombination of the segmented packets.
42 pub struct RfReader {
43     socket: tcp::OwnedReadHalf,
44 }
45 
46 /// Write RF Control and Data packets received to the RF transport.
47 /// Performs segmentation of the packets.
48 pub struct RfWriter {
49     socket: tcp::OwnedWriteHalf,
50 }
51 
52 impl RfReader {
53     /// Create a new RF reader from the TCP socket half.
new(socket: tcp::OwnedReadHalf) -> Self54     pub fn new(socket: tcp::OwnedReadHalf) -> Self {
55         RfReader { socket }
56     }
57 
58     /// Read a single RF packet from the reader.
59     /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>60     pub async fn read(&mut self) -> Result<Vec<u8>> {
61         const HEADER_SIZE: usize = 2;
62         let mut header_bytes = [0; HEADER_SIZE];
63 
64         // Read the header bytes.
65         self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
66         let packet_length = u16::from_le_bytes(header_bytes) as usize;
67 
68         // Read the packet data.
69         let mut packet_bytes = vec![0; packet_length];
70         self.socket.read_exact(&mut packet_bytes).await?;
71 
72         Ok(packet_bytes)
73     }
74 }
75 
76 impl RfWriter {
77     /// Create a new RF writer from the TCP socket half.
new(socket: tcp::OwnedWriteHalf) -> Self78     pub fn new(socket: tcp::OwnedWriteHalf) -> Self {
79         RfWriter { socket }
80     }
81 
82     /// Write a single RF packet to the writer.
83     /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>84     async fn write(&mut self, packet: &[u8]) -> Result<()> {
85         let packet_length: u16 = packet.len().try_into()?;
86         let header_bytes = packet_length.to_le_bytes();
87 
88         // Write the header bytes.
89         self.socket.write_all(&header_bytes).await?;
90 
91         // Write the packet data.
92         self.socket.write_all(packet).await?;
93 
94         Ok(())
95     }
96 }
97 
98 /// Represent a generic NFC device interacting on the RF transport.
99 /// Devices communicate together through the RF mpsc channel.
100 /// NFCCs are an instance of Device.
101 pub struct Device {
102     // Unique identifier associated with the device.
103     // The identifier is assured never to be reused in the lifetime of
104     // the emulator.
105     id: u16,
106     // Async task running the controller main loop.
107     task: Pin<Box<dyn Future<Output = Result<()>>>>,
108     // Channel for injecting RF data packets into the controller instance.
109     rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
110 }
111 
112 impl Device {
nci( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device113     fn nci(
114         id: Id,
115         socket: TcpStream,
116         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
117     ) -> Device {
118         let (rf_tx, rf_rx) = mpsc::unbounded_channel();
119         Device {
120             id,
121             rf_tx,
122             task: Box::pin(async move {
123                 let (nci_rx, nci_tx) = socket.into_split();
124                 Controller::run(
125                     id,
126                     pin!(nci::Reader::new(nci_rx).into_stream()),
127                     nci::Writer::new(nci_tx),
128                     rf_rx,
129                     controller_rf_tx,
130                 )
131                 .await
132             }),
133         }
134     }
135 
rf( id: Id, socket: TcpStream, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device136     fn rf(
137         id: Id,
138         socket: TcpStream,
139         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
140     ) -> Device {
141         let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
142         Device {
143             id,
144             rf_tx,
145             task: Box::pin(async move {
146                 let (socket_rx, socket_tx) = socket.into_split();
147                 let mut rf_reader = RfReader::new(socket_rx);
148                 let mut rf_writer = RfWriter::new(socket_tx);
149 
150                 let result: Result<((), ())> = futures::future::try_join(
151                     async {
152                         loop {
153                             // Replace the sender identifier in the packet
154                             // with the assigned number for the RF connection.
155                             // TODO: currently the generated API does not allow
156                             // modifying the parsed fields so the change needs to be
157                             // applied to the unparsed packet.
158                             let mut packet_bytes = rf_reader.read().await?;
159                             packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
160 
161                             // Parse the input packet.
162                             let packet = rf::RfPacket::parse(&packet_bytes)?;
163 
164                             // Forward the packet to other devices.
165                             controller_rf_tx.send(packet)?;
166                         }
167                     },
168                     async {
169                         loop {
170                             // Forward the packet to the socket connection.
171                             use pdl_runtime::Packet;
172                             let packet = rf_rx
173                                 .recv()
174                                 .await
175                                 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
176                             rf_writer.write(&packet.encode_to_vec()?).await?;
177                         }
178                     },
179                 )
180                 .await;
181 
182                 result?;
183                 Ok(())
184             }),
185         }
186     }
187 }
188 
189 struct Scene {
190     next_id: u16,
191     waker: Option<std::task::Waker>,
192     devices: [Option<Device>; MAX_DEVICES],
193 }
194 
195 impl Default for Scene {
default() -> Self196     fn default() -> Self {
197         const NONE: Option<Device> = None;
198         Scene { next_id: 0, waker: None, devices: [NONE; MAX_DEVICES] }
199     }
200 }
201 
202 impl Scene {
new() -> Scene203     fn new() -> Scene {
204         Default::default()
205     }
206 
wake(&mut self)207     fn wake(&mut self) {
208         if let Some(waker) = self.waker.take() {
209             waker.wake()
210         }
211     }
212 
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>213     fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
214         for n in 0..MAX_DEVICES {
215             if self.devices[n].is_none() {
216                 self.devices[n] = Some(builder(self.next_id));
217                 self.next_id += 1;
218                 self.wake();
219                 return Ok(n as Id);
220             }
221         }
222         Err(anyhow::anyhow!("max number of connections reached"))
223     }
224 
disconnect(&mut self, n: usize)225     fn disconnect(&mut self, n: usize) {
226         let id = self.devices[n].as_ref().unwrap().id;
227         self.devices[n] = None;
228         for other_n in 0..MAX_DEVICES {
229             let Some(ref device) = self.devices[other_n] else { continue };
230             assert!(n != other_n);
231             device
232                 .rf_tx
233                 .send(
234                     rf::DeactivateNotificationBuilder {
235                         type_: rf::DeactivateType::Discovery,
236                         reason: rf::DeactivateReason::RfLinkLoss,
237                         sender: id,
238                         receiver: device.id,
239                         technology: rf::Technology::NfcA,
240                         protocol: rf::Protocol::Undetermined,
241                     }
242                     .into(),
243                 )
244                 .expect("failed to send deactive notification")
245         }
246     }
247 
send(&self, packet: &rf::RfPacket) -> Result<()>248     fn send(&self, packet: &rf::RfPacket) -> Result<()> {
249         for n in 0..MAX_DEVICES {
250             let Some(ref device) = self.devices[n] else { continue };
251             if packet.get_sender() != device.id
252                 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
253             {
254                 device.rf_tx.send(packet.to_owned())?;
255             }
256         }
257 
258         Ok(())
259     }
260 }
261 
262 impl Future for Scene {
263     type Output = ();
264 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>265     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
266         for n in 0..MAX_DEVICES {
267             let dropped = match self.devices[n] {
268                 Some(ref mut device) => match device.task.as_mut().poll(cx) {
269                     Poll::Ready(Ok(_)) => unreachable!(),
270                     Poll::Ready(Err(err)) => {
271                         warn!("dropping device {}: {}", n, err);
272                         true
273                     }
274                     Poll::Pending => false,
275                 },
276                 None => false,
277             };
278             if dropped {
279                 self.disconnect(n)
280             }
281         }
282         self.waker = Some(cx.waker().clone());
283         Poll::Pending
284     }
285 }
286 
287 #[derive(FromArgs, Debug)]
288 /// Nfc emulator.
289 struct Opt {
290     #[argh(option, default = "7000")]
291     /// configure the TCP port for the NCI server.
292     nci_port: u16,
293     #[argh(option, default = "7001")]
294     /// configure the TCP port for the RF server.
295     rf_port: u16,
296 }
297 
run() -> Result<()>298 async fn run() -> Result<()> {
299     env_logger::init_from_env(
300         env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
301     );
302 
303     let opt: Opt = argh::from_env();
304     let nci_listener =
305         TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.nci_port)).await?;
306     let rf_listener =
307         TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, opt.rf_port)).await?;
308     let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
309     let mut scene = Scene::new();
310     info!("Listening for NCI connections at address 127.0.0.1:{}", opt.nci_port);
311     info!("Listening for RF connections at address 127.0.0.1:{}", opt.rf_port);
312     loop {
313         select! {
314             result = nci_listener.accept() => {
315                 let (socket, addr) = result?;
316                 info!("Incoming NCI connection from {}", addr);
317                 match scene.add_device(|id| Device::nci(id, socket, rf_tx.clone())) {
318                     Ok(id) => info!("Accepted NCI connection from {} in slot {}", addr, id),
319                     Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
320                 }
321             },
322             result = rf_listener.accept() => {
323                 let (socket, addr) = result?;
324                 info!("Incoming RF connection from {}", addr);
325                 match scene.add_device(|id| Device::rf(id, socket, rf_tx.clone())) {
326                     Ok(id) => info!("Accepted RF connection from {} in slot {}", addr, id),
327                     Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
328                 }
329             },
330             _ = &mut scene => (),
331             result = rf_rx.recv() => {
332                 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
333                 scene.send(&packet)?
334             }
335         }
336     }
337 }
338 
339 #[tokio::main]
main() -> Result<()>340 async fn main() -> Result<()> {
341     run().await
342 }
343