1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "stats_buffer_writer_queue.h"
18 
19 #include <private/android_filesystem_config.h>
20 #include <unistd.h>
21 
22 #include <chrono>
23 #include <queue>
24 #include <thread>
25 
26 #include "stats_buffer_writer_impl.h"
27 #include "stats_buffer_writer_queue_impl.h"
28 #include "utils.h"
29 
BufferWriterQueue()30 BufferWriterQueue::BufferWriterQueue() : mWorkThread(&BufferWriterQueue::processCommands, this) {
31     pthread_setname_np(mWorkThread.native_handle(), "socket_writer_queue");
32 }
33 
~BufferWriterQueue()34 BufferWriterQueue::~BufferWriterQueue() {
35     terminate();
36     // at this stage there can be N elements in the queue for which memory needs to be freed
37     // explicitly
38     drainQueue();
39 }
40 
write(const uint8_t * buffer,size_t size,uint32_t atomId)41 bool BufferWriterQueue::write(const uint8_t* buffer, size_t size, uint32_t atomId) {
42     Cmd cmd = createWriteBufferCmd(buffer, size, atomId);
43     if (cmd.buffer == NULL) {
44         return false;
45     }
46     return pushToQueue(cmd);
47 }
48 
getQueueSize() const49 size_t BufferWriterQueue::getQueueSize() const {
50     std::unique_lock<std::mutex> lock(mMutex);
51     return mCmdQueue.size();
52 }
53 
pushToQueue(const Cmd & cmd)54 bool BufferWriterQueue::pushToQueue(const Cmd& cmd) {
55     {
56         std::unique_lock<std::mutex> lock(mMutex);
57         if (mCmdQueue.size() >= kQueueMaxSizeLimit) {
58             // TODO (b/258003151): add logging info about internal queue overflow with appropriate
59             // error code
60             return false;
61         }
62         mCmdQueue.push(cmd);
63     }
64     mCondition.notify_one();
65     return true;
66 }
67 
createWriteBufferCmd(const uint8_t * buffer,size_t size,uint32_t atomId)68 BufferWriterQueue::Cmd BufferWriterQueue::createWriteBufferCmd(const uint8_t* buffer, size_t size,
69                                                                uint32_t atomId) {
70     BufferWriterQueue::Cmd writeCmd;
71     writeCmd.atomId = atomId;
72     writeCmd.buffer = (uint8_t*)malloc(size);
73     if (writeCmd.buffer == NULL) {
74         return writeCmd;
75     }
76     memcpy(writeCmd.buffer, buffer, size);
77     writeCmd.size = size;
78     return writeCmd;
79 }
80 
terminate()81 void BufferWriterQueue::terminate() {
82     if (mWorkThread.joinable()) {
83         mDoTerminate = true;
84         Cmd terminateCmd;
85         terminateCmd.buffer = NULL;
86         pushToQueue(terminateCmd);
87         mWorkThread.join();
88     }
89 }
90 
drainQueue()91 void BufferWriterQueue::drainQueue() {
92     std::unique_lock<std::mutex> lock(mMutex);
93     while (!mCmdQueue.empty()) {
94         free(mCmdQueue.front().buffer);
95         mCmdQueue.pop();
96     }
97 }
98 
processCommands()99 void BufferWriterQueue::processCommands() {
100     while (true) {
101         // temporary local thread copy
102         Cmd cmd;
103         {
104             std::unique_lock<std::mutex> lock(mMutex);
105             if (mCmdQueue.empty()) {
106                 mCondition.wait(lock, [this] { return !this->mCmdQueue.empty(); });
107             }
108             cmd = mCmdQueue.front();
109         }
110 
111         if (cmd.buffer == NULL) {
112             // null buffer ptr used as a marker of the termination request
113             return;
114         }
115 
116         const bool writeSuccess = handleCommand(cmd);
117         if (writeSuccess) {
118             // no event drop is observed otherwise command remains in the queue
119             // and worker thread will try to log later on
120 
121             // call free() explicitly here to free memory before the mutex lock
122             free(cmd.buffer);
123             {
124                 std::unique_lock<std::mutex> lock(mMutex);
125                 // this will lead to Cmd destructor call which will be no-op since now the
126                 // buffer is NULL
127                 mCmdQueue.pop();
128             }
129         }
130         // TODO (b/258003151): add logging info about retry count
131 
132         if (mDoTerminate) {
133             return;
134         }
135 
136         // attempt to enforce the logging frequency constraints
137         // in case of failed write due to socket overflow the sleep can be longer
138         // to not overload socket continuously
139         if (!writeSuccess) {
140             std::this_thread::sleep_for(std::chrono::milliseconds(kDelayOnFailedWriteMs));
141         }
142     }
143 }
144 
handleCommand(const Cmd & cmd) const145 bool BufferWriterQueue::handleCommand(const Cmd& cmd) const {
146     // skip log drop if occurs, since the atom remains in the queue and write will be retried
147     return write_buffer_to_statsd_impl(cmd.buffer, cmd.size, cmd.atomId, /*doNoteDrop*/ false) > 0;
148 }
149 
write_buffer_to_statsd_queue(const uint8_t * buffer,size_t size,uint32_t atomId)150 bool write_buffer_to_statsd_queue(const uint8_t* buffer, size_t size, uint32_t atomId) {
151     static BufferWriterQueue queue;
152     return queue.write(buffer, size, atomId);
153 }
154 
155 #ifdef ENABLE_BENCHMARK_SUPPORT
should_write_via_queue(uint32_t atomId)156 bool should_write_via_queue(uint32_t atomId) {
157 #else
158 bool should_write_via_queue(uint32_t /*atomId*/) {
159 #endif
160     const uint32_t appUid = getuid();
161 
162     // hard-coded push all system server atoms to queue
163     if (appUid == AID_SYSTEM) {
164         return true;
165     }
166 
167 #ifdef ENABLE_BENCHMARK_SUPPORT
168     // some hand-picked atoms to be pushed into the queue
169     switch (atomId) {
170         case 47:  // APP_BREADCRUMB_REPORTED for statsd_benchmark purpose
171             return true;
172         default:
173             return false;
174     }
175 #endif  // ENABLE_BENCHMARK_SUPPORT
176     return false;
177 }
178