1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "backend/grpc_client.h"
16 
17 #include <google/protobuf/util/json_util.h>
18 
19 #include <cstdint>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <unordered_map>
24 #include <utility>
25 
26 #include "grpcpp/channel.h"
27 #include "grpcpp/create_channel.h"
28 #include "grpcpp/security/credentials.h"
29 #include "grpcpp/server_context.h"
30 #include "netsim/packet_streamer.grpc.pb.h"
31 #include "netsim/packet_streamer.pb.h"
32 #include "rust/cxx.h"
33 #include "util/log.h"
34 
35 // Backend Packet Streamer Client
36 
37 namespace netsim {
38 namespace backend {
39 namespace client {
40 
// Maximum time to wait for the channel to the server to become connected.
constexpr std::chrono::seconds kConnectionDeadline{5};
42 
43 using Stream = ::grpc::ClientReaderWriter<netsim::packet::PacketRequest,
44                                           netsim::packet::PacketResponse>;
45 
46 std::mutex mutex_;
47 uint32_t stream_id_max_ = 0;
48 
49 // Active StreamPacket calls
50 std::unordered_map<uint32_t, std::unique_ptr<Stream>> streams_;
51 std::unordered_map<uint32_t, grpc::ClientContext> contexts_;
52 
53 // Single connection to a server with multiple StreamPackets calls
54 std::string server_;
55 std::shared_ptr<grpc::Channel> channel_;
56 grpc::ClientContext context_;
57 std::unique_ptr<netsim::packet::PacketStreamer::Stub> stub_;
58 
59 // Call the StreamPackets RPC on server.
60 //
61 // This function allows multiple StreamPacket calls at once but only one
62 // connection to a server. If the server isn't already connected a new
63 // connection is created.
64 
StreamPackets(const rust::String & server_rust)65 uint32_t StreamPackets(const rust::String &server_rust) {
66   std::unique_lock<std::mutex> lock(mutex_);
67   auto server = std::string(server_rust);
68   if (server_.empty()) {
69     server_ = server;
70     channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
71     auto deadline = std::chrono::system_clock::now() + kConnectionDeadline;
72     if (!channel_->WaitForConnected(deadline)) {
73       BtsLog("Failed to create packet streamer client to %s", server_.c_str());
74       return -1;
75     }
76     stub_ = netsim::packet::PacketStreamer::NewStub(channel_);
77   } else if (server_ != server) {
78     BtsLog("grpc_client: multiple servers not supported");
79     return -1;
80   }
81   // Each active gRPC call needs its own context
82   auto stream = stub_->StreamPackets(&contexts_[++stream_id_max_]);
83   streams_[stream_id_max_] = std::move(stream);
84   BtsLog("Created packet streamer client to %s", server_.c_str());
85   return stream_id_max_;
86 }
87 
88 /// Loop reading packets on the stream identified by stream_id and call the
89 //  ReadCallback function with the PacketResponse byte proto.
90 
ReadPacketResponseLoop(uint32_t stream_id,ReadCallback read_fn)91 bool ReadPacketResponseLoop(uint32_t stream_id, ReadCallback read_fn) {
92   netsim::packet::PacketResponse response;
93   while (true) {
94     {
95       std::unique_lock<std::mutex> lock(mutex_);
96       if (streams_.find(stream_id) == streams_.end()) {
97         BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
98         return false;
99       }
100     }
101     // TODO: fix locking here
102     if (!streams_[stream_id]->Read(&response)) {
103       BtsLogWarn("grpc_client: reading stopped stream_id %d", stream_id);
104       return false;
105     }
106     std::vector<unsigned char> proto_bytes(response.ByteSizeLong());
107     response.SerializeToArray(proto_bytes.data(), proto_bytes.size());
108     rust::Slice<const uint8_t> slice{proto_bytes.data(), proto_bytes.size()};
109     (*read_fn)(stream_id, slice);
110   }
111 }
112 
113 // Write a packet to the stream identified by stream_id
114 
WritePacketRequest(uint32_t stream_id,const rust::Slice<::std::uint8_t const> proto_bytes)115 bool WritePacketRequest(uint32_t stream_id,
116                         const rust::Slice<::std::uint8_t const> proto_bytes) {
117   netsim::packet::PacketRequest request;
118   if (!request.ParseFromArray(proto_bytes.data(), proto_bytes.size())) {
119     BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
120     return false;
121   };
122 
123   std::unique_lock<std::mutex> lock(mutex_);
124   if (streams_.find(stream_id) == streams_.end()) {
125     BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
126     return false;
127   }
128   if (!streams_[stream_id]->Write(request)) {
129     BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
130     return false;
131   }
132   return true;
133 };
134 
135 }  // namespace client
136 }  // namespace backend
137 }  // namespace netsim
138