1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 //! This module provides a Vsock Server helper.
17 
18 use anyhow::Context;
19 use log::{error, info};
20 use nix::sys::socket::{connect, socket, AddressFamily, SockFlag, SockType};
21 use serde::Serialize;
22 use serde_json::json;
23 use std::io::Write;
24 use std::os::fd::AsRawFd;
25 use std::sync::atomic::{AtomicBool, Ordering};
26 use std::sync::mpsc;
27 use std::sync::Arc;
28 use std::thread;
29 use vsock::{VsockAddr, VsockListener, VsockStream};
30 
31 /// Serializable Light information.
32 #[derive(Serialize)]
33 pub struct SerializableLight {
34     id: u32,
35     color: u32,
36     light_type: u8,
37     // Should be expanded as needed by improvements to the client.
38 }
39 
40 impl SerializableLight {
new(id: u32, color: u32, light_type: u8) -> Self41     pub fn new(id: u32, color: u32, light_type: u8) -> Self {
42         Self { id, color, light_type }
43     }
44 }
45 
46 /// Vsock server helper.
47 pub struct VsockServer {
48     is_server_running: Arc<AtomicBool>,
49     thread_handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
50     connection_thread_sender: mpsc::Sender<Vec<u8>>,
51     guest_port: u32,
52 }
53 
54 impl VsockServer {
new(port: u32) -> anyhow::Result<Self>55     pub fn new(port: u32) -> anyhow::Result<Self> {
56         let (sender, receiver) = mpsc::channel::<Vec<u8>>();
57         let server = VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, port)?;
58         let running_atomic = Arc::new(AtomicBool::new(true));
59 
60         Ok(Self {
61             thread_handle: Some({
62                 let is_running = running_atomic.clone();
63                 thread::spawn(move || -> anyhow::Result<()> {
64                     while is_running.load(Ordering::SeqCst) {
65                         let (connection, _addr) = server.accept()?;
66                         info!("Lights service vsock server connection established.");
67 
68                         // Connection established, send the start session message.
69                         // If this fails it's because the connection dropped so we need
70                         // to start accepting connections from clients again.
71                         let start_message = json!({
72                             "event": "VIRTUAL_DEVICE_START_LIGHTS_SESSION",
73                         });
74                         let mut json_as_vec = serde_json::to_vec(&start_message)?;
75                         Self::send_buffer_with_length(&connection, json_as_vec)?;
76 
77                         // Receive messages from the channel and send them while the connection is valid.
78                         while is_running.load(Ordering::SeqCst) {
79                             // Block until we receive a new message to send on the socket.
80                             json_as_vec =
81                                 receiver.recv().with_context(|| "Unable to read from channel")?;
82 
83                             if let Err(e) = Self::send_buffer_with_length(&connection, json_as_vec)
84                             {
85                                 error!("Failed to send buffer over socket. Error: {}", e);
86                                 break;
87                             }
88                         }
89                     }
90 
91                     Ok(())
92                 })
93             }),
94             is_server_running: running_atomic,
95             connection_thread_sender: sender,
96             guest_port: port,
97         })
98     }
99 
100     /// Send the buffer length and then the buffer over a socket.
send_buffer_with_length( mut connection: &VsockStream, buffer: Vec<u8>, ) -> anyhow::Result<()>101     fn send_buffer_with_length(
102         mut connection: &VsockStream,
103         buffer: Vec<u8>,
104     ) -> anyhow::Result<()> {
105         let vec_size = buffer.len() as u32;
106 
107         connection
108             .write_all(&vec_size.to_le_bytes())
109             .with_context(|| "Failed to send buffer length over socket")?;
110         connection
111             .write_all(buffer.as_slice())
112             .with_context(|| "Failed to send buffer over socket")?;
113 
114         Ok(())
115     }
116 
send_lights_state(&self, lights: Vec<SerializableLight>)117     pub fn send_lights_state(&self, lights: Vec<SerializableLight>) {
118         let update_message = json!({
119             "event": "VIRTUAL_DEVICE_LIGHTS_UPDATE",
120             "lights": lights,
121         });
122         self.connection_thread_sender
123             .send(serde_json::to_vec(&update_message).unwrap())
124             .expect("Unable to send update on channel");
125     }
126 }
127 
128 impl Drop for VsockServer {
drop(&mut self)129     fn drop(&mut self) {
130         info!("Stopping vsocks server for Lights service");
131 
132         self.is_server_running.store(false, Ordering::SeqCst);
133 
134         // Send the stop message on the channel. This will also unblock the recv() call.
135         let stop_message = json!({
136             "event": "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION",
137         });
138         self.connection_thread_sender
139             .send(serde_json::to_vec(&stop_message).unwrap())
140             .expect("Unable to send on channel");
141 
142         // Try to connect to the server socket locally to unblock the connection
143         // thread just in case it was blocked on accept() instead.
144         let fd = socket(
145             AddressFamily::Vsock,
146             SockType::Stream,
147             SockFlag::SOCK_NONBLOCK | SockFlag::SOCK_CLOEXEC,
148             None,
149         )
150         .unwrap();
151         let addr = VsockAddr::new(vsock::VMADDR_CID_LOCAL, self.guest_port);
152         connect(fd.as_raw_fd(), &addr).unwrap();
153 
154         // We made sure to unblock the connection thread, now join it.
155         let thread_result = self.thread_handle.take().unwrap().join().unwrap();
156         info!("Connection thread finished with: {:?}", thread_result);
157     }
158 }
159