1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "direct_channel_buffer_reader.h"

#include <android-base/logging.h>
#include <android-base/thread_annotations.h>
#include <gtest/gtest.h>
#include <stdlib.h>

#include <array>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
27 
28 namespace {
29 
30 // A derived class of DirectChannelBufferReader that allows blocking memory read
31 // for concurrency tests.
32 class TestableDirectChannelBufferReader : public DirectChannelBufferReader {
33    public:
TestableDirectChannelBufferReader(const sensors_event_t * direct_channel_buffer,int buffer_size_samples)34     TestableDirectChannelBufferReader(const sensors_event_t* direct_channel_buffer,
35                                       int buffer_size_samples)
36         : DirectChannelBufferReader(direct_channel_buffer, buffer_size_samples) {}
37 
ReadOneSample(int index)38     const sensors_event_t ReadOneSample(int index) {
39         {
40             std::unique_lock lk(mutex_);
41             reader_waiting_ = true;
42             lk.unlock();
43             cv_.notify_one();
44         }
45         {
46             std::unique_lock lk(mutex_);
47             cv_.wait(lk, [this] { return !should_block_reads_ || num_reads_unblocked_ > 0; });
48             reader_waiting_ = false;
49             auto return_value = DirectChannelBufferReader::ReadOneSample(index);
50             num_reads_unblocked_--;
51             lk.unlock();
52             cv_.notify_one();
53             return return_value;
54         }
55     }
56 
BlockReads()57     void BlockReads() {
58         std::unique_lock lk(mutex_);
59         should_block_reads_ = true;
60         num_reads_unblocked_ = 0;
61         lk.unlock();
62         cv_.notify_one();
63     }
64 
UnblockReads()65     void UnblockReads() {
66         std::unique_lock lk(mutex_);
67         should_block_reads_ = false;
68         lk.unlock();
69         cv_.notify_one();
70     }
71 
UnblockAndWaitForReads(int num_reads)72     void UnblockAndWaitForReads(int num_reads) {
73         {
74             std::unique_lock lk(mutex_);
75             CHECK_EQ(num_reads_unblocked_, 0);
76             num_reads_unblocked_ = num_reads;
77             lk.unlock();
78             cv_.notify_one();
79         }
80         {
81             std::unique_lock lk(mutex_);
82             // Only proceed when reads are all done AND the reader is blocked again.
83             // This way we ensure nothing is done on the reader thread (like sample
84             // validation) when more samples are being written.
85             cv_.wait(lk, [this] { return num_reads_unblocked_ == 0 && reader_waiting_; });
86         }
87     }
88 
89    private:
90     std::mutex mutex_;
91     std::condition_variable cv_;
92     bool should_block_reads_ GUARDED_BY(mutex_) = false;
93     bool reader_waiting_ GUARDED_BY(mutex_) = false;
94     int num_reads_unblocked_ GUARDED_BY(mutex_) = 0;
95 };
96 
97 class DirectChannelBufferReaderTest : public ::testing::Test {
98    protected:
DirectChannelBufferReaderTest()99     DirectChannelBufferReaderTest() : buffer_{}, reader_(&buffer_[0], kBufferSize) {}
100 
WriteOneSample()101     void WriteOneSample() {
102         WritePartialSample();
103         FinishWritingSample();
104     }
105 
WritePartialSample()106     void WritePartialSample() { buffer_[next_buffer_index_].timestamp = next_atomic_counter_; }
107 
FinishWritingSample()108     void FinishWritingSample() {
109         buffer_[next_buffer_index_].data[0] = next_atomic_counter_;
110         buffer_[next_buffer_index_].reserved0 = next_atomic_counter_;
111         next_buffer_index_ = (next_buffer_index_ + 1) % kBufferSize;
112         next_atomic_counter_ = (next_atomic_counter_ % UINT32_MAX) + 1;
113     }
114 
WriteHalfSample()115     void WriteHalfSample() {
116         if (buffer_[next_buffer_index_].timestamp != next_atomic_counter_) {
117             WritePartialSample();
118         } else {
119             FinishWritingSample();
120         }
121     }
122 
ValidateReaderSamples()123     void ValidateReaderSamples() {
124         auto& samples = reader_.GetSampleContainer();
125         for (int i = 0; i < samples.size(); i++) {
126             int64_t expected_value =
127                 ((next_atomic_counter_ - samples.size() + i - 1 + UINT32_MAX) % UINT32_MAX) + 1;
128             EXPECT_EQ(static_cast<uint32_t>(samples[i].reserved0), expected_value) << " i = " << i;
129             EXPECT_EQ(samples[i].timestamp, expected_value);
130             EXPECT_EQ(samples[i].data[0], expected_value);
131         }
132     }
133 
StartReaderThread()134     void StartReaderThread() {
135         reader_.BlockReads();
136         reader_thread_ = std::make_unique<std::thread>([this] {
137             while (keep_reading_) {
138                 reader_.Read();
139                 // At this point we want to validate the samples and check the values
140                 // against next_atomic_counter_. To prevent next_atomic_counter_ from
141                 // being modified by the writer thread, we make the writer thread
142                 // blocked inside UnblockAndWaitForReads() until the validation is done
143                 // and reader_.Read() is called again.
144                 ValidateReaderSamples();
145             }
146         });
147     }
148 
StopAndJoinReaderThread()149     void StopAndJoinReaderThread() {
150         reader_.UnblockReads();
151         keep_reading_ = false;
152         reader_thread_->join();
153     }
154 
155     static constexpr int kBufferSize = 20;
156     std::array<sensors_event_t, kBufferSize> buffer_;
157     TestableDirectChannelBufferReader reader_;
158 
159     int next_buffer_index_ = 0;
160     int64_t next_atomic_counter_ = 1;
161 
162     std::unique_ptr<std::thread> reader_thread_;
163     bool keep_reading_ = true;
164 };
165 
// Reading before anything was written reports zero new samples and leaves the
// sample container empty.
TEST_F(DirectChannelBufferReaderTest, ReturnNoDataForEmptyBuffer) {
    const auto num_new_samples = reader_.Read();
    EXPECT_EQ(num_new_samples, 0);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), 0);
}
170 
// A single completed write is picked up by the next read.
TEST_F(DirectChannelBufferReaderTest, ReturnOneSample) {
    WriteOneSample();

    const auto num_new_samples = reader_.Read();
    EXPECT_EQ(num_new_samples, 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), 1);
}
176 
// After every slot of the buffer has been written once, a read is expected to
// deliver kBufferSize - 1 samples, and they must pass validation.
TEST_F(DirectChannelBufferReaderTest, ReturnSamplesWithFullBuffer) {
    int remaining_writes = kBufferSize;
    while (remaining_writes-- > 0) {
        WriteOneSample();
    }

    const auto num_new_samples = reader_.Read();
    EXPECT_EQ(num_new_samples, kBufferSize - 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), kBufferSize - 1);

    ValidateReaderSamples();
}
185 
// Alternating writes and reads: each read reports only the samples written
// since the previous read, while the container accumulates all of them.
TEST_F(DirectChannelBufferReaderTest, ReturnSamplesWithInterleavedWriteRead) {
    // One write, one read.
    WriteOneSample();
    const auto first_read = reader_.Read();
    EXPECT_EQ(first_read, 1);

    // Two writes, one read.
    WriteOneSample();
    WriteOneSample();
    const auto second_read = reader_.Read();
    EXPECT_EQ(second_read, 2);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), 3);
    ValidateReaderSamples();
}
195 
// A sample whose write is only half done must not be returned; once the write
// is finished, the next read returns it.
TEST_F(DirectChannelBufferReaderTest, ReturnNothingAfterPartialWrite) {
    WriteOneSample();
    const auto after_full_write = reader_.Read();
    EXPECT_EQ(after_full_write, 1);

    // Start the second sample but do not finish it yet.
    WritePartialSample();
    const auto after_partial_write = reader_.Read();
    EXPECT_EQ(after_partial_write, 0);

    // Complete the second sample.
    FinishWritingSample();
    const auto after_finished_write = reader_.Read();
    EXPECT_EQ(after_finished_write, 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), 2);
    ValidateReaderSamples();
}
206 
// The writer laps the reader and then begins overwriting the slot the reader
// would read next; that slot has to be dropped from the result.
TEST_F(DirectChannelBufferReaderTest, DiscardPartiallyWrittenSample) {
    WriteOneSample();
    const auto first_read = reader_.Read();
    EXPECT_EQ(first_read, 1);

    int remaining_writes = kBufferSize;
    while (remaining_writes-- > 0) {
        WriteOneSample();
    }
    // State of the buffer: 21 2 3 4 5 .... 20
    //                         ^
    //         Both read and write head point here

    WritePartialSample();
    // State of the buffer: 21 2 3 4 5 .... 20
    //                         ^
    //     Partially overwritten with sample 22
    // The next Read() should get sample 3-21. Sample 2 should be discarded.
    const auto second_read = reader_.Read();
    EXPECT_EQ(second_read, kBufferSize - 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.front().timestamp, 3);
    EXPECT_EQ(samples.back().timestamp, 21);
}
226 
// The writer produces more than a full buffer of samples between two reads;
// the second read is still expected to return kBufferSize - 1 valid samples.
TEST_F(DirectChannelBufferReaderTest, ReturnCorrectSamplesAfterWriterOverflow) {
    WriteOneSample();
    reader_.Read();

    constexpr int kNumWrites = kBufferSize + 5;
    for (int written = 0; written != kNumWrites; ++written) {
        WriteOneSample();
    }

    const auto num_new_samples = reader_.Read();
    EXPECT_EQ(num_new_samples, kBufferSize - 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), kBufferSize - 1);
    ValidateReaderSamples();
}
237 
// When the writer overruns the reader, Read() reports through its out-parameter
// how many samples were lost. kBufferSize + 5 samples are written between the
// two reads and the expected loss is 6.
TEST_F(DirectChannelBufferReaderTest, ReturnNumOfSkippedSamples) {
    WriteOneSample();
    reader_.Read();

    constexpr int kNumWrites = kBufferSize + 5;
    for (int written = 0; written != kNumWrites; ++written) {
        WriteOneSample();
    }

    int skipped = 0;
    reader_.Read(&skipped);
    EXPECT_EQ(skipped, 6);
}
248 
// Starts the counter just below UINT32_MAX so that it wraps back to 1 while the
// buffer is being filled, and checks that the samples are still returned.
TEST_F(DirectChannelBufferReaderTest, WrapAroundUINT32Max) {
    next_atomic_counter_ = UINT32_MAX - 3;

    int remaining_writes = kBufferSize;
    while (remaining_writes-- > 0) {
        WriteOneSample();
    }

    const auto num_new_samples = reader_.Read();
    EXPECT_EQ(num_new_samples, kBufferSize - 1);

    const auto& samples = reader_.GetSampleContainer();
    EXPECT_EQ(samples.size(), kBufferSize - 1);
    ValidateReaderSamples();
}
258 
// Hand-written interleaving of writer and reader steps. The reader runs on its
// own thread and is released a fixed number of reads at a time, so that it
// reads slots the writer has only half overwritten.
TEST_F(DirectChannelBufferReaderTest, ConcurrentWriteReadSequence) {
    WriteOneSample();
    // Buffer:                  1 0 0 0 ...
    // Writer head:             ^

    reader_.Read();
    // Buffer:                  1 0 0 0 ...
    // Writer head:             ^
    // What reader sees so far: 1

    StartReaderThread();
    int remaining_writes = kBufferSize;
    while (remaining_writes-- > 0) {
        WriteOneSample();
    }
    // Buffer:                  21 2 3 4 ...
    // Writer head:             ^
    // What reader sees so far: 1

    // Begin overwriting the slot that holds sample 2.
    WriteHalfSample();
    // Buffer:                  21 <counter:2,content:22> 3 4 ...
    // Writer head:                          ^
    // What reader sees so far: 1

    // Let the reader read exactly two slots while that write is in flight.
    reader_.UnblockAndWaitForReads(2);
    // Buffer:                  21 <counter:2,content:22> 3 4 ...
    // Writer head:                          ^
    // What reader sees so far: 1  2*                     3
    // (sample 2 is corrupted)

    // Finish the write that was in flight.
    WriteHalfSample();
    // Buffer:                  21 22 3 4 5 ...
    // Writer head:                ^
    // What reader sees so far: 1  2  3

    WriteOneSample();
    WriteOneSample();
    // Buffer:                  21 22 23 24 5 6 ...
    // Writer head:                      ^
    // What reader sees so far: 1  2  3

    // Begin overwriting the slot that holds sample 5.
    WriteHalfSample();
    // Buffer:                  21 22 23 24 <counter:5,content:25> 6 ...
    // Writer head:                                   ^
    // What reader sees so far: 1  2  3

    StopAndJoinReaderThread();
    // Buffer:                  21 22 23 24 <counter:5,content:25> 6 ...
    // Writer head:                                   ^
    // What reader sees so far: 21 22 23 24 5*                     6 ...
    // (sample 5 is corrupted)
    //
    // The validation performed on the reader thread would ensure that sample 2
    // and 5 were not returned.
}
313 
// Pseudo-random interleaving of half-sample writes and gated reads. Each round
// is either a burst of writes or a burst of reads; sample validation happens on
// the reader thread after every Read().
TEST_F(DirectChannelBufferReaderTest, GeneratedConcurrentWriteReadSequence) {
    constexpr int kNumRounds = 5000;
    constexpr int kMaxReadWritePerRound = kBufferSize + 5;

    StartReaderThread();

    // For deterministic results, use an arbitrary fixed seed for random number
    // generator.
    srand(12345);

    for (int round = 0; round < kNumRounds; ++round) {
        const bool is_write_round = (rand() % 2 == 0);
        if (!is_write_round) {
            const int num_reads = rand() % kMaxReadWritePerRound;
            reader_.UnblockAndWaitForReads(num_reads);
            continue;
        }

        // Multiply by 2 since each call only writes half a sample.
        const int num_half_writes = rand() % (kMaxReadWritePerRound * 2);
        for (int done = 0; done < num_half_writes; ++done) {
            WriteHalfSample();
        }
    }

    StopAndJoinReaderThread();
}
336 
337 }  // namespace
338