1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "backend/grpc_server.h"
16
17 #include <google/protobuf/util/json_util.h>
18 #include <stdlib.h>
19
20 #include <cstdint>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25
26 #include "google/protobuf/empty.pb.h"
27 #include "grpcpp/server_context.h"
28 #include "grpcpp/support/status.h"
29 #include "netsim-daemon/src/ffi.rs.h"
30 #include "netsim/common.pb.h"
31 #include "netsim/packet_streamer.grpc.pb.h"
32 #include "netsim/packet_streamer.pb.h"
33 #include "util/log.h"
34
35 namespace netsim {
36 namespace backend {
37 namespace {
38
39 using netsim::common::ChipKind;
40
41 using Stream =
42 ::grpc::ServerReaderWriter<packet::PacketResponse, packet::PacketRequest>;
43
44 using netsim::startup::Chip;
45
// Mapping from chip_id to streams.
//
// The stored pointers are non-owning: an entry is added by
// ServiceImpl::StreamPackets for the duration of one RPC and erased before
// that RPC returns. HandleResponse looks streams up here when writing a
// response.
// NOTE(review): no lock in this file guards this map - confirm that
// StreamPackets and HandleResponse cannot access it concurrently.
std::unordered_map<uint32_t, Stream *> chip_id_to_stream;

// Libslirp is not thread safe. Use a lock to prevent concurrent access to
// libslirp.
//
// Held by ServiceImpl::ProcessRequests around the WIFI call to
// wireless::HandleRequestCxx.
std::mutex gSlirpMutex;
52
53 // Service handles the gRPC StreamPackets requests.
54
55 class ServiceImpl final : public packet::PacketStreamer::Service {
56 public:
StreamPackets(::grpc::ServerContext * context,Stream * stream)57 ::grpc::Status StreamPackets(::grpc::ServerContext *context,
58 Stream *stream) override {
59 // Now connected to a peer issuing a bi-directional streaming grpc
60 auto peer = context->peer();
61 BtsLogInfo("grpc_server new packet_stream for peer %s", peer.c_str());
62
63 packet::PacketRequest request;
64
65 // First packet must have initial_info describing the peer
66 bool success = stream->Read(&request);
67 if (!success || !request.has_initial_info()) {
68 BtsLogError("ServiceImpl no initial information or stream closed");
69 return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
70 "Missing initial_info in first packet.");
71 }
72
73 auto device_name = request.initial_info().name();
74 auto chip_kind = request.initial_info().chip().kind();
75 // multiple chips of the same chip_kind for a device have a name
76 auto chip_name = request.initial_info().chip().id();
77 auto manufacturer = request.initial_info().chip().manufacturer();
78 auto product_name = request.initial_info().chip().product_name();
79 auto chip_address = request.initial_info().chip().address();
80 auto bt_properties = request.initial_info().chip().bt_properties();
81 // Add a new chip to the device
82 std::string chip_kind_string;
83 switch (chip_kind) {
84 case common::ChipKind::BLUETOOTH:
85 chip_kind_string = "BLUETOOTH";
86 break;
87 case common::ChipKind::WIFI:
88 chip_kind_string = "WIFI";
89 break;
90 case common::ChipKind::UWB:
91 chip_kind_string = "UWB";
92 break;
93 default:
94 chip_kind_string = "UNSPECIFIED";
95 break;
96 }
97
98 std::vector<unsigned char> message_vec(bt_properties.ByteSizeLong());
99 if (!bt_properties.SerializeToArray(message_vec.data(),
100 message_vec.size())) {
101 BtsLogError("Failed to serialize bt_properties to bytes");
102 }
103
104 auto result = netsim::device::AddChipCxx(
105 peer, device_name, chip_kind_string, chip_address, chip_name,
106 manufacturer, product_name, message_vec);
107 if (result->IsError()) {
108 return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
109 "AddChipCxx failed to add chip into netsim");
110 }
111 uint32_t device_id = result->GetDeviceId();
112 uint32_t chip_id = result->GetChipId();
113
114 BtsLogInfo(
115 "grpc_server: adding chip - chip_id: %d, "
116 "device_name: "
117 "%s",
118 chip_id, device_name.c_str());
119 // connect packet responses from chip facade to the peer
120 chip_id_to_stream[chip_id] = stream;
121 netsim::transport::RegisterGrpcTransport(chip_id);
122 this->ProcessRequests(stream, chip_id, chip_kind);
123
124 // no longer able to send responses to peer
125 netsim::transport::UnregisterGrpcTransport(chip_id);
126 chip_id_to_stream.erase(chip_id);
127
128 // Remove the chip from the device
129 netsim::device::RemoveChipCxx(device_id, chip_id);
130
131 BtsLogInfo(
132 "grpc_server: removing chip - chip_id: %d, "
133 "device_name: "
134 "%s",
135 chip_id, device_name.c_str());
136
137 return ::grpc::Status::OK;
138 }
139
// Convert a protobuf bytes field into shared_ptr<vector<uint8_t>>.
//
// Every byte of `*bytes_field` is copied into a newly allocated vector; the
// source string is left unchanged. `bytes_field` must not be null.
std::shared_ptr<std::vector<uint8_t>> ToSharedVec(std::string *bytes_field) {
  const std::string &source = *bytes_field;
  return std::make_shared<std::vector<uint8_t>>(source.begin(), source.end());
}
149
150 // Process requests in a loop forwarding packets to the packet_hub and
151 // returning when the channel is closed.
ProcessRequests(Stream * stream,uint32_t chip_id,common::ChipKind chip_kind)152 void ProcessRequests(Stream *stream, uint32_t chip_id,
153 common::ChipKind chip_kind) {
154 packet::PacketRequest request;
155 while (true) {
156 if (!stream->Read(&request)) {
157 BtsLogWarn("grpc_server: reading stopped - chip_id: %d", chip_id);
158 break;
159 }
160 // All kinds possible (bt, uwb, wifi), but each rpc only streames one.
161 if (chip_kind == common::ChipKind::BLUETOOTH) {
162 if (!request.has_hci_packet()) {
163 BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
164 chip_id);
165 continue;
166 }
167 auto packet_type = request.hci_packet().packet_type();
168 auto packet =
169 ToSharedVec(request.mutable_hci_packet()->mutable_packet());
170 wireless::HandleRequestCxx(chip_id, *packet, packet_type);
171 } else if (chip_kind == common::ChipKind::WIFI) {
172 if (!request.has_packet()) {
173 BtsLogWarn("grpc_server: unknown packet type from chip_id: %d",
174 chip_id);
175 continue;
176 }
177 auto packet = ToSharedVec(request.mutable_packet());
178 {
179 std::lock_guard<std::mutex> guard(gSlirpMutex);
180 wireless::HandleRequestCxx(chip_id, *packet,
181 packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
182 }
183 } else if (chip_kind == common::ChipKind::UWB) {
184 if (!request.has_packet()) {
185 BtsLogWarn("grpc_server: unknown packet from chip_id: %d", chip_id);
186 continue;
187 }
188 auto packet = ToSharedVec(request.mutable_packet());
189 wireless::HandleRequestCxx(chip_id, *packet,
190 packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
191
192 } else {
193 BtsLogWarn("grpc_server: unknown chip_kind");
194 }
195 }
196 }
197 };
198 } // namespace
199
200 // handle_response is called by packet_hub to forward a response to the gRPC
201 // stream associated with chip_id.
202 //
203 // When writing, the packet is copied because is borrowed from a shared_ptr and
204 // grpc++ doesn't know about smart pointers.
HandleResponse(uint32_t chip_id,const std::vector<uint8_t> & packet,packet::HCIPacket_PacketType packet_type)205 void HandleResponse(uint32_t chip_id, const std::vector<uint8_t> &packet,
206 packet::HCIPacket_PacketType packet_type) {
207 auto stream = chip_id_to_stream[chip_id];
208 if (stream) {
209 // TODO: lock or caller here because gRPC does not allow overlapping writes.
210 packet::PacketResponse response;
211 // Copies the borrowed packet for output
212 auto str_packet = std::string(packet.begin(), packet.end());
213 if (packet_type != packet::HCIPacket_PacketType_HCI_PACKET_UNSPECIFIED) {
214 response.mutable_hci_packet()->set_packet_type(packet_type);
215 response.mutable_hci_packet()->set_packet(str_packet);
216 } else {
217 response.set_packet(str_packet);
218 }
219 if (!stream->Write(response)) {
220 BtsLogWarn("grpc_server: write failed for chip_id: %d", chip_id);
221 }
222 } else {
223 BtsLogWarn("grpc_server: no stream for chip_id: %d", chip_id);
224 }
225 }
226
227 // for cxx
HandleResponseCxx(uint32_t chip_id,const rust::Vec<rust::u8> & packet,uint8_t packet_type)228 void HandleResponseCxx(uint32_t chip_id, const rust::Vec<rust::u8> &packet,
229 /* optional */ uint8_t packet_type) {
230 std::vector<uint8_t> vec(packet.begin(), packet.end());
231 HandleResponse(chip_id, vec, packet::HCIPacket_PacketType(packet_type));
232 }
233
234 } // namespace backend
235
GetBackendService()236 std::unique_ptr<packet::PacketStreamer::Service> GetBackendService() {
237 return std::make_unique<backend::ServiceImpl>();
238 }
239 } // namespace netsim
240