1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "backend/grpc_client.h"
16
17 #include <google/protobuf/util/json_util.h>
18
19 #include <cstdint>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <unordered_map>
24 #include <utility>
25
26 #include "grpcpp/channel.h"
27 #include "grpcpp/create_channel.h"
28 #include "grpcpp/security/credentials.h"
29 #include "grpcpp/server_context.h"
30 #include "netsim/packet_streamer.grpc.pb.h"
31 #include "netsim/packet_streamer.pb.h"
32 #include "rust/cxx.h"
33 #include "util/log.h"
34
35 // Backend Packet Streamer Client
36
37 namespace netsim {
38 namespace backend {
39 namespace client {
40
// Maximum time StreamPackets() waits for the channel to the server to connect.
constexpr std::chrono::seconds kConnectionDeadline{5};
42
43 using Stream = ::grpc::ClientReaderWriter<netsim::packet::PacketRequest,
44 netsim::packet::PacketResponse>;
45
46 std::mutex mutex_;
47 uint32_t stream_id_max_ = 0;
48
49 // Active StreamPacket calls
50 std::unordered_map<uint32_t, std::unique_ptr<Stream>> streams_;
51 std::unordered_map<uint32_t, grpc::ClientContext> contexts_;
52
53 // Single connection to a server with multiple StreamPackets calls
54 std::string server_;
55 std::shared_ptr<grpc::Channel> channel_;
56 grpc::ClientContext context_;
57 std::unique_ptr<netsim::packet::PacketStreamer::Stub> stub_;
58
59 // Call the StreamPackets RPC on server.
60 //
61 // This function allows multiple StreamPacket calls at once but only one
62 // connection to a server. If the server isn't already connected a new
63 // connection is created.
64
StreamPackets(const rust::String & server_rust)65 uint32_t StreamPackets(const rust::String &server_rust) {
66 std::unique_lock<std::mutex> lock(mutex_);
67 auto server = std::string(server_rust);
68 if (server_.empty()) {
69 server_ = server;
70 channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
71 auto deadline = std::chrono::system_clock::now() + kConnectionDeadline;
72 if (!channel_->WaitForConnected(deadline)) {
73 BtsLog("Failed to create packet streamer client to %s", server_.c_str());
74 return -1;
75 }
76 stub_ = netsim::packet::PacketStreamer::NewStub(channel_);
77 } else if (server_ != server) {
78 BtsLog("grpc_client: multiple servers not supported");
79 return -1;
80 }
81 // Each active gRPC call needs its own context
82 auto stream = stub_->StreamPackets(&contexts_[++stream_id_max_]);
83 streams_[stream_id_max_] = std::move(stream);
84 BtsLog("Created packet streamer client to %s", server_.c_str());
85 return stream_id_max_;
86 }
87
88 /// Loop reading packets on the stream identified by stream_id and call the
89 // ReadCallback function with the PacketResponse byte proto.
90
ReadPacketResponseLoop(uint32_t stream_id,ReadCallback read_fn)91 bool ReadPacketResponseLoop(uint32_t stream_id, ReadCallback read_fn) {
92 netsim::packet::PacketResponse response;
93 while (true) {
94 {
95 std::unique_lock<std::mutex> lock(mutex_);
96 if (streams_.find(stream_id) == streams_.end()) {
97 BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
98 return false;
99 }
100 }
101 // TODO: fix locking here
102 if (!streams_[stream_id]->Read(&response)) {
103 BtsLogWarn("grpc_client: reading stopped stream_id %d", stream_id);
104 return false;
105 }
106 std::vector<unsigned char> proto_bytes(response.ByteSizeLong());
107 response.SerializeToArray(proto_bytes.data(), proto_bytes.size());
108 rust::Slice<const uint8_t> slice{proto_bytes.data(), proto_bytes.size()};
109 (*read_fn)(stream_id, slice);
110 }
111 }
112
113 // Write a packet to the stream identified by stream_id
114
WritePacketRequest(uint32_t stream_id,const rust::Slice<::std::uint8_t const> proto_bytes)115 bool WritePacketRequest(uint32_t stream_id,
116 const rust::Slice<::std::uint8_t const> proto_bytes) {
117 netsim::packet::PacketRequest request;
118 if (!request.ParseFromArray(proto_bytes.data(), proto_bytes.size())) {
119 BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
120 return false;
121 };
122
123 std::unique_lock<std::mutex> lock(mutex_);
124 if (streams_.find(stream_id) == streams_.end()) {
125 BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
126 return false;
127 }
128 if (!streams_[stream_id]->Write(request)) {
129 BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
130 return false;
131 }
132 return true;
133 };
134
135 } // namespace client
136 } // namespace backend
137 } // namespace netsim
138