1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "GRPCVehicleProxyServer.h"
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <grpc++/grpc++.h>
22 
23 #include <android-base/logging.h>
24 
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 namespace android::hardware::automotive::vehicle::virtualization {
33 
34 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35 
getServerCredentials()36 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37     // TODO(chenhaosjtuacm): get secured credentials here
38     return ::grpc::InsecureServerCredentials();
39 }
40 
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)41 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42                                                std::unique_ptr<IVehicleHardware>&& hardware)
43     : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) {
44     mHardware->registerOnPropertyChangeEvent(
45             std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
46                     [this](std::vector<aidlvhal::VehiclePropValue> values) {
47                         OnVehiclePropChange(values);
48                     }));
49 }
50 
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)51 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
52         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
53         ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
54     for (const auto& config : mHardware->getAllPropertyConfigs()) {
55         proto::VehiclePropConfig protoConfig;
56         proto_msg_converter::aidlToProto(config, &protoConfig);
57         if (!stream->Write(protoConfig)) {
58             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
59         }
60     }
61     return ::grpc::Status::OK;
62 }
63 
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)64 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
65                                                  const proto::VehiclePropValueRequests* requests,
66                                                  proto::SetValueResults* results) {
67     std::vector<aidlvhal::SetValueRequest> aidlRequests;
68     for (const auto& protoRequest : requests->requests()) {
69         auto& aidlRequest = aidlRequests.emplace_back();
70         aidlRequest.requestId = protoRequest.request_id();
71         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
72     }
73     auto waitMtx = std::make_shared<std::mutex>();
74     auto waitCV = std::make_shared<std::condition_variable>();
75     auto complete = std::make_shared<bool>(false);
76     auto tmpResults = std::make_shared<proto::SetValueResults>();
77     auto aidlStatus = mHardware->setValues(
78             std::make_shared<const IVehicleHardware::SetValuesCallback>(
79                     [waitMtx, waitCV, complete,
80                      tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
81                         for (const auto& aidlResult : setValueResults) {
82                             auto& protoResult = *tmpResults->add_results();
83                             protoResult.set_request_id(aidlResult.requestId);
84                             protoResult.set_status(
85                                     static_cast<proto::StatusCode>(aidlResult.status));
86                         }
87                         {
88                             std::lock_guard lck(*waitMtx);
89                             *complete = true;
90                         }
91                         waitCV->notify_all();
92                     }),
93             aidlRequests);
94     if (aidlStatus != aidlvhal::StatusCode::OK) {
95         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
96                               "The underlying hardware fails to set values, VHAL status: " +
97                                       toString(aidlStatus));
98     }
99     std::unique_lock lck(*waitMtx);
100     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
101     if (!success) {
102         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
103                               "The underlying hardware set values timeout.");
104     }
105     *results = std::move(*tmpResults);
106     return ::grpc::Status::OK;
107 }
108 
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)109 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
110                                                  const proto::VehiclePropValueRequests* requests,
111                                                  proto::GetValueResults* results) {
112     std::vector<aidlvhal::GetValueRequest> aidlRequests;
113     for (const auto& protoRequest : requests->requests()) {
114         auto& aidlRequest = aidlRequests.emplace_back();
115         aidlRequest.requestId = protoRequest.request_id();
116         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
117     }
118     auto waitMtx = std::make_shared<std::mutex>();
119     auto waitCV = std::make_shared<std::condition_variable>();
120     auto complete = std::make_shared<bool>(false);
121     auto tmpResults = std::make_shared<proto::GetValueResults>();
122     auto aidlStatus = mHardware->getValues(
123             std::make_shared<const IVehicleHardware::GetValuesCallback>(
124                     [waitMtx, waitCV, complete,
125                      tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
126                         for (const auto& aidlResult : getValueResults) {
127                             auto& protoResult = *tmpResults->add_results();
128                             protoResult.set_request_id(aidlResult.requestId);
129                             protoResult.set_status(
130                                     static_cast<proto::StatusCode>(aidlResult.status));
131                             if (aidlResult.prop) {
132                                 auto* valuePtr = protoResult.mutable_value();
133                                 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
134                             }
135                         }
136                         {
137                             std::lock_guard lck(*waitMtx);
138                             *complete = true;
139                         }
140                         waitCV->notify_all();
141                     }),
142             aidlRequests);
143     if (aidlStatus != aidlvhal::StatusCode::OK) {
144         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
145                               "The underlying hardware fails to get values, VHAL status: " +
146                                       toString(aidlStatus));
147     }
148     std::unique_lock lck(*waitMtx);
149     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
150     if (!success) {
151         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
152                               "The underlying hardware get values timeout.");
153     }
154     *results = std::move(*tmpResults);
155     return ::grpc::Status::OK;
156 }
157 
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)158 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
159         ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
160         proto::VehicleHalCallStatus* status) {
161     const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
162                                                          request->sample_rate());
163     status->set_status_code(static_cast<proto::StatusCode>(status_code));
164     return ::grpc::Status::OK;
165 }
166 
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)167 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
168                                                  const proto::SubscribeRequest* request,
169                                                  proto::VehicleHalCallStatus* status) {
170     const auto& protoSubscribeOptions = request->options();
171     aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
172     proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
173     const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
174     status->set_status_code(static_cast<proto::StatusCode>(status_code));
175     return ::grpc::Status::OK;
176 }
177 
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)178 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
179                                                    const proto::UnsubscribeRequest* request,
180                                                    proto::VehicleHalCallStatus* status) {
181     int32_t propId = request->prop_id();
182     int32_t areaId = request->area_id();
183     const auto status_code = mHardware->unsubscribe(propId, areaId);
184     status->set_status_code(static_cast<proto::StatusCode>(status_code));
185     return ::grpc::Status::OK;
186 }
187 
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)188 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
189                                                    const ::google::protobuf::Empty*,
190                                                    proto::VehicleHalCallStatus* status) {
191     status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
192     return ::grpc::Status::OK;
193 }
194 
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)195 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
196                                             const proto::DumpOptions* options,
197                                             proto::DumpResult* result) {
198     std::vector<std::string> dumpOptionStrings(options->options().begin(),
199                                                options->options().end());
200     auto dumpResult = mHardware->dump(dumpOptionStrings);
201     result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
202     result->set_buffer(dumpResult.buffer);
203     return ::grpc::Status::OK;
204 }
205 
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)206 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
207         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
208         ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
209     auto conn = std::make_shared<ConnectionDescriptor>(stream);
210     {
211         std::lock_guard lck(mConnectionMutex);
212         mValueStreamingConnections.push_back(conn);
213     }
214     conn->Wait();
215     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
216     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
217 }
218 
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)219 void GrpcVehicleProxyServer::OnVehiclePropChange(
220         const std::vector<aidlvhal::VehiclePropValue>& values) {
221     std::unordered_set<uint64_t> brokenConn;
222     proto::VehiclePropValues protoValues;
223     for (const auto& value : values) {
224         auto* protoValuePtr = protoValues.add_values();
225         proto_msg_converter::aidlToProto(value, protoValuePtr);
226     }
227     {
228         std::shared_lock read_lock(mConnectionMutex);
229         for (auto& connection : mValueStreamingConnections) {
230             auto writeOK = connection->Write(protoValues);
231             if (!writeOK) {
232                 LOG(ERROR) << __func__
233                            << ": Server Write failed, connection lost. ID: " << connection->ID();
234                 brokenConn.insert(connection->ID());
235             }
236         }
237     }
238     if (brokenConn.empty()) {
239         return;
240     }
241     std::unique_lock write_lock(mConnectionMutex);
242     mValueStreamingConnections.erase(
243             std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
244                            [&brokenConn](const auto& conn) {
245                                return brokenConn.find(conn->ID()) != brokenConn.end();
246                            }),
247             mValueStreamingConnections.end());
248 }
249 
Start()250 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
251     if (mServer) {
252         LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
253         return *this;
254     }
255     ::grpc::ServerBuilder builder;
256     builder.RegisterService(this);
257     builder.AddListeningPort(mServiceAddr, getServerCredentials());
258     mServer = builder.BuildAndStart();
259     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
260                    << "please make sure the configuration and permissions are correct";
261     return *this;
262 }
263 
Shutdown()264 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
265     std::shared_lock read_lock(mConnectionMutex);
266     for (auto& conn : mValueStreamingConnections) {
267         conn->Shutdown();
268     }
269     if (mServer) {
270         mServer->Shutdown();
271     }
272     return *this;
273 }
274 
Wait()275 void GrpcVehicleProxyServer::Wait() {
276     if (mServer) {
277         mServer->Wait();
278     }
279     mServer.reset();
280 }
281 
~ConnectionDescriptor()282 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
283     Shutdown();
284 }
285 
Write(const proto::VehiclePropValues & values)286 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
287     if (!mStream) {
288         LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
289         Shutdown();
290         return false;
291     }
292     {
293         std::lock_guard lck(*mMtx);
294         if (!mShutdownFlag && mStream->Write(values)) {
295             return true;
296         } else {
297             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
298         }
299     }
300     Shutdown();
301     return false;
302 }
303 
Wait()304 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
305     std::unique_lock lck(*mMtx);
306     mCV->wait(lck, [this] { return mShutdownFlag; });
307 }
308 
Shutdown()309 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
310     {
311         std::lock_guard lck(*mMtx);
312         mShutdownFlag = true;
313     }
314     mCV->notify_all();
315 }
316 
317 }  // namespace android::hardware::automotive::vehicle::virtualization
318