/* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "ProtoMessageConverter.h" #include #include #include #include #include #include namespace android::hardware::automotive::vehicle::virtualization { static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureChannelCredentials(); } GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) : mServiceAddr(std::move(service_addr)), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)), mValuePollingThread([this] { ValuePollingLoop(); }) {} // Only used for unit testing. GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr stub) : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)), mValuePollingThread([] {}) {} GRPCVehicleHardware::~GRPCVehicleHardware() { { std::lock_guard lck(mShutdownMutex); mShuttingDownFlag.store(true); } mShutdownCV.notify_all(); mValuePollingThread.join(); } std::vector GRPCVehicleHardware::getAllPropertyConfigs() const { std::vector configs; ::grpc::ClientContext context; auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); proto::VehiclePropConfig protoConfig; while (config_stream->Read(&protoConfig)) { aidlvhal::VehiclePropConfig config; proto_msg_converter::protoToAidl(protoConfig, &config); configs.push_back(std::move(config)); } auto grpc_status = config_stream->Finish(); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); } return configs; } aidlvhal::StatusCode GRPCVehicleHardware::setValues( std::shared_ptr callback, const std::vector& requests) { ::grpc::ClientContext context; proto::VehiclePropValueRequests protoRequests; proto::SetValueResults protoResults; for (const auto& request : requests) { auto& protoRequest = *protoRequests.add_requests(); protoRequest.set_request_id(request.requestId); proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value()); } // TODO(chenhaosjtuacm): Make it Async. auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message(); { std::shared_lock lck(mCallbackMutex); // TODO(chenhaosjtuacm): call on-set-error callback. } return aidlvhal::StatusCode::INTERNAL_ERROR; } std::vector results; for (const auto& protoResult : protoResults.results()) { auto& result = results.emplace_back(); result.requestId = protoResult.request_id(); result.status = static_cast(protoResult.status()); // TODO(chenhaosjtuacm): call on-set-error callback. } (*callback)(std::move(results)); return aidlvhal::StatusCode::OK; } aidlvhal::StatusCode GRPCVehicleHardware::getValues( std::shared_ptr callback, const std::vector& requests) const { ::grpc::ClientContext context; proto::VehiclePropValueRequests protoRequests; proto::GetValueResults protoResults; for (const auto& request : requests) { auto& protoRequest = *protoRequests.add_requests(); protoRequest.set_request_id(request.requestId); proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value()); } // TODO(chenhaosjtuacm): Make it Async. auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } std::vector results; for (const auto& protoResult : protoResults.results()) { auto& result = results.emplace_back(); result.requestId = protoResult.request_id(); result.status = static_cast(protoResult.status()); if (protoResult.has_value()) { aidlvhal::VehiclePropValue value; proto_msg_converter::protoToAidl(protoResult.value(), &value); result.prop = std::move(value); } } (*callback)(std::move(results)); return aidlvhal::StatusCode::OK; } void GRPCVehicleHardware::registerOnPropertyChangeEvent( std::unique_ptr callback) { std::lock_guard lck(mCallbackMutex); if (mOnPropChange) { LOG(ERROR) << __func__ << " must only be called once."; return; } mOnPropChange = std::move(callback); } void GRPCVehicleHardware::registerOnPropertySetErrorEvent( std::unique_ptr callback) { std::lock_guard lck(mCallbackMutex); if (mOnSetErr) { LOG(ERROR) << __func__ << " must only be called once."; return; } mOnSetErr = std::move(callback); } DumpResult GRPCVehicleHardware::dump(const std::vector& options) { ::grpc::ClientContext context; proto::DumpOptions protoDumpOptions; proto::DumpResult protoDumpResult; for (const auto& option : options) { protoDumpOptions.add_options(option); } auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message(); return {}; } return { .callerShouldDumpState = protoDumpResult.caller_should_dump_state(), .buffer = protoDumpResult.buffer(), }; } aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() { ::grpc::ClientContext context; proto::VehicleHalCallStatus protoStatus; auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast(protoStatus.status_code()); } aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) { proto::SubscribeRequest request; ::grpc::ClientContext context; proto::VehicleHalCallStatus protoStatus; proto_msg_converter::aidlToProto(options, request.mutable_options()); auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus); if (!grpc_status.ok()) { if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { // This is a legacy sever. It should handle updateSampleRate. LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server"; return aidlvhal::StatusCode::OK; } LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast(protoStatus.status_code()); } aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) { proto::UnsubscribeRequest request; ::grpc::ClientContext context; proto::VehicleHalCallStatus protoStatus; request.set_prop_id(propId); request.set_area_id(areaId); auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus); if (!grpc_status.ok()) { if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { // This is a legacy sever. Ignore unsubscribe request. LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server"; return aidlvhal::StatusCode::OK; } LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast(protoStatus.status_code()); } aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId, float sampleRate) { ::grpc::ClientContext context; proto::UpdateSampleRateRequest request; proto::VehicleHalCallStatus protoStatus; request.set_prop(propId); request.set_area_id(areaId); request.set_sample_rate(sampleRate); auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast(protoStatus.status_code()); } bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { return mGrpcChannel->WaitForConnected(gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN))); } void GRPCVehicleHardware::ValuePollingLoop() { while (!mShuttingDownFlag.load()) { ::grpc::ClientContext context; bool rpc_stopped{false}; std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { std::unique_lock lck(mShutdownMutex); mShutdownCV.wait(lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); }); context.TryCancel(); }); auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; proto::VehiclePropValues protoValues; while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { std::vector values; for (const auto protoValue : protoValues.values()) { values.push_back(aidlvhal::VehiclePropValue()); proto_msg_converter::protoToAidl(protoValue, &values.back()); } std::shared_lock lck(mCallbackMutex); if (mOnPropChange) { (*mOnPropChange)(values); } } { std::lock_guard lck(mShutdownMutex); rpc_stopped = true; } mShutdownCV.notify_all(); shuttingdown_watcher.join(); auto grpc_status = value_stream->Finish(); // never reach here until connection lost LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); // try to reconnect } } } // namespace android::hardware::automotive::vehicle::virtualization