1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "direct_channel_buffer_reader.h"
18
19 #include <android-base/logging.h>
20 #include <android-base/thread_annotations.h>
21 #include <gtest/gtest.h>
22 #include <stdlib.h>
23
24 #include <condition_variable>
25 #include <mutex>
26 #include <thread>
27
28 namespace {
29
30 // A derived class of DirectChannelBufferReader that allows blocking memory read
31 // for concurrency tests.
32 class TestableDirectChannelBufferReader : public DirectChannelBufferReader {
33 public:
TestableDirectChannelBufferReader(const sensors_event_t * direct_channel_buffer,int buffer_size_samples)34 TestableDirectChannelBufferReader(const sensors_event_t* direct_channel_buffer,
35 int buffer_size_samples)
36 : DirectChannelBufferReader(direct_channel_buffer, buffer_size_samples) {}
37
ReadOneSample(int index)38 const sensors_event_t ReadOneSample(int index) {
39 {
40 std::unique_lock lk(mutex_);
41 reader_waiting_ = true;
42 lk.unlock();
43 cv_.notify_one();
44 }
45 {
46 std::unique_lock lk(mutex_);
47 cv_.wait(lk, [this] { return !should_block_reads_ || num_reads_unblocked_ > 0; });
48 reader_waiting_ = false;
49 auto return_value = DirectChannelBufferReader::ReadOneSample(index);
50 num_reads_unblocked_--;
51 lk.unlock();
52 cv_.notify_one();
53 return return_value;
54 }
55 }
56
BlockReads()57 void BlockReads() {
58 std::unique_lock lk(mutex_);
59 should_block_reads_ = true;
60 num_reads_unblocked_ = 0;
61 lk.unlock();
62 cv_.notify_one();
63 }
64
UnblockReads()65 void UnblockReads() {
66 std::unique_lock lk(mutex_);
67 should_block_reads_ = false;
68 lk.unlock();
69 cv_.notify_one();
70 }
71
UnblockAndWaitForReads(int num_reads)72 void UnblockAndWaitForReads(int num_reads) {
73 {
74 std::unique_lock lk(mutex_);
75 CHECK_EQ(num_reads_unblocked_, 0);
76 num_reads_unblocked_ = num_reads;
77 lk.unlock();
78 cv_.notify_one();
79 }
80 {
81 std::unique_lock lk(mutex_);
82 // Only proceed when reads are all done AND the reader is blocked again.
83 // This way we ensure nothing is done on the reader thread (like sample
84 // validation) when more samples are being written.
85 cv_.wait(lk, [this] { return num_reads_unblocked_ == 0 && reader_waiting_; });
86 }
87 }
88
89 private:
90 std::mutex mutex_;
91 std::condition_variable cv_;
92 bool should_block_reads_ GUARDED_BY(mutex_) = false;
93 bool reader_waiting_ GUARDED_BY(mutex_) = false;
94 int num_reads_unblocked_ GUARDED_BY(mutex_) = 0;
95 };
96
97 class DirectChannelBufferReaderTest : public ::testing::Test {
98 protected:
DirectChannelBufferReaderTest()99 DirectChannelBufferReaderTest() : buffer_{}, reader_(&buffer_[0], kBufferSize) {}
100
WriteOneSample()101 void WriteOneSample() {
102 WritePartialSample();
103 FinishWritingSample();
104 }
105
WritePartialSample()106 void WritePartialSample() { buffer_[next_buffer_index_].timestamp = next_atomic_counter_; }
107
FinishWritingSample()108 void FinishWritingSample() {
109 buffer_[next_buffer_index_].data[0] = next_atomic_counter_;
110 buffer_[next_buffer_index_].reserved0 = next_atomic_counter_;
111 next_buffer_index_ = (next_buffer_index_ + 1) % kBufferSize;
112 next_atomic_counter_ = (next_atomic_counter_ % UINT32_MAX) + 1;
113 }
114
WriteHalfSample()115 void WriteHalfSample() {
116 if (buffer_[next_buffer_index_].timestamp != next_atomic_counter_) {
117 WritePartialSample();
118 } else {
119 FinishWritingSample();
120 }
121 }
122
ValidateReaderSamples()123 void ValidateReaderSamples() {
124 auto& samples = reader_.GetSampleContainer();
125 for (int i = 0; i < samples.size(); i++) {
126 int64_t expected_value =
127 ((next_atomic_counter_ - samples.size() + i - 1 + UINT32_MAX) % UINT32_MAX) + 1;
128 EXPECT_EQ(static_cast<uint32_t>(samples[i].reserved0), expected_value) << " i = " << i;
129 EXPECT_EQ(samples[i].timestamp, expected_value);
130 EXPECT_EQ(samples[i].data[0], expected_value);
131 }
132 }
133
StartReaderThread()134 void StartReaderThread() {
135 reader_.BlockReads();
136 reader_thread_ = std::make_unique<std::thread>([this] {
137 while (keep_reading_) {
138 reader_.Read();
139 // At this point we want to validate the samples and check the values
140 // against next_atomic_counter_. To prevent next_atomic_counter_ from
141 // being modified by the writer thread, we make the writer thread
142 // blocked inside UnblockAndWaitForReads() until the validation is done
143 // and reader_.Read() is called again.
144 ValidateReaderSamples();
145 }
146 });
147 }
148
StopAndJoinReaderThread()149 void StopAndJoinReaderThread() {
150 reader_.UnblockReads();
151 keep_reading_ = false;
152 reader_thread_->join();
153 }
154
155 static constexpr int kBufferSize = 20;
156 std::array<sensors_event_t, kBufferSize> buffer_;
157 TestableDirectChannelBufferReader reader_;
158
159 int next_buffer_index_ = 0;
160 int64_t next_atomic_counter_ = 1;
161
162 std::unique_ptr<std::thread> reader_thread_;
163 bool keep_reading_ = true;
164 };
165
TEST_F(DirectChannelBufferReaderTest,ReturnNoDataForEmptyBuffer)166 TEST_F(DirectChannelBufferReaderTest, ReturnNoDataForEmptyBuffer) {
167 EXPECT_EQ(reader_.Read(), 0);
168 EXPECT_EQ(reader_.GetSampleContainer().size(), 0);
169 }
170
TEST_F(DirectChannelBufferReaderTest,ReturnOneSample)171 TEST_F(DirectChannelBufferReaderTest, ReturnOneSample) {
172 WriteOneSample();
173 EXPECT_EQ(reader_.Read(), 1);
174 EXPECT_EQ(reader_.GetSampleContainer().size(), 1);
175 }
176
TEST_F(DirectChannelBufferReaderTest,ReturnSamplesWithFullBuffer)177 TEST_F(DirectChannelBufferReaderTest, ReturnSamplesWithFullBuffer) {
178 for (int i = 0; i < kBufferSize; i++) {
179 WriteOneSample();
180 }
181 EXPECT_EQ(reader_.Read(), kBufferSize - 1);
182 EXPECT_EQ(reader_.GetSampleContainer().size(), kBufferSize - 1);
183 ValidateReaderSamples();
184 }
185
TEST_F(DirectChannelBufferReaderTest,ReturnSamplesWithInterleavedWriteRead)186 TEST_F(DirectChannelBufferReaderTest, ReturnSamplesWithInterleavedWriteRead) {
187 WriteOneSample();
188 EXPECT_EQ(reader_.Read(), 1);
189 WriteOneSample();
190 WriteOneSample();
191 EXPECT_EQ(reader_.Read(), 2);
192 EXPECT_EQ(reader_.GetSampleContainer().size(), 3);
193 ValidateReaderSamples();
194 }
195
TEST_F(DirectChannelBufferReaderTest,ReturnNothingAfterPartialWrite)196 TEST_F(DirectChannelBufferReaderTest, ReturnNothingAfterPartialWrite) {
197 WriteOneSample();
198 EXPECT_EQ(reader_.Read(), 1);
199 WritePartialSample();
200 EXPECT_EQ(reader_.Read(), 0);
201 FinishWritingSample();
202 EXPECT_EQ(reader_.Read(), 1);
203 EXPECT_EQ(reader_.GetSampleContainer().size(), 2);
204 ValidateReaderSamples();
205 }
206
TEST_F(DirectChannelBufferReaderTest,DiscardPartiallyWrittenSample)207 TEST_F(DirectChannelBufferReaderTest, DiscardPartiallyWrittenSample) {
208 WriteOneSample();
209 EXPECT_EQ(reader_.Read(), 1);
210 for (int i = 0; i < kBufferSize; i++) {
211 WriteOneSample();
212 }
213 // State of the buffer: 21 2 3 4 5 .... 20
214 // ^
215 // Both read and write head point here
216
217 WritePartialSample();
218 // State of the buffer: 21 2 3 4 5 .... 20
219 // ^
220 // Partially overwritten with sample 22
221 // The next Read() should get sample 3-21. Sample 2 should be discarded.
222 EXPECT_EQ(reader_.Read(), kBufferSize - 1);
223 EXPECT_EQ(reader_.GetSampleContainer().front().timestamp, 3);
224 EXPECT_EQ(reader_.GetSampleContainer().back().timestamp, 21);
225 }
226
TEST_F(DirectChannelBufferReaderTest,ReturnCorrectSamplesAfterWriterOverflow)227 TEST_F(DirectChannelBufferReaderTest, ReturnCorrectSamplesAfterWriterOverflow) {
228 WriteOneSample();
229 reader_.Read();
230 for (int i = 0; i < kBufferSize + 5; i++) {
231 WriteOneSample();
232 }
233 EXPECT_EQ(reader_.Read(), kBufferSize - 1);
234 EXPECT_EQ(reader_.GetSampleContainer().size(), kBufferSize - 1);
235 ValidateReaderSamples();
236 }
237
TEST_F(DirectChannelBufferReaderTest,ReturnNumOfSkippedSamples)238 TEST_F(DirectChannelBufferReaderTest, ReturnNumOfSkippedSamples) {
239 WriteOneSample();
240 reader_.Read();
241 for (int i = 0; i < kBufferSize + 5; i++) {
242 WriteOneSample();
243 }
244 int num_samples_skipped = 0;
245 reader_.Read(&num_samples_skipped);
246 EXPECT_EQ(num_samples_skipped, 6);
247 }
248
TEST_F(DirectChannelBufferReaderTest,WrapAroundUINT32Max)249 TEST_F(DirectChannelBufferReaderTest, WrapAroundUINT32Max) {
250 next_atomic_counter_ = UINT32_MAX - 3;
251 for (int i = 0; i < kBufferSize; i++) {
252 WriteOneSample();
253 }
254 EXPECT_EQ(reader_.Read(), kBufferSize - 1);
255 EXPECT_EQ(reader_.GetSampleContainer().size(), kBufferSize - 1);
256 ValidateReaderSamples();
257 }
258
TEST_F(DirectChannelBufferReaderTest,ConcurrentWriteReadSequence)259 TEST_F(DirectChannelBufferReaderTest, ConcurrentWriteReadSequence) {
260 WriteOneSample();
261 // Buffer: 1 0 0 0 ...
262 // Writer head: ^
263
264 reader_.Read();
265 // Buffer: 1 0 0 0 ...
266 // Writer head: ^
267 // What reader sees so far: 1
268
269 StartReaderThread();
270 for (int i = 0; i < kBufferSize; i++) {
271 WriteOneSample();
272 }
273 // Buffer: 21 2 3 4 ...
274 // Writer head: ^
275 // What reader sees so far: 1
276
277 WriteHalfSample();
278 // Buffer: 21 <counter:2,content:22> 3 4 ...
279 // Writer head: ^
280 // What reader sees so far: 1
281
282 reader_.UnblockAndWaitForReads(2);
283 // Buffer: 21 <counter:2,content:22> 3 4 ...
284 // Writer head: ^
285 // What reader sees so far: 1 2* 3
286 // (sample 2 is corrupted)
287
288 WriteHalfSample();
289 // Buffer: 21 22 3 4 5 ...
290 // Writer head: ^
291 // What reader sees so far: 1 2 3
292
293 WriteOneSample();
294 WriteOneSample();
295 // Buffer: 21 22 23 24 5 6 ...
296 // Writer head: ^
297 // What reader sees so far: 1 2 3
298
299 WriteHalfSample();
300 // Buffer: 21 22 23 24 <counter:5,content:25> 6 ...
301 // Writer head: ^
302 // What reader sees so far: 1 2 3
303
304 StopAndJoinReaderThread();
305 // Buffer: 21 22 23 24 <counter:5,content:25> 6 ...
306 // Writer head: ^
307 // What reader sees so far: 21 22 23 24 5* 6 ...
308 // (sample 5 is corrupted)
309 //
310 // The validation performed on the reader thread would ensure that sample 2
311 // and 5 were not returned.
312 }
313
TEST_F(DirectChannelBufferReaderTest,GeneratedConcurrentWriteReadSequence)314 TEST_F(DirectChannelBufferReaderTest, GeneratedConcurrentWriteReadSequence) {
315 constexpr int kNumRounds = 5000;
316 constexpr int kMaxReadWritePerRound = kBufferSize + 5;
317 StartReaderThread();
318 // For deterministic results, use an arbitrary fixed seed for random number
319 // generator.
320 srand(12345);
321 for (int i = 0; i < kNumRounds; i++) {
322 bool write = rand() % 2 == 0;
323 if (write) {
324 // Multiply by 2 since each call only writes half a sample.
325 int num_writes = rand() % (kMaxReadWritePerRound * 2);
326 for (int j = 0; j < num_writes; j++) {
327 WriteHalfSample();
328 }
329 } else {
330 int num_reads = rand() % kMaxReadWritePerRound;
331 reader_.UnblockAndWaitForReads(num_reads);
332 }
333 }
334 StopAndJoinReaderThread();
335 }
336
337 } // namespace
338