1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "GrpcVehicleServer.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <shared_mutex>
21
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24
25 #include "GarageModeServerSideHandler.h"
26 #include "PowerStateListener.h"
27 #include "VehicleServer.grpc.pb.h"
28 #include "VehicleServer.pb.h"
29 #include "vhal_v2_0/DefaultConfig.h"
30 #include "vhal_v2_0/ProtoMessageConverter.h"
31
32 namespace android {
33 namespace hardware {
34 namespace automotive {
35 namespace vehicle {
36 namespace V2_0 {
37
38 namespace impl {
39
40 class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
41 public:
GrpcVehicleServerImpl(const VirtualizedVhalServerInfo & serverInfo)42 explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
43 : mServiceAddr(serverInfo.getServerUri()),
44 mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
45 serverInfo.powerStateMarkerFilePath)),
46 mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
47 setValuePool(&mValueObjectPool);
48 }
49
50 // method from GrpcVehicleServer
51 GrpcVehicleServer& Start() override;
52
53 void Wait() override;
54
55 GrpcVehicleServer& Stop() override;
56
57 uint32_t NumOfActivePropertyValueStream() override;
58
59 // methods from IVehicleServer
60 void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
61
62 StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
63
64 // methods from vhal_proto::VehicleServer::Service
65
66 ::grpc::Status GetAllPropertyConfig(
67 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
68 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
69
70 ::grpc::Status SetProperty(::grpc::ServerContext* context,
71 const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
72 vhal_proto::VehicleHalCallStatus* status) override;
73
74 ::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context,
75 const ::google::protobuf::Empty*,
76 ::google::protobuf::Empty*) override;
77
78 ::grpc::Status StartPropertyValuesStream(
79 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
80 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
81
82 private:
83 // We keep long-lasting connection for streaming the prop values.
84 // For us, each connection can be represented as a function to send the new value, and
85 // an ID to identify this connection
86 struct ConnectionDescriptor {
87 using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
88
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor89 explicit ConnectionDescriptor(ValueWriterType&& value_writer)
90 : mValueWriter(std::move(value_writer)),
91 mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
92
93 ConnectionDescriptor(const ConnectionDescriptor&) = delete;
94
95 ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
96
97 // This move constructor is NOT THREAD-SAFE, which means it cannot be moved
98 // while using. Since the connection descriptors are pretected by mConnectionMutex
99 // then we are fine here
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor100 ConnectionDescriptor(ConnectionDescriptor&& cd)
101 : mValueWriter(std::move(cd.mValueWriter)),
102 mConnectionID(cd.mConnectionID),
103 mIsAlive(cd.mIsAlive.load()) {
104 cd.mIsAlive.store(false);
105 }
106
107 ValueWriterType mValueWriter;
108 uint64_t mConnectionID;
109 std::atomic<bool> mIsAlive{true};
110
111 static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
112 };
113
114 std::string mServiceAddr;
115 std::unique_ptr<::grpc::Server> mServer{nullptr};
116 VehiclePropValuePool mValueObjectPool;
117 std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
118 PowerStateListener mPowerStateListener;
119 std::thread mPowerStateListenerThread{};
120 mutable std::shared_mutex mConnectionMutex;
121 mutable std::shared_mutex mWriterMutex;
122 std::list<ConnectionDescriptor> mValueStreamingConnections;
123 };
124
125 std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
126
getServerCredentials()127 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
128 // TODO(chenhaosjtuacm): get secured credentials here
129 return ::grpc::InsecureServerCredentials();
130 }
131
makeGrpcVehicleServer(const VirtualizedVhalServerInfo & serverInfo)132 GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
133 return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
134 }
135
Start()136 GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
137 if (mServer) {
138 LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
139 return *this;
140 }
141
142 ::grpc::ServerBuilder builder;
143 builder.RegisterService(this);
144 builder.AddListeningPort(mServiceAddr, getServerCredentials());
145 mServer = builder.BuildAndStart();
146
147 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
148 << "please make sure the configuration and permissions are correct";
149
150 mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
151 return *this;
152 }
153
Wait()154 void GrpcVehicleServerImpl::Wait() {
155 if (mServer) {
156 mServer->Wait();
157 }
158
159 if (mPowerStateListenerThread.joinable()) {
160 mPowerStateListenerThread.join();
161 }
162
163 mPowerStateListenerThread = {};
164 mServer.reset();
165 }
166
Stop()167 GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
168 if (!mServer) {
169 LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
170 return *this;
171 }
172
173 mServer->Shutdown();
174 mPowerStateListener.Stop();
175 return *this;
176 }
177
NumOfActivePropertyValueStream()178 uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
179 std::shared_lock read_lock(mConnectionMutex);
180 return mValueStreamingConnections.size();
181 }
182
onPropertyValueFromCar(const VehiclePropValue & value,bool updateStatus)183 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
184 bool updateStatus) {
185 vhal_proto::WrappedVehiclePropValue wrappedPropValue;
186 proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
187 wrappedPropValue.set_update_status(updateStatus);
188 std::shared_lock read_lock(mConnectionMutex);
189
190 bool has_terminated_connections = 0;
191
192 for (auto& connection : mValueStreamingConnections) {
193 auto writeOK = connection.mValueWriter(wrappedPropValue);
194 if (!writeOK) {
195 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
196 << connection.mConnectionID;
197 has_terminated_connections = true;
198 connection.mIsAlive.store(false);
199 }
200 }
201
202 if (!has_terminated_connections) {
203 return;
204 }
205
206 read_lock.unlock();
207
208 std::unique_lock write_lock(mConnectionMutex);
209
210 for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
211 if (!itr->mIsAlive.load()) {
212 itr = mValueStreamingConnections.erase(itr);
213 } else {
214 ++itr;
215 }
216 }
217 }
218
onSetProperty(const VehiclePropValue & value,bool updateStatus)219 StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
220 if (value.prop == AP_POWER_STATE_REPORT &&
221 value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) {
222 mGarageModeHandler->HandleHeartbeat();
223 }
224 return GrpcVehicleServer::onSetProperty(value, updateStatus);
225 }
226
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::VehiclePropConfig> * stream)227 ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
228 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
229 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
230 auto configs = onGetAllPropertyConfig();
231 for (auto& config : configs) {
232 vhal_proto::VehiclePropConfig protoConfig;
233 proto_msg_converter::toProto(&protoConfig, config);
234 if (!stream->Write(protoConfig)) {
235 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
236 }
237 }
238
239 return ::grpc::Status::OK;
240 }
241
SetProperty(::grpc::ServerContext * context,const vhal_proto::WrappedVehiclePropValue * wrappedPropValue,vhal_proto::VehicleHalCallStatus * status)242 ::grpc::Status GrpcVehicleServerImpl::SetProperty(
243 ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
244 vhal_proto::VehicleHalCallStatus* status) {
245 VehiclePropValue value;
246 proto_msg_converter::fromProto(&value, wrappedPropValue->value());
247
248 auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status()));
249 if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) {
250 return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
251 }
252
253 status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status));
254
255 return ::grpc::Status::OK;
256 }
257
SendAllPropertyValuesToStream(::grpc::ServerContext * context,const::google::protobuf::Empty *,::google::protobuf::Empty *)258 ::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream(
259 ::grpc::ServerContext* context, const ::google::protobuf::Empty*,
260 ::google::protobuf::Empty*) {
261 sendAllValuesToClient();
262 return ::grpc::Status::OK;
263 }
264
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue> * stream)265 ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
266 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
267 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
268 std::mutex terminateMutex;
269 std::condition_variable terminateCV;
270 std::unique_lock<std::mutex> terminateLock(terminateMutex);
271 bool terminated{false};
272
273 auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
274 this](const vhal_proto::WrappedVehiclePropValue& value) {
275 std::unique_lock lock(mWriterMutex);
276 if (!stream->Write(value)) {
277 std::unique_lock<std::mutex> terminateLock(terminateMutex);
278 terminated = true;
279 terminateLock.unlock();
280 terminateCV.notify_all();
281 return false;
282 }
283 return true;
284 };
285
286 // Register connection
287 std::unique_lock lock(mConnectionMutex);
288 auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack));
289 lock.unlock();
290
291 // Never stop until connection lost
292 terminateCV.wait(terminateLock, [&terminated]() { return terminated; });
293
294 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID;
295
296 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
297 }
298
299 } // namespace impl
300
301 } // namespace V2_0
302 } // namespace vehicle
303 } // namespace automotive
304 } // namespace hardware
305 } // namespace android
306