1 // Copyright 2021, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! NCI Protocol Abstraction Layer
16 //! Supports sending NCI commands to the HAL and receiving
17 //! NCI messages back
18
19 use bytes::{BufMut, BytesMut};
20 use log::{debug, error};
21 use nfc_hal::{Hal, HalEventRegistry};
22 use nfc_packets::nci::DataPacketChild::Payload;
23 use nfc_packets::nci::NciPacketChild;
24 use nfc_packets::nci::NotificationChild::ConnCreditsNotification;
25 use nfc_packets::nci::{Command, DataPacket, DataPacketBuilder, Notification};
26 use nfc_packets::nci::{Opcode, PacketBoundaryFlag, Response};
27 use pdl_runtime::Packet;
28 use std::collections::HashMap;
29 use std::collections::VecDeque;
30 use std::sync::{Arc, Mutex};
31 use tokio::select;
32 use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedSender};
33 use tokio::sync::{oneshot, RwLock};
34 use tokio::time::{sleep, Duration, Instant};
35
36 pub mod api;
37
38 /// Result type
39 type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
40
41 /// Initialize the module and connect the channels
init() -> Nci42 pub async fn init() -> Nci {
43 let hc = nfc_hal::init().await;
44 // Channel to handle data upstream messages
45 // let (in_data_int, in_data_ext) = channel::<DataPacket>(10);
46 // Internal data channels
47 // let ic = InternalChannels { in_data_int };
48
49 let (cmd_tx, cmd_rx) = channel::<QueuedCommand>(10);
50 let commands = CommandSender { cmd_tx };
51 let hal_events = hc.hal_events.clone();
52
53 let notifications = EventRegistry { handlers: Arc::new(Mutex::new(HashMap::new())) };
54 let connections = LogicalConnectionsRegistry {
55 conns: Arc::new(RwLock::new(HashMap::new())),
56 sender: hc.out_data_tx.clone(),
57 };
58
59 tokio::spawn(dispatch(notifications, connections.clone(), hc, cmd_rx));
60 Nci { hal_events, commands, connections }
61 }
62
63 /// NCI module external interface
64 pub struct Nci {
65 /// HAL events
66 pub hal_events: HalEventRegistry,
67 /// NCI command communication interface
68 pub commands: CommandSender,
69 /// NCI logical connections
70 pub connections: LogicalConnectionsRegistry,
71 }
72
73 #[derive(Debug)]
74 struct PendingCommand {
75 cmd: Command,
76 response: oneshot::Sender<Response>,
77 }
78
79 #[derive(Debug)]
80 struct QueuedCommand {
81 pending: PendingCommand,
82 notification: Option<oneshot::Sender<Notification>>,
83 }
84
85 /// Sends raw commands. Only useful for facades & shims, or wrapped as a CommandSender.
86 pub struct CommandSender {
87 cmd_tx: Sender<QueuedCommand>,
88 }
89
90 /// The data returned by send_notify() method.
91 pub struct ResponsePendingNotification {
92 /// Command response
93 pub response: Response,
94 /// Pending notification receiver
95 pub notification: oneshot::Receiver<Notification>,
96 }
97
98 impl CommandSender {
99 /// Send a command, but do not expect notification to be returned
send(&mut self, cmd: Command) -> Result<Response>100 pub async fn send(&mut self, cmd: Command) -> Result<Response> {
101 let (tx, rx) = oneshot::channel::<Response>();
102 self.cmd_tx
103 .send(QueuedCommand {
104 pending: PendingCommand { cmd, response: tx },
105 notification: None,
106 })
107 .await?;
108 let event = rx.await?;
109 Ok(event)
110 }
111 /// Send a command which expects notification as a result
send_and_notify(&mut self, cmd: Command) -> Result<ResponsePendingNotification>112 pub async fn send_and_notify(&mut self, cmd: Command) -> Result<ResponsePendingNotification> {
113 let (tx, rx) = oneshot::channel::<Response>();
114 let (ntx, nrx) = oneshot::channel::<Notification>();
115 self.cmd_tx
116 .send(QueuedCommand {
117 pending: PendingCommand { cmd, response: tx },
118 notification: Some(ntx),
119 })
120 .await?;
121 let event = rx.await?;
122 Ok(ResponsePendingNotification { response: event, notification: nrx })
123 }
124 }
125
126 impl Drop for CommandSender {
drop(&mut self)127 fn drop(&mut self) {
128 debug!("CommandSender is dropped");
129 }
130 }
131
132 /// Parameters of a logical connection
133 struct ConnectionParameters {
134 callback: Option<fn(u8, u16, &[u8])>,
135 max_payload_size: u8,
136 nfcc_credits_avail: u8,
137 sendq: VecDeque<DataPacket>,
138 recvq: VecDeque<DataPacket>,
139 }
140
141 impl ConnectionParameters {
142 /// Flush TX queue
flush_tx(&mut self)143 fn flush_tx(&mut self) {
144 self.sendq.clear();
145 }
146 }
147
148 /// To keep track of currentry open logical connections
149 #[derive(Clone)]
150 pub struct LogicalConnectionsRegistry {
151 conns: Arc<RwLock<HashMap<u8, Mutex<ConnectionParameters>>>>,
152 sender: UnboundedSender<DataPacket>,
153 }
154
155 impl LogicalConnectionsRegistry {
156 /// Create a logical connection
open( &mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>, max_payload_size: u8, nfcc_credits_avail: u8, )157 pub async fn open(
158 &mut self,
159 conn_id: u8,
160 cb: Option<fn(u8, u16, &[u8])>,
161 max_payload_size: u8,
162 nfcc_credits_avail: u8,
163 ) {
164 let conn_params = ConnectionParameters {
165 callback: cb,
166 max_payload_size,
167 nfcc_credits_avail,
168 sendq: VecDeque::<DataPacket>::new(),
169 recvq: VecDeque::<DataPacket>::new(),
170 };
171 assert!(
172 self.conns.write().await.insert(conn_id, Mutex::new(conn_params)).is_none(),
173 "A logical connection with id {:?} already exists",
174 conn_id
175 );
176 }
177 /// Set static callback
set_static_callback(&mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>)178 pub async fn set_static_callback(&mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>) {
179 if conn_id < 2 && cb.is_some() {
180 // Static connections
181 if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
182 let mut conn_params = conn_params.lock().unwrap();
183 conn_params.callback = cb;
184 }
185 }
186 }
187 /// Close a logical connection
close(&mut self, conn_id: u8) -> Option<fn(u8, u16, &[u8])>188 pub async fn close(&mut self, conn_id: u8) -> Option<fn(u8, u16, &[u8])> {
189 if let Some(conn_params) = self.conns.write().await.remove(&conn_id) {
190 conn_params.lock().unwrap().callback
191 } else {
192 None
193 }
194 }
195 /// Add credits to a logical connection
add_credits(&self, conn_id: u8, ncreds: u8)196 pub async fn add_credits(&self, conn_id: u8, ncreds: u8) {
197 if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
198 let mut conn_params = conn_params.lock().unwrap();
199 conn_params.nfcc_credits_avail += ncreds;
200 while !conn_params.sendq.is_empty() && conn_params.nfcc_credits_avail > 0 {
201 self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
202 conn_params.nfcc_credits_avail -= 1;
203 }
204 }
205 }
206
207 /// Send a packet to a logical channel, splitting it if needed
send_packet(&mut self, conn_id: u8, pkt: DataPacket)208 pub async fn send_packet(&mut self, conn_id: u8, pkt: DataPacket) {
209 if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
210 let mut conn_params = conn_params.lock().unwrap();
211 if let Payload(mut p) = pkt.specialize() {
212 if p.len() > conn_params.max_payload_size.into() {
213 let conn_id = pkt.get_conn_id();
214 while p.len() > conn_params.max_payload_size.into() {
215 let part = DataPacketBuilder {
216 conn_id,
217 pbf: PacketBoundaryFlag::Incomplete,
218 cr: 0,
219 payload: Some(p.split_to(conn_params.max_payload_size.into())),
220 }
221 .build();
222 conn_params.sendq.push_back(part);
223 }
224 if !p.is_empty() {
225 let end = DataPacketBuilder {
226 conn_id,
227 pbf: PacketBoundaryFlag::CompleteOrFinal,
228 cr: 0,
229 payload: Some(p),
230 }
231 .build();
232 conn_params.sendq.push_back(end);
233 }
234 } else {
235 conn_params.sendq.push_back(pkt);
236 }
237 }
238 while conn_params.nfcc_credits_avail > 0 && !conn_params.sendq.is_empty() {
239 self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
240 conn_params.nfcc_credits_avail -= 1;
241 }
242 }
243 }
244
245 /// Send data packet callback to the upper layers
send_callback(&self, pkt: DataPacket)246 pub async fn send_callback(&self, pkt: DataPacket) {
247 let conn_id = pkt.get_conn_id();
248 let ncreds = pkt.get_cr();
249 if ncreds > 0 {
250 self.add_credits(conn_id, ncreds).await;
251 }
252 let done = pkt.get_pbf() == PacketBoundaryFlag::CompleteOrFinal;
253 if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
254 let mut conn_params = conn_params.lock().unwrap();
255 if !done && conn_params.recvq.is_empty() {
256 const NFC_DATA_START_CEVT: u16 = 5;
257 let cb = conn_params.callback.unwrap();
258 cb(conn_id, NFC_DATA_START_CEVT, &[]);
259 }
260 conn_params.recvq.push_back(pkt);
261 if done {
262 const NFC_DATA_CEVT_SIZE: usize = 4; // 3 for header and 1 for status
263 let cap = conn_params.recvq.len() * conn_params.max_payload_size as usize
264 + NFC_DATA_CEVT_SIZE;
265 let mut buffer = BytesMut::with_capacity(cap);
266 buffer.put_u8(0u8); // status
267 let pkt = conn_params.recvq.pop_front().unwrap();
268 buffer.put(pkt.encode_to_bytes().unwrap());
269 while !conn_params.recvq.is_empty() {
270 let pkt = conn_params.recvq.pop_front().unwrap();
271 if let Payload(p) = pkt.specialize() {
272 buffer.put(p);
273 }
274 }
275 let data_cevt = buffer.freeze();
276 let cb = conn_params.callback.unwrap();
277 const NFC_DATA_CEVT: u16 = 3;
278 cb(conn_id, NFC_DATA_CEVT, data_cevt.as_ref());
279 }
280 }
281 }
282
283 /// Flush outgoing data queue
flush_data(&mut self, conn_id: u8) -> bool284 pub async fn flush_data(&mut self, conn_id: u8) -> bool {
285 if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
286 conn_params.lock().unwrap().flush_tx();
287 true
288 } else {
289 false
290 }
291 }
292 }
293
294 /// Provides ability to register and unregister for NCI notifications
295 #[derive(Clone)]
296 pub struct EventRegistry {
297 handlers: Arc<Mutex<HashMap<Opcode, oneshot::Sender<Notification>>>>,
298 }
299
300 impl EventRegistry {
301 /// Indicate interest in specific NCI notification
register(&mut self, code: Opcode, sender: oneshot::Sender<Notification>)302 pub async fn register(&mut self, code: Opcode, sender: oneshot::Sender<Notification>) {
303 assert!(
304 self.handlers.lock().unwrap().insert(code, sender).is_none(),
305 "A handler for {:?} is already registered",
306 code
307 );
308 }
309
310 /// Remove interest in specific NCI notification
unregister(&mut self, code: Opcode) -> Option<oneshot::Sender<Notification>>311 pub async fn unregister(&mut self, code: Opcode) -> Option<oneshot::Sender<Notification>> {
312 self.handlers.lock().unwrap().remove(&code)
313 }
314 }
315
dispatch( mut ntfs: EventRegistry, lcons: LogicalConnectionsRegistry, mut hc: Hal, mut cmd_rx: Receiver<QueuedCommand>, ) -> Result<()>316 async fn dispatch(
317 mut ntfs: EventRegistry,
318 lcons: LogicalConnectionsRegistry,
319 mut hc: Hal,
320 // ic: InternalChannels,
321 mut cmd_rx: Receiver<QueuedCommand>,
322 ) -> Result<()> {
323 let mut pending: Option<PendingCommand> = None;
324 let timeout = sleep(Duration::MAX);
325 // The max_deadline is used to set the sleep() deadline to a very distant moment in
326 // the future, when the notification from the timer is not required.
327 let max_deadline = timeout.deadline();
328 tokio::pin!(timeout);
329 loop {
330 select! {
331 Some(cmd) = hc.in_cmd_rx.recv() => {
332 match cmd.specialize() {
333 NciPacketChild::Response(rsp) => {
334 timeout.as_mut().reset(max_deadline);
335 let this_opcode = rsp.get_cmd_op();
336 match pending.take() {
337 Some(PendingCommand{cmd, response}) if cmd.get_op() == this_opcode => {
338 if let Err(e) = response.send(rsp) {
339 error!("failure dispatching command status {:?}", e);
340 }
341 },
342 Some(PendingCommand{cmd, ..}) => panic!("Waiting for {:?}, got {:?}", cmd.get_op(), this_opcode),
343 None => panic!("Unexpected status event with opcode {:?}", this_opcode),
344 }
345 },
346 NciPacketChild::Notification(ntfy) => {
347 match ntfy.specialize() {
348 ConnCreditsNotification(ccnp) => {
349 let conns = ccnp.get_conns();
350 for conn in conns {
351 lcons.add_credits(conn.conn_id, conn.ncredits).await;
352 }
353 },
354 _ => {
355 let code = ntfy.get_cmd_op();
356 match ntfs.unregister(code).await {
357 Some(sender) => {
358 if let Err(e) = sender.send(ntfy) {
359 error!("notification channel closed {:?}", e);
360 }
361 },
362 None => panic!("Unhandled notification {:?}", code),
363 }
364 },
365 }
366 },
367 _ => error!("Unexpected NCI data received {:?}", cmd),
368 }
369 },
370 qc = cmd_rx.recv(), if pending.is_none() => if let Some(queued) = qc {
371 debug!("cmd_rx got a q");
372 if let Some(nsender) = queued.notification {
373 ntfs.register(queued.pending.cmd.get_op(), nsender).await;
374 }
375 if let Err(e) = hc.out_cmd_tx.send(queued.pending.cmd.clone().into()) {
376 error!("command queue closed: {:?}", e);
377 }
378 timeout.as_mut().reset(Instant::now() + Duration::from_millis(20));
379 pending = Some(queued.pending);
380 } else {
381 break;
382 },
383 () = &mut timeout => {
384 error!("Command processing timeout");
385 timeout.as_mut().reset(max_deadline);
386 pending = None;
387 },
388 Some(data) = hc.in_data_rx.recv() => lcons.send_callback(data).await,
389 else => {
390 debug!("Select is done");
391 break;
392 },
393 }
394 }
395 debug!("NCI dispatch is terminated.");
396 Ok(())
397 }
398