1 // Copyright 2021, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! NCI Protocol Abstraction Layer
16 //! Supports sending NCI commands to the HAL and receiving
17 //! NCI messages back
18 
19 use bytes::{BufMut, BytesMut};
20 use log::{debug, error};
21 use nfc_hal::{Hal, HalEventRegistry};
22 use nfc_packets::nci::DataPacketChild::Payload;
23 use nfc_packets::nci::NciPacketChild;
24 use nfc_packets::nci::NotificationChild::ConnCreditsNotification;
25 use nfc_packets::nci::{Command, DataPacket, DataPacketBuilder, Notification};
26 use nfc_packets::nci::{Opcode, PacketBoundaryFlag, Response};
27 use pdl_runtime::Packet;
28 use std::collections::HashMap;
29 use std::collections::VecDeque;
30 use std::sync::{Arc, Mutex};
31 use tokio::select;
32 use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedSender};
33 use tokio::sync::{oneshot, RwLock};
34 use tokio::time::{sleep, Duration, Instant};
35 
36 pub mod api;
37 
38 /// Result type
39 type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
40 
41 /// Initialize the module and connect the channels
init() -> Nci42 pub async fn init() -> Nci {
43     let hc = nfc_hal::init().await;
44     // Channel to handle data upstream messages
45     //    let (in_data_int, in_data_ext) = channel::<DataPacket>(10);
46     // Internal data channels
47     //    let ic = InternalChannels { in_data_int };
48 
49     let (cmd_tx, cmd_rx) = channel::<QueuedCommand>(10);
50     let commands = CommandSender { cmd_tx };
51     let hal_events = hc.hal_events.clone();
52 
53     let notifications = EventRegistry { handlers: Arc::new(Mutex::new(HashMap::new())) };
54     let connections = LogicalConnectionsRegistry {
55         conns: Arc::new(RwLock::new(HashMap::new())),
56         sender: hc.out_data_tx.clone(),
57     };
58 
59     tokio::spawn(dispatch(notifications, connections.clone(), hc, cmd_rx));
60     Nci { hal_events, commands, connections }
61 }
62 
63 /// NCI module external interface
64 pub struct Nci {
65     /// HAL events
66     pub hal_events: HalEventRegistry,
67     /// NCI command communication interface
68     pub commands: CommandSender,
69     /// NCI logical connections
70     pub connections: LogicalConnectionsRegistry,
71 }
72 
73 #[derive(Debug)]
74 struct PendingCommand {
75     cmd: Command,
76     response: oneshot::Sender<Response>,
77 }
78 
79 #[derive(Debug)]
80 struct QueuedCommand {
81     pending: PendingCommand,
82     notification: Option<oneshot::Sender<Notification>>,
83 }
84 
85 /// Sends raw commands. Only useful for facades & shims, or wrapped as a CommandSender.
86 pub struct CommandSender {
87     cmd_tx: Sender<QueuedCommand>,
88 }
89 
90 /// The data returned by send_notify() method.
91 pub struct ResponsePendingNotification {
92     /// Command response
93     pub response: Response,
94     /// Pending notification receiver
95     pub notification: oneshot::Receiver<Notification>,
96 }
97 
98 impl CommandSender {
99     /// Send a command, but do not expect notification to be returned
send(&mut self, cmd: Command) -> Result<Response>100     pub async fn send(&mut self, cmd: Command) -> Result<Response> {
101         let (tx, rx) = oneshot::channel::<Response>();
102         self.cmd_tx
103             .send(QueuedCommand {
104                 pending: PendingCommand { cmd, response: tx },
105                 notification: None,
106             })
107             .await?;
108         let event = rx.await?;
109         Ok(event)
110     }
111     /// Send a command which expects notification as a result
send_and_notify(&mut self, cmd: Command) -> Result<ResponsePendingNotification>112     pub async fn send_and_notify(&mut self, cmd: Command) -> Result<ResponsePendingNotification> {
113         let (tx, rx) = oneshot::channel::<Response>();
114         let (ntx, nrx) = oneshot::channel::<Notification>();
115         self.cmd_tx
116             .send(QueuedCommand {
117                 pending: PendingCommand { cmd, response: tx },
118                 notification: Some(ntx),
119             })
120             .await?;
121         let event = rx.await?;
122         Ok(ResponsePendingNotification { response: event, notification: nrx })
123     }
124 }
125 
126 impl Drop for CommandSender {
drop(&mut self)127     fn drop(&mut self) {
128         debug!("CommandSender is dropped");
129     }
130 }
131 
132 /// Parameters of a logical connection
133 struct ConnectionParameters {
134     callback: Option<fn(u8, u16, &[u8])>,
135     max_payload_size: u8,
136     nfcc_credits_avail: u8,
137     sendq: VecDeque<DataPacket>,
138     recvq: VecDeque<DataPacket>,
139 }
140 
141 impl ConnectionParameters {
142     /// Flush TX queue
flush_tx(&mut self)143     fn flush_tx(&mut self) {
144         self.sendq.clear();
145     }
146 }
147 
148 /// To keep track of currentry open logical connections
149 #[derive(Clone)]
150 pub struct LogicalConnectionsRegistry {
151     conns: Arc<RwLock<HashMap<u8, Mutex<ConnectionParameters>>>>,
152     sender: UnboundedSender<DataPacket>,
153 }
154 
155 impl LogicalConnectionsRegistry {
156     /// Create a logical connection
open( &mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>, max_payload_size: u8, nfcc_credits_avail: u8, )157     pub async fn open(
158         &mut self,
159         conn_id: u8,
160         cb: Option<fn(u8, u16, &[u8])>,
161         max_payload_size: u8,
162         nfcc_credits_avail: u8,
163     ) {
164         let conn_params = ConnectionParameters {
165             callback: cb,
166             max_payload_size,
167             nfcc_credits_avail,
168             sendq: VecDeque::<DataPacket>::new(),
169             recvq: VecDeque::<DataPacket>::new(),
170         };
171         assert!(
172             self.conns.write().await.insert(conn_id, Mutex::new(conn_params)).is_none(),
173             "A logical connection with id {:?} already exists",
174             conn_id
175         );
176     }
177     /// Set static callback
set_static_callback(&mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>)178     pub async fn set_static_callback(&mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>) {
179         if conn_id < 2 && cb.is_some() {
180             // Static connections
181             if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
182                 let mut conn_params = conn_params.lock().unwrap();
183                 conn_params.callback = cb;
184             }
185         }
186     }
187     /// Close a logical connection
close(&mut self, conn_id: u8) -> Option<fn(u8, u16, &[u8])>188     pub async fn close(&mut self, conn_id: u8) -> Option<fn(u8, u16, &[u8])> {
189         if let Some(conn_params) = self.conns.write().await.remove(&conn_id) {
190             conn_params.lock().unwrap().callback
191         } else {
192             None
193         }
194     }
195     /// Add credits to a logical connection
add_credits(&self, conn_id: u8, ncreds: u8)196     pub async fn add_credits(&self, conn_id: u8, ncreds: u8) {
197         if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
198             let mut conn_params = conn_params.lock().unwrap();
199             conn_params.nfcc_credits_avail += ncreds;
200             while !conn_params.sendq.is_empty() && conn_params.nfcc_credits_avail > 0 {
201                 self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
202                 conn_params.nfcc_credits_avail -= 1;
203             }
204         }
205     }
206 
207     /// Send a packet to a logical channel, splitting it if needed
send_packet(&mut self, conn_id: u8, pkt: DataPacket)208     pub async fn send_packet(&mut self, conn_id: u8, pkt: DataPacket) {
209         if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
210             let mut conn_params = conn_params.lock().unwrap();
211             if let Payload(mut p) = pkt.specialize() {
212                 if p.len() > conn_params.max_payload_size.into() {
213                     let conn_id = pkt.get_conn_id();
214                     while p.len() > conn_params.max_payload_size.into() {
215                         let part = DataPacketBuilder {
216                             conn_id,
217                             pbf: PacketBoundaryFlag::Incomplete,
218                             cr: 0,
219                             payload: Some(p.split_to(conn_params.max_payload_size.into())),
220                         }
221                         .build();
222                         conn_params.sendq.push_back(part);
223                     }
224                     if !p.is_empty() {
225                         let end = DataPacketBuilder {
226                             conn_id,
227                             pbf: PacketBoundaryFlag::CompleteOrFinal,
228                             cr: 0,
229                             payload: Some(p),
230                         }
231                         .build();
232                         conn_params.sendq.push_back(end);
233                     }
234                 } else {
235                     conn_params.sendq.push_back(pkt);
236                 }
237             }
238             while conn_params.nfcc_credits_avail > 0 && !conn_params.sendq.is_empty() {
239                 self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
240                 conn_params.nfcc_credits_avail -= 1;
241             }
242         }
243     }
244 
245     /// Send data packet callback to the upper layers
send_callback(&self, pkt: DataPacket)246     pub async fn send_callback(&self, pkt: DataPacket) {
247         let conn_id = pkt.get_conn_id();
248         let ncreds = pkt.get_cr();
249         if ncreds > 0 {
250             self.add_credits(conn_id, ncreds).await;
251         }
252         let done = pkt.get_pbf() == PacketBoundaryFlag::CompleteOrFinal;
253         if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
254             let mut conn_params = conn_params.lock().unwrap();
255             if !done && conn_params.recvq.is_empty() {
256                 const NFC_DATA_START_CEVT: u16 = 5;
257                 let cb = conn_params.callback.unwrap();
258                 cb(conn_id, NFC_DATA_START_CEVT, &[]);
259             }
260             conn_params.recvq.push_back(pkt);
261             if done {
262                 const NFC_DATA_CEVT_SIZE: usize = 4; // 3 for header and 1 for status
263                 let cap = conn_params.recvq.len() * conn_params.max_payload_size as usize
264                     + NFC_DATA_CEVT_SIZE;
265                 let mut buffer = BytesMut::with_capacity(cap);
266                 buffer.put_u8(0u8); // status
267                 let pkt = conn_params.recvq.pop_front().unwrap();
268                 buffer.put(pkt.encode_to_bytes().unwrap());
269                 while !conn_params.recvq.is_empty() {
270                     let pkt = conn_params.recvq.pop_front().unwrap();
271                     if let Payload(p) = pkt.specialize() {
272                         buffer.put(p);
273                     }
274                 }
275                 let data_cevt = buffer.freeze();
276                 let cb = conn_params.callback.unwrap();
277                 const NFC_DATA_CEVT: u16 = 3;
278                 cb(conn_id, NFC_DATA_CEVT, data_cevt.as_ref());
279             }
280         }
281     }
282 
283     /// Flush outgoing data queue
flush_data(&mut self, conn_id: u8) -> bool284     pub async fn flush_data(&mut self, conn_id: u8) -> bool {
285         if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
286             conn_params.lock().unwrap().flush_tx();
287             true
288         } else {
289             false
290         }
291     }
292 }
293 
294 /// Provides ability to register and unregister for NCI notifications
295 #[derive(Clone)]
296 pub struct EventRegistry {
297     handlers: Arc<Mutex<HashMap<Opcode, oneshot::Sender<Notification>>>>,
298 }
299 
300 impl EventRegistry {
301     /// Indicate interest in specific NCI notification
register(&mut self, code: Opcode, sender: oneshot::Sender<Notification>)302     pub async fn register(&mut self, code: Opcode, sender: oneshot::Sender<Notification>) {
303         assert!(
304             self.handlers.lock().unwrap().insert(code, sender).is_none(),
305             "A handler for {:?} is already registered",
306             code
307         );
308     }
309 
310     /// Remove interest in specific NCI notification
unregister(&mut self, code: Opcode) -> Option<oneshot::Sender<Notification>>311     pub async fn unregister(&mut self, code: Opcode) -> Option<oneshot::Sender<Notification>> {
312         self.handlers.lock().unwrap().remove(&code)
313     }
314 }
315 
dispatch( mut ntfs: EventRegistry, lcons: LogicalConnectionsRegistry, mut hc: Hal, mut cmd_rx: Receiver<QueuedCommand>, ) -> Result<()>316 async fn dispatch(
317     mut ntfs: EventRegistry,
318     lcons: LogicalConnectionsRegistry,
319     mut hc: Hal,
320     //    ic: InternalChannels,
321     mut cmd_rx: Receiver<QueuedCommand>,
322 ) -> Result<()> {
323     let mut pending: Option<PendingCommand> = None;
324     let timeout = sleep(Duration::MAX);
325     // The max_deadline is used to set  the sleep() deadline to a very distant moment in
326     // the future, when the notification from the timer is not required.
327     let max_deadline = timeout.deadline();
328     tokio::pin!(timeout);
329     loop {
330         select! {
331             Some(cmd) = hc.in_cmd_rx.recv() => {
332                 match cmd.specialize() {
333                     NciPacketChild::Response(rsp) => {
334                         timeout.as_mut().reset(max_deadline);
335                         let this_opcode = rsp.get_cmd_op();
336                         match pending.take() {
337                             Some(PendingCommand{cmd, response}) if cmd.get_op() == this_opcode => {
338                                 if let Err(e) = response.send(rsp) {
339                                     error!("failure dispatching command status {:?}", e);
340                                 }
341                             },
342                             Some(PendingCommand{cmd, ..}) => panic!("Waiting for {:?}, got {:?}", cmd.get_op(), this_opcode),
343                             None => panic!("Unexpected status event with opcode {:?}", this_opcode),
344                         }
345                     },
346                     NciPacketChild::Notification(ntfy) => {
347                         match ntfy.specialize() {
348                             ConnCreditsNotification(ccnp) => {
349                                 let conns = ccnp.get_conns();
350                                 for conn in conns {
351                                     lcons.add_credits(conn.conn_id, conn.ncredits).await;
352                                 }
353                             },
354                             _ => {
355                                 let code = ntfy.get_cmd_op();
356                                 match ntfs.unregister(code).await {
357                                     Some(sender) => {
358                                         if let Err(e) = sender.send(ntfy) {
359                                             error!("notification channel closed {:?}", e);
360                                         }
361                                     },
362                                     None => panic!("Unhandled notification {:?}", code),
363                                 }
364                             },
365                         }
366                     },
367                     _ => error!("Unexpected NCI data received {:?}", cmd),
368                 }
369             },
370             qc = cmd_rx.recv(), if pending.is_none() => if let Some(queued) = qc {
371                 debug!("cmd_rx got a q");
372                 if let Some(nsender) = queued.notification {
373                     ntfs.register(queued.pending.cmd.get_op(), nsender).await;
374                 }
375                 if let Err(e) = hc.out_cmd_tx.send(queued.pending.cmd.clone().into()) {
376                     error!("command queue closed: {:?}", e);
377                 }
378                 timeout.as_mut().reset(Instant::now() + Duration::from_millis(20));
379                 pending = Some(queued.pending);
380             } else {
381                 break;
382             },
383             () = &mut timeout => {
384                 error!("Command processing timeout");
385                 timeout.as_mut().reset(max_deadline);
386                 pending = None;
387             },
388             Some(data) = hc.in_data_rx.recv() => lcons.send_callback(data).await,
389             else => {
390                 debug!("Select is done");
391                 break;
392             },
393         }
394     }
395     debug!("NCI dispatch is terminated.");
396     Ok(())
397 }
398