/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "stats_buffer_writer_queue.h"

#include <private/android_filesystem_config.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <chrono>
#include <queue>
#include <thread>

#include "stats_buffer_writer_impl.h"
#include "stats_buffer_writer_queue_impl.h"
#include "utils.h"

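// Starts the worker thread which performs all socket writes going through the
// queue. The thread is named so it can be identified in traces and debuggers.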
BufferWriterQueue::BufferWriterQueue() : mWorkThread(&BufferWriterQueue::processCommands, this) {
    pthread_setname_np(mWorkThread.native_handle(), "socket_writer_queue");
}

BufferWriterQueue::~BufferWriterQueue() {
    terminate();
    // at this point the queue can still hold commands whose buffers need to be
    // freed explicitly
    drainQueue();
}

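// Copies the caller's buffer into a heap-allocated command and enqueues it for
// the worker thread. Returns false if the allocation fails or the queue is
// full, in which case the event is dropped.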
bool BufferWriterQueue::write(const uint8_t* buffer, size_t size, uint32_t atomId) {
    Cmd cmd = createWriteBufferCmd(buffer, size, atomId);
    if (cmd.buffer == NULL) {
        return false;
    }
    return pushToQueue(cmd);
}

size_t BufferWriterQueue::getQueueSize() const {
    std::unique_lock<std::mutex> lock(mMutex);
    return mCmdQueue.size();
}

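// Appends a command to the bounded queue. Commands beyond kQueueMaxSizeLimit
// are rejected so a slow socket cannot grow the queue without bound. The
// condition variable is notified outside the critical section so the woken
// worker thread does not immediately block on the mutex.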
bool BufferWriterQueue::pushToQueue(const Cmd& cmd) {
    {
        std::unique_lock<std::mutex> lock(mMutex);
        if (mCmdQueue.size() >= kQueueMaxSizeLimit) {
            // TODO (b/258003151): add logging info about internal queue overflow with appropriate
            // error code
            return false;
        }
        mCmdQueue.push(cmd);
    }
    mCondition.notify_one();
    return true;
}

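// Builds a command holding a heap copy of the caller's buffer. On allocation
// failure the returned command carries a NULL buffer, which callers treat as
// an error.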
BufferWriterQueue::Cmd BufferWriterQueue::createWriteBufferCmd(const uint8_t* buffer, size_t size,
                                                               uint32_t atomId) {
    BufferWriterQueue::Cmd writeCmd;
    writeCmd.atomId = atomId;
    writeCmd.buffer = (uint8_t*)malloc(size);
    if (writeCmd.buffer == NULL) {
        return writeCmd;
    }
    memcpy(writeCmd.buffer, buffer, size);
    writeCmd.size = size;
    return writeCmd;
}

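// Requests worker thread shutdown: raises the termination flag, pushes a
// sentinel command with a NULL buffer to wake the thread, then joins it.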
void BufferWriterQueue::terminate() {
    if (mWorkThread.joinable()) {
        mDoTerminate = true;
        Cmd terminateCmd;
        terminateCmd.buffer = NULL;
        pushToQueue(terminateCmd);
        mWorkThread.join();
    }
}

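// Frees the buffers of any commands still pending in the queue. Called from
// the destructor after the worker thread has been joined.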
void BufferWriterQueue::drainQueue() {
    std::unique_lock<std::mutex> lock(mMutex);
    while (!mCmdQueue.empty()) {
        free(mCmdQueue.front().buffer);
        mCmdQueue.pop();
    }
}

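// Worker thread loop: waits until the queue is non-empty, copies the front
// command without popping it, and attempts the socket write. On success the
// command is popped and its buffer freed; on failure it stays at the front so
// the write is retried after a delay.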
void BufferWriterQueue::processCommands() {
    while (true) {
        // temporary local copy for this thread
        Cmd cmd;
        {
            std::unique_lock<std::mutex> lock(mMutex);
            if (mCmdQueue.empty()) {
                mCondition.wait(lock, [this] { return !this->mCmdQueue.empty(); });
            }
            cmd = mCmdQueue.front();
        }

        if (cmd.buffer == NULL) {
            // a NULL buffer ptr is used as the marker of a termination request
            return;
        }

        const bool writeSuccess = handleCommand(cmd);
        if (writeSuccess) {
            // no event drop was observed; otherwise the command remains in the
            // queue and the worker thread will retry the write later on

            // call free() explicitly here to release the memory before taking the mutex
            free(cmd.buffer);
            {
                std::unique_lock<std::mutex> lock(mMutex);
                // popping invokes the Cmd destructor, which does not touch the
                // buffer - the memory was already freed above
                mCmdQueue.pop();
            }
        }
        // TODO (b/258003151): add logging info about retry count

        if (mDoTerminate) {
            return;
        }

        // attempt to enforce the logging frequency constraints: after a failed
        // write caused by socket overflow, sleep for a while so the socket is
        // not overloaded continuously
        if (!writeSuccess) {
            std::this_thread::sleep_for(std::chrono::milliseconds(kDelayOnFailedWriteMs));
        }
    }
}

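// Performs the actual socket write for a single queued command.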
bool BufferWriterQueue::handleCommand(const Cmd& cmd) const {
    // do not note a log drop if one occurs, since the atom remains in the queue
    // and the write will be retried
    return write_buffer_to_statsd_impl(cmd.buffer, cmd.size, cmd.atomId, /*doNoteDrop*/ false) > 0;
}

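// Entry point for queued writes. The function-local static queue is lazily
// constructed on first use; its initialization is thread-safe per C++11, so
// concurrent first callers do not race.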
bool write_buffer_to_statsd_queue(const uint8_t* buffer, size_t size, uint32_t atomId) {
    static BufferWriterQueue queue;
    return queue.write(buffer, size, atomId);
}

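// Decides whether an atom should go through the async queue instead of being
// written to the socket directly. The atomId parameter is only used in
// benchmark builds.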
#ifdef ENABLE_BENCHMARK_SUPPORT
bool should_write_via_queue(uint32_t atomId) {
#else
bool should_write_via_queue(uint32_t /*atomId*/) {
#endif
    const uint32_t appUid = getuid();

    // hard-coded rule: push all system server atoms to the queue
    if (appUid == AID_SYSTEM) {
        return true;
    }

#ifdef ENABLE_BENCHMARK_SUPPORT
    // some hand-picked atoms are pushed into the queue as well
    switch (atomId) {
        case 47:  // APP_BREADCRUMB_REPORTED, for statsd_benchmark purposes
            return true;
        default:
            return false;
    }
#endif  // ENABLE_BENCHMARK_SUPPORT
    return false;
}
