/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "chre/util/system/atomic_spsc_queue.h"
#include "chre/util/array_queue.h"
#include "chre/util/fixed_size_vector.h"
#include "gtest/gtest.h"

#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <mutex>
#include <ostream>
#include <thread>

using chre::ArrayQueue;
using chre::AtomicSpscQueue;
using chre::FixedSizeVector;

namespace {

constexpr int kMaxTestCapacity = 10;
int destructor_count[kMaxTestCapacity];
int constructor_count;
int total_destructor_count;

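// Test helper: every constructor/destructor call updates the counters above,
// so the tests below can verify that the queue constructs and destroys
// elements the expected number of times.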
class FakeElement {
 public:
  FakeElement() {
    constructor_count++;
  }
  FakeElement(int i) {
    val_ = i;
    constructor_count++;
  }
  ~FakeElement() {
    total_destructor_count++;
    if (val_ >= 0 && val_ < kMaxTestCapacity) {
      destructor_count[val_]++;
    }
  }
  void setValue(int i) {
    val_ = i;
  }

 private:
  int val_ = kMaxTestCapacity - 1;
};

}  // namespace

TEST(AtomicSpscQueueTest, IsEmptyInitially) {
  AtomicSpscQueue<int, 4> q;
  EXPECT_EQ(4, q.capacity());
  EXPECT_TRUE(q.consumer().empty());
  EXPECT_EQ(0, q.consumer().size());
  EXPECT_EQ(0, q.producer().size());
  EXPECT_EQ(0, q.size());
}

TEST(AtomicSpscQueueTest, SimplePushPop) {
  AtomicSpscQueue<int, 3> q;
  q.producer().push(1);
  q.producer().push(2);
  EXPECT_EQ(q.consumer().front(), 1);
  EXPECT_FALSE(q.producer().full());
  q.consumer().pop();
  q.producer().push(3);
  EXPECT_EQ(q.consumer().front(), 2);
  q.consumer().pop();
  EXPECT_EQ(q.consumer().front(), 3);
}

TEST(AtomicSpscQueueTest, TestSize) {
  AtomicSpscQueue<int, 2> q;
  EXPECT_EQ(0, q.size());
  q.producer().push(1);
  EXPECT_EQ(1, q.size());
  q.producer().push(2);
  EXPECT_EQ(2, q.size());
  q.consumer().pop();
  EXPECT_EQ(1, q.size());
  q.consumer().pop();
  EXPECT_EQ(0, q.size());
}

TEST(AtomicSpscQueueTest, TestFront) {
  AtomicSpscQueue<int, 3> q;
  q.producer().emplace(1);
  EXPECT_EQ(1, q.consumer().front());
  q.consumer().pop();
  q.producer().emplace(2);
  EXPECT_EQ(2, q.consumer().front());
  q.producer().emplace(3);
  EXPECT_EQ(2, q.consumer().front());
}

TEST(AtomicSpscQueueTest, DestructorCalledOnPop) {
  for (size_t i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  AtomicSpscQueue<FakeElement, 3> q;
  FakeElement e;
  q.producer().push(e);
  q.producer().push(e);

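  // Tag the front element before popping so its destructor increments the
  // matching destructor_count slot.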
  q.consumer().front().setValue(0);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[0]);

  q.consumer().front().setValue(1);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[1]);
}

TEST(AtomicSpscQueueTest, ElementsDestructedWhenQueueDestructed) {
  for (size_t i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  {
    AtomicSpscQueue<FakeElement, 4> q;

    for (size_t i = 0; i < 3; ++i) {
      q.producer().emplace(i);
    }
  }

  for (size_t i = 0; i < 3; ++i) {
    EXPECT_EQ(1, destructor_count[i]);
  }

  EXPECT_EQ(0, destructor_count[3]);
}

TEST(AtomicSpscQueueTest, ExtractFull) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  for (int32_t i = 0; i < kSize; i++) {
    q.producer().push(i);
  }

  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  dest[kSize] = 0xdeadbeef;
  size_t extracted = q.consumer().extract(dest, kSize);
  EXPECT_EQ(extracted, kSize);
  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0xdeadbeef, dest[kSize]);
}

TEST(AtomicSpscQueueTest, ExtractPartial) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  for (int32_t i = 0; i < kSize / 2; i++) {
    q.producer().push(i);
  }

  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  size_t extracted = q.consumer().extract(dest, kSize / 4);
  EXPECT_EQ(extracted, kSize / 4);
  for (int32_t i = 0; i < kSize / 4; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize / 4]);
  EXPECT_EQ(kSize / 4, q.size());

  extracted = q.consumer().extract(&dest[kSize / 4], kSize / 4);
  EXPECT_EQ(extracted, kSize / 4);
  for (int32_t i = kSize / 4; i < kSize / 2; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize]);
  EXPECT_TRUE(q.consumer().empty());

  q.producer().push(0xd00d);
  EXPECT_EQ(0xd00d, q.consumer().front());
  q.consumer().pop();
  EXPECT_TRUE(q.consumer().empty());
}

TEST(AtomicSpscQueueTest, ExtractWraparound) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

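  // Fill the queue to capacity, then pop and push half of the capacity so the
  // stored elements wrap around the end of the backing array.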
  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    c.pop();
    p.push(i);
  }

  // Now two copies will be needed to extract the data
  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  dest[kSize] = 0xdeadbeef;

  // Pull all except 1
  size_t extracted = c.extract(dest, kSize - 1);
  EXPECT_EQ(extracted, kSize - 1);

  // And now the last one (asking for more than we expect to get)
  EXPECT_EQ(1, q.size());
  extracted = c.extract(&dest[kSize - 1], 2);
  EXPECT_EQ(extracted, 1);

  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i + kSize / 2);
  }
  EXPECT_EQ(0xdeadbeef, dest[kSize]);
}

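// Verifies that front() and pop() stay consistent once the read and write
// indices wrap around the end of the backing storage.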
TEST(AtomicSpscQueueTest, PopWraparound) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i - kSize);
    c.pop();
    p.push(i);
  }

  for (int32_t i = kSize / 2; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i);
    c.pop();
  }
}

TEST(AtomicSpscQueueTest, ExtractVector) {
  constexpr size_t kSize = 8;
  AtomicSpscQueue<int, kSize> q;

  auto p = q.producer();
  for (int i = 0; i < kSize; i++) {
    p.push(i);
  }

  auto c = q.consumer();
  constexpr size_t kExtraSpace = 2;
  static_assert(kSize > kExtraSpace + 2, "Test assumption broken");
  FixedSizeVector<int, kSize + kExtraSpace> v;

  // Output size dependent on elements available in queue
  size_t extracted = c.extract(&v);
  EXPECT_EQ(extracted, kSize);
  EXPECT_EQ(kSize, v.size());
  for (int i = 0; i < kSize; i++) {
    EXPECT_EQ(v[i], i);
  }

  for (int i = kSize; i < kSize + kExtraSpace; i++) {
    p.push(i);
  }
  p.push(1337);
  p.push(42);

  // Output size dependent on space available in vector
  extracted = c.extract(&v);
  EXPECT_EQ(extracted, kExtraSpace);
  EXPECT_EQ(v.capacity(), v.size());
  for (int i = 0; i < kSize + kExtraSpace; i++) {
    EXPECT_EQ(v[i], i);
  }
  EXPECT_EQ(2, q.size());

  // Output size 0 (no space left in vector)
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
  EXPECT_EQ(2, q.size());

  // Extract into reset vector
  v.resize(0);
  extracted = c.extract(&v);
  EXPECT_EQ(2, extracted);
  EXPECT_EQ(2, v.size());
  EXPECT_EQ(v[0], 1337);
  EXPECT_EQ(v[1], 42);

  // Output size 0 (no elements left in queue)
  EXPECT_TRUE(q.consumer().empty());
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
}

// If this test fails it's likely due to thread interleaving, so consider
// increasing kMaxCount (e.g. by a factor of 100 or more) and/or running the
// test in parallel across multiple processes to increase the likelihood of
// reproducing it.
TEST(AtomicSpscQueueStressTest, ConcurrencyStress) {
  constexpr size_t kCapacity = 2048;
  constexpr int64_t kMaxCount = 100 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  auto producer = q.producer();
  std::thread producerThread = std::thread(
      [](decltype(producer) p) {
        int64_t count = 0;
        while (count <= kMaxCount) {
          if (p.full()) {
            // Give the other thread a chance to be scheduled
            std::this_thread::yield();
            continue;
          }

          p.push(count++);
        }
      },
      producer);

  auto consumer = q.consumer();
  std::thread consumerThread = std::thread(
      [](decltype(consumer) c) {
        int64_t last = -1;
        do {
          if (c.empty()) {
            std::this_thread::yield();
            continue;
          }
          int64_t next = c.front();
          if (last != -1) {
            EXPECT_EQ(last + 1, next);
          }
          last = next;
          c.pop();
        } while (last < kMaxCount);
      },
      consumer);

  producerThread.join();
  consumerThread.join();

  EXPECT_EQ(0, q.size());
}

// Helpers for SynchronizedConcurrencyStress
enum class Op {
  kPush = 0,
  kPull = 1,
};
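// Snapshot of a single producer push or consumer pull; a bounded history of
// these is kept so an ordering failure can dump the most recent activity.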
struct HistoryEntry {
  Op op;
  int numElements;
  int64_t last;

  HistoryEntry() = default;
  HistoryEntry(Op op_, int numElements_, int64_t last_)
      : op(op_), numElements(numElements_), last(last_) {}
};

constexpr size_t kHistorySize = 512;

namespace chre {  // (PrintTo needs to be in the same namespace as ArrayQueue)

void PrintTo(const ArrayQueue<HistoryEntry, kHistorySize> &history,
             std::ostream *os) {
  *os << "Dumping history from oldest to newest:" << std::endl;
  for (const HistoryEntry &entry : history) {
    *os << " " << ((entry.op == Op::kPush) ? "push " : "pull ") << std::setw(3)
        << entry.numElements << " elements, last " << entry.last << std::endl;
  }
}

}  // namespace chre

// If this test fails it's likely due to thread interleaving, so consider
// increasing kMaxCount (e.g. by a factor of 100 or more) and/or running the
// test in parallel across multiple processes to increase the likelihood of
// reproducing it.
TEST(AtomicSpscQueueStressTest, SynchronizedConcurrencyStress) {
  constexpr size_t kCapacity = 512;
  constexpr int64_t kMaxCount = 2000 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  std::mutex m;
  std::condition_variable cv;

  // Guarded by mutex m
  ArrayQueue<HistoryEntry, kHistorySize> history;
  int64_t totalOps = 0;

  auto lfsr = []() {
    // 9-bit LFSR with feedback polynomial x^9 + x^5 + 1 gives us a
    // pseudo-random sequence over all 511 possible non-zero values
    static uint16_t lfsr = 1;
    uint16_t nextBit = ((lfsr << 8) ^ (lfsr << 4)) & 0x100;
    lfsr = nextBit | (lfsr >> 1);

    return lfsr;
  };
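  // Set under mutex m by the producer after it pushes a batch; cleared by the
  // consumer when it wakes up to drain the queue.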
  bool pending = false;

  auto p = q.producer();
  std::thread producerThread = std::thread([&]() {
    int64_t count = 0;
    while (count <= kMaxCount) {
      // Push a pseudo-random number of elements into the queue, then notify
      // the consumer; yield until there's room to push the entire batch at
      // once
      uint16_t pushCount = lfsr();
      while (p.capacity() - p.size() < pushCount) {
        std::this_thread::yield();
      }

      for (int i = 0; i < pushCount; i++) {
        p.push(count++);
        if (count > kMaxCount) {
          break;
        }
      }

      m.lock();
      history.kick_push(HistoryEntry(Op::kPush, pushCount, count - 1));
      totalOps++;
      pending = true;
      m.unlock();
      cv.notify_one();
    }
  });

  auto c = q.consumer();
  std::thread consumerThread = std::thread([&]() {
    int64_t last = -1;
    size_t extracted = 0;
    FixedSizeVector<int64_t, kCapacity> myBuf;
    while (last < kMaxCount) {
      {
        std::unique_lock<std::mutex> lock(m);
        if (last != -1) {
          history.kick_push(HistoryEntry(Op::kPull, extracted, last));
          totalOps++;
        }
        while (c.empty() && !pending) {
          cv.wait(lock);
          if (pending) {
            pending = false;
            break;
          }
        }
      }

      extracted = c.extract(&myBuf);
      EXPECT_LE(extracted, kCapacity);
      for (int i = 0; i < extracted; i++) {
        int64_t next = myBuf[i];
        if (last != -1 && last + 1 != next) {
          std::lock_guard<std::mutex> lock(m);
          EXPECT_EQ(last + 1, next)
              << "After pulling " << extracted << " elements, value at offset "
              << i << " is incorrect: expected " << (last + 1) << " but got "
              << next << "." << std::endl
              << testing::PrintToString(history)
              // totalOps + 1 because this call to extract() isn't counted yet
              << " Total operations since start: " << (totalOps + 1)
              << std::endl
              << "Note: the most recent push may not be included in the "
              << "history, and the most recent pull is definitely not included "
              << "(but it is described in the first sentence above)."
              << std::endl;
          // The history is unlikely to include the most recent push operation,
          // because the producer thread runs freely until it tries to acquire
          // the mutex to add to the history. In other words, it may have pushed
          // at any time between when we unblocked from wait() and when we
          // reached here, but it hasn't added the entry to the history yet.
        }
        last = next;
      }
      myBuf.resize(0);
    }
  });


  producerThread.join();
  consumerThread.join();

  EXPECT_EQ(0, q.size());
}