1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "backend/grpc_server.h"
16 
17 #include <google/protobuf/util/json_util.h>
18 #include <stdlib.h>
19 
20 #include <cstdint>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 
26 #include "google/protobuf/empty.pb.h"
27 #include "grpcpp/server_context.h"
28 #include "grpcpp/support/status.h"
29 #include "netsim-daemon/src/ffi.rs.h"
30 #include "netsim/common.pb.h"
31 #include "netsim/packet_streamer.grpc.pb.h"
32 #include "netsim/packet_streamer.pb.h"
33 #include "util/log.h"
34 
35 namespace netsim {
36 namespace backend {
37 namespace {
38 
39 using netsim::common::ChipKind;
40 
41 using Stream =
42     ::grpc::ServerReaderWriter<packet::PacketResponse, packet::PacketRequest>;
43 
44 using netsim::startup::Chip;
45 
// Mapping from chip_id to streams.
//
// ServiceImpl::StreamPackets inserts an entry before it starts processing
// requests and erases it once processing has finished; HandleResponse looks
// the stream up to write a response. The map stores the raw Stream pointer
// that StreamPackets receives as its argument; nothing in this file deletes
// it.
// NOTE(review): no lock in this file guards this map - confirm how callers
// serialize access between StreamPackets and HandleResponse.
std::unordered_map<uint32_t, Stream *> chip_id_to_stream;

// Libslirp is not thread safe. Use a lock to prevent concurrent access to
// libslirp.
//
// Held by ServiceImpl::ProcessRequests around the HandleRequestCxx call made
// for WIFI chips.
std::mutex gSlirpMutex;
52 
53 // Service handles the gRPC StreamPackets requests.
54 
55 class ServiceImpl final : public packet::PacketStreamer::Service {
56  public:
StreamPackets(::grpc::ServerContext * context,Stream * stream)57   ::grpc::Status StreamPackets(::grpc::ServerContext *context,
58                                Stream *stream) override {
59     // Now connected to a peer issuing a bi-directional streaming grpc
60     auto peer = context->peer();
61     BtsLogInfo("grpc_server new packet_stream for peer %s", peer.c_str());
62 
63     packet::PacketRequest request;
64 
65     // First packet must have initial_info describing the peer
66     bool success = stream->Read(&request);
67     if (!success || !request.has_initial_info()) {
68       BtsLogError("ServiceImpl no initial information or stream closed");
69       return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
70                             "Missing initial_info in first packet.");
71     }
72 
73     auto device_name = request.initial_info().name();
74     auto chip_kind = request.initial_info().chip().kind();
75     // multiple chips of the same chip_kind for a device have a name
76     auto chip_name = request.initial_info().chip().id();
77     auto manufacturer = request.initial_info().chip().manufacturer();
78     auto product_name = request.initial_info().chip().product_name();
79     auto chip_address = request.initial_info().chip().address();
80     auto bt_properties = request.initial_info().chip().bt_properties();
81     // Add a new chip to the device
82     std::string chip_kind_string;
83     switch (chip_kind) {
84       case common::ChipKind::BLUETOOTH:
85         chip_kind_string = "BLUETOOTH";
86         break;
87       case common::ChipKind::WIFI:
88         chip_kind_string = "WIFI";
89         break;
90       case common::ChipKind::UWB:
91         chip_kind_string = "UWB";
92         break;
93       default:
94         chip_kind_string = "UNSPECIFIED";
95         break;
96     }
97 
98     std::vector<unsigned char> message_vec(bt_properties.ByteSizeLong());
99     if (!bt_properties.SerializeToArray(message_vec.data(),
100                                         message_vec.size())) {
101       BtsLogError("Failed to serialize bt_properties to bytes");
102     }
103 
104     auto result = netsim::device::AddChipCxx(
105         peer, device_name, chip_kind_string, chip_address, chip_name,
106         manufacturer, product_name, message_vec);
107     if (result->IsError()) {
108       return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
109                             "AddChipCxx failed to add chip into netsim");
110     }
111     uint32_t device_id = result->GetDeviceId();
112     uint32_t chip_id = result->GetChipId();
113 
114     BtsLogInfo(
115         "grpc_server: adding chip - chip_id: %d, "
116         "device_name: "
117         "%s",
118         chip_id, device_name.c_str());
119     // connect packet responses from chip facade to the peer
120     chip_id_to_stream[chip_id] = stream;
121     netsim::transport::RegisterGrpcTransport(chip_id);
122     this->ProcessRequests(stream, chip_id, chip_kind);
123 
124     // no longer able to send responses to peer
125     netsim::transport::UnregisterGrpcTransport(chip_id);
126     chip_id_to_stream.erase(chip_id);
127 
128     // Remove the chip from the device
129     netsim::device::RemoveChipCxx(device_id, chip_id);
130 
131     BtsLogInfo(
132         "grpc_server: removing chip - chip_id: %d, "
133         "device_name: "
134         "%s",
135         chip_id, device_name.c_str());
136 
137     return ::grpc::Status::OK;
138   }
139 
140   // Convert a protobuf bytes field into shared_ptr<<vec<uint8_t>>.
141   //
142   // Release ownership of the bytes field and convert it to a vector using move
143   // iterators. No copy when called with a mutable reference.
ToSharedVec(std::string * bytes_field)144   std::shared_ptr<std::vector<uint8_t>> ToSharedVec(std::string *bytes_field) {
145     return std::make_shared<std::vector<uint8_t>>(
146         std::make_move_iterator(bytes_field->begin()),
147         std::make_move_iterator(bytes_field->end()));
148   }
149 
150   // Process requests in a loop forwarding packets to the packet_hub and
151   // returning when the channel is closed.
ProcessRequests(Stream * stream,uint32_t chip_id,common::ChipKind chip_kind)152   void ProcessRequests(Stream *stream, uint32_t chip_id,
153                        common::ChipKind chip_kind) {
154     packet::PacketRequest request;
155     while (true) {
156       if (!stream->Read(&request)) {
157         BtsLogWarn("grpc_server: reading stopped - chip_id: %d", chip_id);
158         break;
159       }
160       // All kinds possible (bt, uwb, wifi), but each rpc only streames one.
161       if (chip_kind == common::ChipKind::BLUETOOTH) {
162         if (!request.has_hci_packet()) {
163           BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
164                      chip_id);
165           continue;
166         }
167         auto packet_type = request.hci_packet().packet_type();
168         auto packet =
169             ToSharedVec(request.mutable_hci_packet()->mutable_packet());
170         wireless::HandleRequestCxx(chip_id, *packet, packet_type);
171       } else if (chip_kind == common::ChipKind::WIFI) {
172         if (!request.has_packet()) {
173           BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
174                      chip_id);
175           continue;
176         }
177         auto packet = ToSharedVec(request.mutable_packet());
178         {
179           std::lock_guard<std::mutex> guard(gSlirpMutex);
180           wireless::HandleRequestCxx(chip_id, *packet,
181                                      packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
182         }
183       } else if (chip_kind == common::ChipKind::UWB) {
184         if (!request.has_packet()) {
185           BtsLogWarn("grpc_server: unknown packet from chip_id: %d", chip_id);
186           continue;
187         }
188         auto packet = ToSharedVec(request.mutable_packet());
189         wireless::HandleRequestCxx(chip_id, *packet,
190                                    packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
191 
192       } else {
193         BtsLogWarn("grpc_server: unknown chip_kind");
194       }
195     }
196   }
197 };
198 }  // namespace
199 
200 // handle_response is called by packet_hub to forward a response to the gRPC
201 // stream associated with chip_id.
202 //
203 // When writing, the packet is copied because is borrowed from a shared_ptr and
204 // grpc++ doesn't know about smart pointers.
HandleResponse(uint32_t chip_id,const std::vector<uint8_t> & packet,packet::HCIPacket_PacketType packet_type)205 void HandleResponse(uint32_t chip_id, const std::vector<uint8_t> &packet,
206                     packet::HCIPacket_PacketType packet_type) {
207   auto stream = chip_id_to_stream[chip_id];
208   if (stream) {
209     // TODO: lock or caller here because gRPC does not allow overlapping writes.
210     packet::PacketResponse response;
211     // Copies the borrowed packet for output
212     auto str_packet = std::string(packet.begin(), packet.end());
213     if (packet_type != packet::HCIPacket_PacketType_HCI_PACKET_UNSPECIFIED) {
214       response.mutable_hci_packet()->set_packet_type(packet_type);
215       response.mutable_hci_packet()->set_packet(str_packet);
216     } else {
217       response.set_packet(str_packet);
218     }
219     if (!stream->Write(response)) {
220       BtsLogWarn("grpc_server: write failed for chip_id: %d", chip_id);
221     }
222   } else {
223     BtsLogWarn("grpc_server: no stream for chip_id: %d", chip_id);
224   }
225 }
226 
227 // for cxx
HandleResponseCxx(uint32_t chip_id,const rust::Vec<rust::u8> & packet,uint8_t packet_type)228 void HandleResponseCxx(uint32_t chip_id, const rust::Vec<rust::u8> &packet,
229                        /* optional */ uint8_t packet_type) {
230   std::vector<uint8_t> vec(packet.begin(), packet.end());
231   HandleResponse(chip_id, vec, packet::HCIPacket_PacketType(packet_type));
232 }
233 
234 }  // namespace backend
235 
GetBackendService()236 std::unique_ptr<packet::PacketStreamer::Service> GetBackendService() {
237   return std::make_unique<backend::ServiceImpl>();
238 }
239 }  // namespace netsim
240