1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "rpc_world_manager.h"
18
19 #include "chre/util/macros.h"
20 #include "chre/util/nanoapp/log.h"
21 #include "chre/util/time.h"
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "[RpcWorld]"
25 #endif // LOG_TAG
26
27 // [Server] Service implementations.
Increment(const chre_rpc_NumberMessage & request,chre_rpc_NumberMessage & response)28 pw::Status RpcWorldService::Increment(const chre_rpc_NumberMessage &request,
29 chre_rpc_NumberMessage &response) {
30 RpcWorldManagerSingleton::get()->setPermissionForNextMessage(
31 CHRE_MESSAGE_PERMISSION_NONE);
32 response.number = request.number + 1;
33 return pw::OkStatus();
34 }
35
Timer(const chre_rpc_TimerRequest & request,RpcWorldService::ServerWriter<chre_rpc_TimerResponse> & writer)36 void RpcWorldService::Timer(
37 const chre_rpc_TimerRequest &request,
38 RpcWorldService::ServerWriter<chre_rpc_TimerResponse> &writer) {
39 RpcWorldManagerSingleton::get()->timerStart(request.num_ticks, writer);
40 }
41
Add(RpcWorldService::ServerReader<chre_rpc_NumberMessage,chre_rpc_NumberMessage> & reader)42 void RpcWorldService::Add(
43 RpcWorldService::ServerReader<chre_rpc_NumberMessage,
44 chre_rpc_NumberMessage> &reader) {
45 RpcWorldManagerSingleton::get()->addStart(reader);
46 }
47
48 // [Client] callbacks.
incrementResponse(const chre_rpc_NumberMessage & response,pw::Status status)49 void incrementResponse(const chre_rpc_NumberMessage &response,
50 pw::Status status) {
51 if (status.ok()) {
52 LOGI("Increment response: %d", response.number);
53 } else {
54 LOGE("Increment failed with status %d", static_cast<int>(status.code()));
55 }
56 }
57
timerResponse(const chre_rpc_TimerResponse & response)58 void timerResponse(const chre_rpc_TimerResponse &response) {
59 LOGI("Tick response: %d", response.tick_number);
60 }
61
timerEnd(pw::Status status)62 void timerEnd(pw::Status status) {
63 LOGI("Tick stream end: %d", static_cast<int>(status.code()));
64 }
65
addResponse(const chre_rpc_NumberMessage & response,pw::Status status)66 void addResponse(const chre_rpc_NumberMessage &response, pw::Status status) {
67 if (status.ok()) {
68 LOGI("Add response: %d", response.number);
69 } else {
70 LOGE("Add failed with status %d", static_cast<int>(status.code()));
71 }
72 }
73
start()74 bool RpcWorldManager::start() {
75 chre::RpcServer::Service service = {.service = mRpcWorldService,
76 .id = 0xca8f7150a3f05847,
77 .version = 0x01020034};
78 if (!mServer.registerServices(1 /*numServices*/, &service)) {
79 LOGE("Error while registering the service");
80 }
81
82 auto client =
83 mClient.get<chre::rpc::pw_rpc::nanopb::RpcWorldService::Client>();
84
85 if (client.has_value()) {
86 // [Client] Invoking a unary RPC.
87 chre_rpc_NumberMessage incrementRequest;
88 incrementRequest.number = 101;
89 mIncrementCall = client->Increment(incrementRequest, incrementResponse);
90 CHRE_ASSERT(mIncrementCall.active());
91
92 // [Client] Invoking a server streaming RPC.
93 chre_rpc_TimerRequest timerRequest;
94 timerRequest.num_ticks = 5;
95 mTimerCall = client->Timer(timerRequest, timerResponse, timerEnd);
96 CHRE_ASSERT(mTimerCall.active());
97
98 // [Client] Invoking a client streaming RPC.
99 chre_rpc_NumberMessage addRequest;
100 addRequest.number = 1;
101 mAddCall = client->Add(addResponse);
102 CHRE_ASSERT(mAddCall.active());
103 mAddCall.Write(addRequest);
104 mAddCall.Write(addRequest);
105 mAddCall.Write(addRequest);
106 mAddCall.RequestCompletion();
107 } else {
108 LOGE("Error while retrieving the client");
109 }
110
111 return true;
112 }
113
setPermissionForNextMessage(uint32_t permission)114 void RpcWorldManager::setPermissionForNextMessage(uint32_t permission) {
115 mServer.setPermissionForNextMessage(permission);
116 }
117
handleEvent(uint32_t senderInstanceId,uint16_t eventType,const void * eventData)118 void RpcWorldManager::handleEvent(uint32_t senderInstanceId, uint16_t eventType,
119 const void *eventData) {
120 if (!mServer.handleEvent(senderInstanceId, eventType, eventData)) {
121 LOGE("[Server] An RPC error occurred");
122 }
123
124 if (!mClient.handleEvent(senderInstanceId, eventType, eventData)) {
125 LOGE("[Client] An RPC error occurred");
126 }
127
128 switch (eventType) {
129 case CHRE_EVENT_TIMER:
130 // [Server] stream responses.
131 chre_rpc_TimerResponse response;
132 response.tick_number = mTimerCurrentTick;
133 setPermissionForNextMessage(CHRE_MESSAGE_PERMISSION_NONE);
134 mTimerWriter.Write(response);
135 if (mTimerCurrentTick == mTimerTotalTicks) {
136 setPermissionForNextMessage(CHRE_MESSAGE_PERMISSION_NONE);
137 mTimerWriter.Finish(pw::OkStatus());
138 if (chreTimerCancel(mTimerId)) {
139 mTimerId = CHRE_TIMER_INVALID;
140 } else {
141 LOGE("Error while cancelling the timer");
142 }
143 }
144 mTimerCurrentTick++;
145 }
146 }
147
end()148 void RpcWorldManager::end() {
149 mServer.close();
150 mClient.close();
151 if (mTimerId != CHRE_TIMER_INVALID) {
152 chreTimerCancel(mTimerId);
153 }
154 }
155
timerStart(uint32_t numTicks,RpcWorldService::ServerWriter<chre_rpc_TimerResponse> & writer)156 void RpcWorldManager::timerStart(
157 uint32_t numTicks,
158 RpcWorldService::ServerWriter<chre_rpc_TimerResponse> &writer) {
159 mTimerCurrentTick = 1;
160 mTimerTotalTicks = numTicks;
161 mTimerWriter = std::move(writer);
162 mTimerId = chreTimerSet(chre::kOneSecondInNanoseconds, nullptr /*cookie*/,
163 false /*oneShot*/);
164 }
165
addStart(RpcWorldService::ServerReader<chre_rpc_NumberMessage,chre_rpc_NumberMessage> & reader)166 void RpcWorldManager::addStart(
167 RpcWorldService::ServerReader<chre_rpc_NumberMessage,
168 chre_rpc_NumberMessage> &reader) {
169 mSum = 0;
170 reader.set_on_next([](const chre_rpc_NumberMessage &request) {
171 RpcWorldManagerSingleton::get()->mSum += request.number;
172 });
173 reader.set_on_completion_requested([]() {
174 chre_rpc_NumberMessage response;
175 response.number = RpcWorldManagerSingleton::get()->mSum;
176 RpcWorldManagerSingleton::get()->setPermissionForNextMessage(
177 CHRE_MESSAGE_PERMISSION_NONE);
178 RpcWorldManagerSingleton::get()->mAddReader.Finish(response);
179 });
180 mAddReader = std::move(reader);
181 }