1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 //! This module provides a Vsock Server helper. 17 18 use anyhow::Context; 19 use log::{error, info}; 20 use nix::sys::socket::{connect, socket, AddressFamily, SockFlag, SockType}; 21 use serde::Serialize; 22 use serde_json::json; 23 use std::io::Write; 24 use std::os::fd::AsRawFd; 25 use std::sync::atomic::{AtomicBool, Ordering}; 26 use std::sync::mpsc; 27 use std::sync::Arc; 28 use std::thread; 29 use vsock::{VsockAddr, VsockListener, VsockStream}; 30 31 /// Serializable Light information. 32 #[derive(Serialize)] 33 pub struct SerializableLight { 34 id: u32, 35 color: u32, 36 light_type: u8, 37 // Should be expanded as needed by improvements to the client. 38 } 39 40 impl SerializableLight { new(id: u32, color: u32, light_type: u8) -> Self41 pub fn new(id: u32, color: u32, light_type: u8) -> Self { 42 Self { id, color, light_type } 43 } 44 } 45 46 /// Vsock server helper. 47 pub struct VsockServer { 48 is_server_running: Arc<AtomicBool>, 49 thread_handle: Option<thread::JoinHandle<anyhow::Result<()>>>, 50 connection_thread_sender: mpsc::Sender<Vec<u8>>, 51 guest_port: u32, 52 } 53 54 impl VsockServer { new(port: u32) -> anyhow::Result<Self>55 pub fn new(port: u32) -> anyhow::Result<Self> { 56 let (sender, receiver) = mpsc::channel::<Vec<u8>>(); 57 let server = VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, port)?; 58 let running_atomic = Arc::new(AtomicBool::new(true)); 59 60 Ok(Self { 61 thread_handle: Some({ 62 let is_running = running_atomic.clone(); 63 thread::spawn(move || -> anyhow::Result<()> { 64 while is_running.load(Ordering::SeqCst) { 65 let (connection, _addr) = server.accept()?; 66 info!("Lights service vsock server connection established."); 67 68 // Connection established, send the start session message. 69 // If this fails it's because the connection dropped so we need 70 // to start accepting connections from clients again. 71 let start_message = json!({ 72 "event": "VIRTUAL_DEVICE_START_LIGHTS_SESSION", 73 }); 74 let mut json_as_vec = serde_json::to_vec(&start_message)?; 75 Self::send_buffer_with_length(&connection, json_as_vec)?; 76 77 // Receive messages from the channel and send them while the connection is valid. 78 while is_running.load(Ordering::SeqCst) { 79 // Block until we receive a new message to send on the socket. 80 json_as_vec = 81 receiver.recv().with_context(|| "Unable to read from channel")?; 82 83 if let Err(e) = Self::send_buffer_with_length(&connection, json_as_vec) 84 { 85 error!("Failed to send buffer over socket. Error: {}", e); 86 break; 87 } 88 } 89 } 90 91 Ok(()) 92 }) 93 }), 94 is_server_running: running_atomic, 95 connection_thread_sender: sender, 96 guest_port: port, 97 }) 98 } 99 100 /// Send the buffer length and then the buffer over a socket. send_buffer_with_length( mut connection: &VsockStream, buffer: Vec<u8>, ) -> anyhow::Result<()>101 fn send_buffer_with_length( 102 mut connection: &VsockStream, 103 buffer: Vec<u8>, 104 ) -> anyhow::Result<()> { 105 let vec_size = buffer.len() as u32; 106 107 connection 108 .write_all(&vec_size.to_le_bytes()) 109 .with_context(|| "Failed to send buffer length over socket")?; 110 connection 111 .write_all(buffer.as_slice()) 112 .with_context(|| "Failed to send buffer over socket")?; 113 114 Ok(()) 115 } 116 send_lights_state(&self, lights: Vec<SerializableLight>)117 pub fn send_lights_state(&self, lights: Vec<SerializableLight>) { 118 let update_message = json!({ 119 "event": "VIRTUAL_DEVICE_LIGHTS_UPDATE", 120 "lights": lights, 121 }); 122 self.connection_thread_sender 123 .send(serde_json::to_vec(&update_message).unwrap()) 124 .expect("Unable to send update on channel"); 125 } 126 } 127 128 impl Drop for VsockServer { drop(&mut self)129 fn drop(&mut self) { 130 info!("Stopping vsocks server for Lights service"); 131 132 self.is_server_running.store(false, Ordering::SeqCst); 133 134 // Send the stop message on the channel. This will also unblock the recv() call. 135 let stop_message = json!({ 136 "event": "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION", 137 }); 138 self.connection_thread_sender 139 .send(serde_json::to_vec(&stop_message).unwrap()) 140 .expect("Unable to send on channel"); 141 142 // Try to connect to the server socket locally to unblock the connection 143 // thread just in case it was blocked on accept() instead. 144 let fd = socket( 145 AddressFamily::Vsock, 146 SockType::Stream, 147 SockFlag::SOCK_NONBLOCK | SockFlag::SOCK_CLOEXEC, 148 None, 149 ) 150 .unwrap(); 151 let addr = VsockAddr::new(vsock::VMADDR_CID_LOCAL, self.guest_port); 152 connect(fd.as_raw_fd(), &addr).unwrap(); 153 154 // We made sure to unblock the connection thread, now join it. 155 let thread_result = self.thread_handle.take().unwrap().join().unwrap(); 156 info!("Connection thread finished with: {:?}", thread_result); 157 } 158 } 159