1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "rpc_world_manager.h"
18 
19 #include "chre/util/macros.h"
20 #include "chre/util/nanoapp/log.h"
21 #include "chre/util/time.h"
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "[RpcWorld]"
25 #endif  // LOG_TAG
26 
27 // [Server] Service implementations.
Increment(const chre_rpc_NumberMessage & request,chre_rpc_NumberMessage & response)28 pw::Status RpcWorldService::Increment(const chre_rpc_NumberMessage &request,
29                                       chre_rpc_NumberMessage &response) {
30   RpcWorldManagerSingleton::get()->setPermissionForNextMessage(
31       CHRE_MESSAGE_PERMISSION_NONE);
32   response.number = request.number + 1;
33   return pw::OkStatus();
34 }
35 
Timer(const chre_rpc_TimerRequest & request,RpcWorldService::ServerWriter<chre_rpc_TimerResponse> & writer)36 void RpcWorldService::Timer(
37     const chre_rpc_TimerRequest &request,
38     RpcWorldService::ServerWriter<chre_rpc_TimerResponse> &writer) {
39   RpcWorldManagerSingleton::get()->timerStart(request.num_ticks, writer);
40 }
41 
Add(RpcWorldService::ServerReader<chre_rpc_NumberMessage,chre_rpc_NumberMessage> & reader)42 void RpcWorldService::Add(
43     RpcWorldService::ServerReader<chre_rpc_NumberMessage,
44                                   chre_rpc_NumberMessage> &reader) {
45   RpcWorldManagerSingleton::get()->addStart(reader);
46 }
47 
48 // [Client] callbacks.
incrementResponse(const chre_rpc_NumberMessage & response,pw::Status status)49 void incrementResponse(const chre_rpc_NumberMessage &response,
50                        pw::Status status) {
51   if (status.ok()) {
52     LOGI("Increment response: %d", response.number);
53   } else {
54     LOGE("Increment failed with status %d", static_cast<int>(status.code()));
55   }
56 }
57 
timerResponse(const chre_rpc_TimerResponse & response)58 void timerResponse(const chre_rpc_TimerResponse &response) {
59   LOGI("Tick response: %d", response.tick_number);
60 }
61 
timerEnd(pw::Status status)62 void timerEnd(pw::Status status) {
63   LOGI("Tick stream end: %d", static_cast<int>(status.code()));
64 }
65 
addResponse(const chre_rpc_NumberMessage & response,pw::Status status)66 void addResponse(const chre_rpc_NumberMessage &response, pw::Status status) {
67   if (status.ok()) {
68     LOGI("Add response: %d", response.number);
69   } else {
70     LOGE("Add failed with status %d", static_cast<int>(status.code()));
71   }
72 }
73 
start()74 bool RpcWorldManager::start() {
75   chre::RpcServer::Service service = {.service = mRpcWorldService,
76                                       .id = 0xca8f7150a3f05847,
77                                       .version = 0x01020034};
78   if (!mServer.registerServices(1 /*numServices*/, &service)) {
79     LOGE("Error while registering the service");
80   }
81 
82   auto client =
83       mClient.get<chre::rpc::pw_rpc::nanopb::RpcWorldService::Client>();
84 
85   if (client.has_value()) {
86     // [Client] Invoking a unary RPC.
87     chre_rpc_NumberMessage incrementRequest;
88     incrementRequest.number = 101;
89     mIncrementCall = client->Increment(incrementRequest, incrementResponse);
90     CHRE_ASSERT(mIncrementCall.active());
91 
92     // [Client] Invoking a server streaming RPC.
93     chre_rpc_TimerRequest timerRequest;
94     timerRequest.num_ticks = 5;
95     mTimerCall = client->Timer(timerRequest, timerResponse, timerEnd);
96     CHRE_ASSERT(mTimerCall.active());
97 
98     // [Client] Invoking a client streaming RPC.
99     chre_rpc_NumberMessage addRequest;
100     addRequest.number = 1;
101     mAddCall = client->Add(addResponse);
102     CHRE_ASSERT(mAddCall.active());
103     mAddCall.Write(addRequest);
104     mAddCall.Write(addRequest);
105     mAddCall.Write(addRequest);
106     mAddCall.RequestCompletion();
107   } else {
108     LOGE("Error while retrieving the client");
109   }
110 
111   return true;
112 }
113 
setPermissionForNextMessage(uint32_t permission)114 void RpcWorldManager::setPermissionForNextMessage(uint32_t permission) {
115   mServer.setPermissionForNextMessage(permission);
116 }
117 
handleEvent(uint32_t senderInstanceId,uint16_t eventType,const void * eventData)118 void RpcWorldManager::handleEvent(uint32_t senderInstanceId, uint16_t eventType,
119                                   const void *eventData) {
120   if (!mServer.handleEvent(senderInstanceId, eventType, eventData)) {
121     LOGE("[Server] An RPC error occurred");
122   }
123 
124   if (!mClient.handleEvent(senderInstanceId, eventType, eventData)) {
125     LOGE("[Client] An RPC error occurred");
126   }
127 
128   switch (eventType) {
129     case CHRE_EVENT_TIMER:
130       // [Server] stream responses.
131       chre_rpc_TimerResponse response;
132       response.tick_number = mTimerCurrentTick;
133       setPermissionForNextMessage(CHRE_MESSAGE_PERMISSION_NONE);
134       mTimerWriter.Write(response);
135       if (mTimerCurrentTick == mTimerTotalTicks) {
136         setPermissionForNextMessage(CHRE_MESSAGE_PERMISSION_NONE);
137         mTimerWriter.Finish(pw::OkStatus());
138         if (chreTimerCancel(mTimerId)) {
139           mTimerId = CHRE_TIMER_INVALID;
140         } else {
141           LOGE("Error while cancelling the timer");
142         }
143       }
144       mTimerCurrentTick++;
145   }
146 }
147 
end()148 void RpcWorldManager::end() {
149   mServer.close();
150   mClient.close();
151   if (mTimerId != CHRE_TIMER_INVALID) {
152     chreTimerCancel(mTimerId);
153   }
154 }
155 
timerStart(uint32_t numTicks,RpcWorldService::ServerWriter<chre_rpc_TimerResponse> & writer)156 void RpcWorldManager::timerStart(
157     uint32_t numTicks,
158     RpcWorldService::ServerWriter<chre_rpc_TimerResponse> &writer) {
159   mTimerCurrentTick = 1;
160   mTimerTotalTicks = numTicks;
161   mTimerWriter = std::move(writer);
162   mTimerId = chreTimerSet(chre::kOneSecondInNanoseconds, nullptr /*cookie*/,
163                           false /*oneShot*/);
164 }
165 
addStart(RpcWorldService::ServerReader<chre_rpc_NumberMessage,chre_rpc_NumberMessage> & reader)166 void RpcWorldManager::addStart(
167     RpcWorldService::ServerReader<chre_rpc_NumberMessage,
168                                   chre_rpc_NumberMessage> &reader) {
169   mSum = 0;
170   reader.set_on_next([](const chre_rpc_NumberMessage &request) {
171     RpcWorldManagerSingleton::get()->mSum += request.number;
172   });
173   reader.set_on_completion_requested([]() {
174     chre_rpc_NumberMessage response;
175     response.number = RpcWorldManagerSingleton::get()->mSum;
176     RpcWorldManagerSingleton::get()->setPermissionForNextMessage(
177         CHRE_MESSAGE_PERMISSION_NONE);
178     RpcWorldManagerSingleton::get()->mAddReader.Finish(response);
179   });
180   mAddReader = std::move(reader);
181 }