1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "chre/util/system/atomic_spsc_queue.h"

#include "chre/util/array_queue.h"
#include "chre/util/fixed_size_vector.h"
#include "gtest/gtest.h"

#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <mutex>
#include <ostream>
#include <thread>
24 
25 using chre::ArrayQueue;
26 using chre::AtomicSpscQueue;
27 using chre::FixedSizeVector;
28 
namespace {

// Instrumentation shared by the tests below to verify that AtomicSpscQueue
// runs element constructors and destructors the expected number of times.
constexpr int kMaxTestCapacity = 10;
int destructor_count[kMaxTestCapacity];  // Per-value destructor tally.
int constructor_count;                   // Total constructions (any ctor).
int total_destructor_count;              // Total destructions.

// Test element that records its constructions and destructions in the
// file-scope counters above.
class FakeElement {
 public:
  FakeElement() {
    constructor_count++;
  }
  FakeElement(int i) : val_(i) {
    constructor_count++;
  }
  ~FakeElement() {
    total_destructor_count++;
    // Only values in [0, kMaxTestCapacity) are tracked per-value.
    if (val_ >= 0 && val_ < kMaxTestCapacity) {
      destructor_count[val_]++;
    }
  }
  void setValue(int i) {
    val_ = i;
  }

 private:
  // Defaults to the last tracked slot so default-constructed elements still
  // show up in destructor_count when destroyed.
  int val_ = kMaxTestCapacity - 1;
};

}  // namespace
60 
// A freshly constructed queue reports its full capacity and zero size from
// every accessor.
TEST(AtomicSpscQueueTest, IsEmptyInitially) {
  AtomicSpscQueue<int, 4> q;
  auto p = q.producer();
  auto c = q.consumer();

  EXPECT_EQ(4, q.capacity());
  EXPECT_EQ(0, q.size());
  EXPECT_EQ(0, p.size());
  EXPECT_EQ(0, c.size());
  EXPECT_TRUE(c.empty());
}
69 
// Basic FIFO behavior: elements come out in the order they were pushed.
TEST(AtomicSpscQueueTest, SimplePushPop) {
  AtomicSpscQueue<int, 3> q;
  auto p = q.producer();
  auto c = q.consumer();

  p.push(1);
  p.push(2);
  EXPECT_EQ(1, c.front());
  EXPECT_FALSE(p.full());
  c.pop();

  p.push(3);
  EXPECT_EQ(2, c.front());
  c.pop();
  EXPECT_EQ(3, c.front());
}
82 
// size() tracks each push and pop exactly.
TEST(AtomicSpscQueueTest, TestSize) {
  AtomicSpscQueue<int, 2> q;
  auto p = q.producer();
  auto c = q.consumer();

  EXPECT_EQ(0, q.size());
  p.push(1);
  EXPECT_EQ(1, q.size());
  p.push(2);
  EXPECT_EQ(2, q.size());
  c.pop();
  EXPECT_EQ(1, q.size());
  c.pop();
  EXPECT_EQ(0, q.size());
}
95 
// front() always refers to the oldest element and is unaffected by pushes
// behind it.
TEST(AtomicSpscQueueTest, TestFront) {
  AtomicSpscQueue<int, 3> q;
  auto p = q.producer();
  auto c = q.consumer();

  p.emplace(1);
  EXPECT_EQ(1, c.front());
  c.pop();

  p.emplace(2);
  EXPECT_EQ(2, c.front());
  // Adding another element must not change the front.
  p.emplace(3);
  EXPECT_EQ(2, c.front());
}
106 
// pop() must destroy the popped element in place.
TEST(AtomicSpscQueueTest, DestructorCalledOnPop) {
  // Reset per-value tallies; int index avoids a signed/unsigned comparison
  // with the int constant kMaxTestCapacity.
  for (int i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  AtomicSpscQueue<FakeElement, 3> q;
  FakeElement e;
  q.producer().push(e);
  q.producer().push(e);

  // Tag the front element, pop it, and confirm exactly one destruction was
  // recorded for that tag.
  q.consumer().front().setValue(0);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[0]);

  q.consumer().front().setValue(1);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[1]);
}
125 
// The queue's destructor must destroy any elements still enqueued.
TEST(AtomicSpscQueueTest, ElementsDestructedWhenQueueDestructed) {
  // int indices avoid signed/unsigned comparisons and the implicit narrowing
  // conversion when constructing FakeElement(int) via emplace().
  for (int i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  {
    AtomicSpscQueue<FakeElement, 4> q;

    for (int i = 0; i < 3; ++i) {
      q.producer().emplace(i);
    }
  }  // q goes out of scope here and must destroy the 3 queued elements.

  for (int i = 0; i < 3; ++i) {
    EXPECT_EQ(1, destructor_count[i]);
  }

  // The unused capacity slot must not have run a destructor.
  EXPECT_EQ(0, destructor_count[3]);
}
145 
// extract() on a full queue returns every element in FIFO order and writes
// exactly the requested number of elements.
TEST(AtomicSpscQueueTest, ExtractFull) {
  // Signed constant avoids signed/unsigned comparison warnings in the loops.
  constexpr int32_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  for (int32_t i = 0; i < kSize; i++) {
    q.producer().push(i);
  }

  // Extra trailing slot acts as a canary: extract() must not write past the
  // requested count.
  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  constexpr int32_t kCanary = static_cast<int32_t>(0xdeadbeef);
  dest[kSize] = kCanary;

  size_t extracted = q.consumer().extract(dest, kSize);
  EXPECT_EQ(extracted, static_cast<size_t>(kSize));
  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(kCanary, dest[kSize]);
}
164 
// extract() honors a count smaller than the queue's contents, and the queue
// keeps working normally afterwards.
TEST(AtomicSpscQueueTest, ExtractPartial) {
  // Signed constant avoids signed/unsigned comparison warnings in the loops.
  constexpr int32_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  // Half-fill the queue.
  for (int32_t i = 0; i < kSize / 2; i++) {
    q.producer().push(i);
  }

  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));

  // Ask for fewer elements than are available.
  size_t extracted = q.consumer().extract(dest, kSize / 4);
  EXPECT_EQ(extracted, static_cast<size_t>(kSize / 4));
  for (int32_t i = 0; i < kSize / 4; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize / 4]);
  EXPECT_EQ(static_cast<size_t>(kSize / 4), q.size());

  // Drain the remainder and confirm the sequence continues where it left off.
  extracted = q.consumer().extract(&dest[kSize / 4], kSize / 4);
  EXPECT_EQ(extracted, static_cast<size_t>(kSize / 4));
  for (int32_t i = kSize / 4; i < kSize / 2; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize]);
  EXPECT_TRUE(q.consumer().empty());

  // The queue must still behave normally after partial extraction.
  q.producer().push(0xd00d);
  EXPECT_EQ(0xd00d, q.consumer().front());
  q.consumer().pop();
  EXPECT_TRUE(q.consumer().empty());
}
196 
// extract() must handle data that wraps around the end of the circular
// buffer (internally requiring two copies).
TEST(AtomicSpscQueueTest, ExtractWraparound) {
  // Signed constant avoids signed/unsigned comparison warnings in the loops.
  constexpr int32_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  // Pop and push half the capacity so the live data wraps around the end of
  // the underlying storage.
  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    c.pop();
    p.push(i);
  }

  // Now two copies will be needed to extract the data. The trailing canary
  // slot verifies extract() doesn't write past the requested count.
  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  constexpr int32_t kCanary = static_cast<int32_t>(0xdeadbeef);
  dest[kSize] = kCanary;

  // Pull all except 1.
  size_t extracted = c.extract(dest, kSize - 1);
  EXPECT_EQ(extracted, static_cast<size_t>(kSize - 1));

  // And now the last one (asking for more than we expect to get).
  EXPECT_EQ(1, q.size());
  extracted = c.extract(&dest[kSize - 1], 2);
  EXPECT_EQ(extracted, 1);

  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i + kSize / 2);
  }
  EXPECT_EQ(kCanary, dest[kSize]);
}
231 
// front()/pop() must preserve FIFO order when the indices wrap around the
// end of the circular buffer.
TEST(AtomicSpscQueueTest, PopWraparound) {
  // Signed constant avoids signed/unsigned comparison warnings in the loops.
  constexpr int32_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  // Interleave pops and pushes so the read/write positions wrap around the
  // end of the underlying storage.
  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i - kSize);
    c.pop();
    p.push(i);
  }

  // Drain and verify FIFO order across the wraparound point.
  for (int32_t i = kSize / 2; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i);
    c.pop();
  }
}
253 
// extract(FixedSizeVector*) is limited by whichever is smaller: elements
// available in the queue or free space remaining in the vector.
TEST(AtomicSpscQueueTest, ExtractVector) {
  // Signed constants avoid signed/unsigned comparison warnings in the loops.
  constexpr int kSize = 8;
  AtomicSpscQueue<int, kSize> q;

  auto p = q.producer();
  for (int i = 0; i < kSize; i++) {
    p.push(i);
  }

  auto c = q.consumer();
  constexpr int kExtraSpace = 2;
  static_assert(kSize > kExtraSpace + 2, "Test assumption broken");
  FixedSizeVector<int, kSize + kExtraSpace> v;

  // Output size dependent on elements available in queue.
  size_t extracted = c.extract(&v);
  EXPECT_EQ(extracted, static_cast<size_t>(kSize));
  EXPECT_EQ(static_cast<size_t>(kSize), v.size());
  for (int i = 0; i < kSize; i++) {
    EXPECT_EQ(v[i], i);
  }

  for (int i = kSize; i < kSize + kExtraSpace; i++) {
    p.push(i);
  }
  p.push(1337);
  p.push(42);

  // Output size dependent on space available in vector.
  extracted = c.extract(&v);
  EXPECT_EQ(extracted, static_cast<size_t>(kExtraSpace));
  EXPECT_EQ(v.capacity(), v.size());
  for (int i = 0; i < kSize + kExtraSpace; i++) {
    EXPECT_EQ(v[i], i);
  }
  EXPECT_EQ(2, q.size());

  // Output size 0 (no space left in vector).
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
  EXPECT_EQ(2, q.size());

  // Extract into reset vector.
  v.resize(0);
  extracted = c.extract(&v);
  EXPECT_EQ(2, extracted);
  EXPECT_EQ(2, v.size());
  EXPECT_EQ(v[0], 1337);
  EXPECT_EQ(v[1], 42);

  // Output size 0 (no elements left in queue).
  EXPECT_TRUE(q.consumer().empty());
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
}
309 
310 // If this test fails it's likely due to thread interleaving, so consider
311 // increasing kMaxCount (e.g. by a factor of 100 or more) and/or run the test in
312 // parallel on multiple processes to increase the likelihood of repro.
TEST(AtomicSpscQueueStressTest, ConcurrencyStress) {
  constexpr size_t kCapacity = 2048;
  // Total elements pushed through the queue; a larger value raises the odds
  // of exposing a race, at the cost of runtime.
  constexpr int64_t kMaxCount = 100 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  // Producer thread: pushes the monotonically increasing sequence
  // 0..kMaxCount, spinning (with yields) whenever the queue is full.
  auto producer = q.producer();
  std::thread producerThread = std::thread(
      [](decltype(producer) p) {
        int64_t count = 0;
        while (count <= kMaxCount) {
          if (p.full()) {
            // Give the other thread a chance to be scheduled
            std::this_thread::yield();
            continue;
          }

          p.push(count++);
        }
      },
      producer);

  // Consumer thread: pops elements and verifies strict FIFO order — each
  // value must be exactly one greater than the previous one.
  auto consumer = q.consumer();
  std::thread consumerThread = std::thread(
      [](decltype(consumer) c) {
        int64_t last = -1;
        do {
          if (c.empty()) {
            std::this_thread::yield();
            continue;
          }
          int64_t next = c.front();
          if (last != -1) {
            EXPECT_EQ(last + 1, next);
          }
          last = next;
          c.pop();
        } while (last < kMaxCount);
      },
      consumer);

  producerThread.join();
  consumerThread.join();

  // The consumer only exits after seeing kMaxCount, so everything the
  // producer sent must have been drained.
  EXPECT_EQ(0, q.size());
}
358 
359 // Helpers for SynchronizedConcurrencyStress
enum class Op {
  kPush = 0,
  kPull = 1,
};

// One producer/consumer operation, recorded so that the stress test can dump
// recent activity when it detects an out-of-order element.
struct HistoryEntry {
  Op op;            // Whether this entry records a push or a pull.
  int numElements;  // How many elements the operation moved.
  int64_t last;     // Last (highest) sequence value involved.

  HistoryEntry() = default;
  HistoryEntry(Op op_, int numElements_, int64_t last_)
      : op(op_), numElements(numElements_), last(last_) {}
};

// Maximum number of operations retained in the debugging history.
constexpr size_t kHistorySize = 512;
375 
namespace chre {  // (PrintTo needs to be in the same namespace as ArrayQueue)

// gtest pretty-printer for the operation history; invoked via
// testing::PrintToString when SynchronizedConcurrencyStress fails, to show
// the most recent producer/consumer operations.
void PrintTo(const ArrayQueue<HistoryEntry, kHistorySize> &history,
             std::ostream *os) {
  *os << "Dumping history from oldest to newest:" << std::endl;
  for (const HistoryEntry &entry : history) {
    *os << "  " << ((entry.op == Op::kPush) ? "push " : "pull ") << std::setw(3)
        << entry.numElements << " elements, last " << entry.last << std::endl;
  }
}

}  // namespace chre
388 
389 // If this test fails it's likely due to thread interleaving, so consider
390 // increasing kMaxCount (e.g. by a factor of 100 or more) and/or run the test in
391 // parallel on multiple processes to increase the likelihood of repro.
TEST(AtomicSpscQueueStressTest, SynchronizedConcurrencyStress) {
  constexpr size_t kCapacity = 512;
  constexpr int64_t kMaxCount = 2000 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  // m/cv synchronize access to the debugging history and let the producer
  // wake the consumer after each batch of pushes. The queue itself needs no
  // external locking — that's the property under test.
  std::mutex m;
  std::condition_variable cv;

  // Guarded by mutex m
  ArrayQueue<HistoryEntry, kHistorySize> history;
  int64_t totalOps = 0;

  auto lfsr = []() {
    // 9-bit LFSR with feedback polynomial x^9 + x^5 + 1 gives us a
    // pseudo-random sequence over all 511 possible values
    static uint16_t lfsr = 1;
    uint16_t nextBit = ((lfsr << 8) ^ (lfsr << 4)) & 0x100;
    lfsr = nextBit | (lfsr >> 1);

    return lfsr;
  };
  // Set by the producer after each batch; consumed (cleared) by the consumer.
  bool pending = false;

  // Producer thread: pushes pseudo-randomly sized batches of the sequence
  // 0..kMaxCount, logging each batch and notifying the consumer.
  auto p = q.producer();
  std::thread producerThread = std::thread([&]() {
    int64_t count = 0;
    while (count <= kMaxCount) {
      // Push in a pseudo-random number of elements into the queue, then notify
      // the consumer; yield if we can't push it all at once
      uint16_t pushCount = lfsr();
      while (p.capacity() - p.size() < pushCount) {
        std::this_thread::yield();
      }

      for (int i = 0; i < pushCount; i++) {
        p.push(count++);
        if (count > kMaxCount) {
          break;
        }
      }

      m.lock();
      history.kick_push(HistoryEntry(Op::kPush, pushCount, count - 1));
      totalOps++;
      pending = true;
      m.unlock();
      cv.notify_one();
    }
  });

  // Consumer thread: waits for a batch notification, bulk-extracts whatever
  // is available, and verifies the values form a strictly contiguous
  // ascending sequence.
  auto c = q.consumer();
  std::thread consumerThread = std::thread([&]() {
    int64_t last = -1;
    size_t extracted = 0;
    FixedSizeVector<int64_t, kCapacity> myBuf;
    while (last < kMaxCount) {
      {
        std::unique_lock<std::mutex> lock(m);
        // Log the previous pull while we hold the mutex anyway.
        if (last != -1) {
          history.kick_push(HistoryEntry(Op::kPull, extracted, last));
          totalOps++;
        }
        while (c.empty() && !pending) {
          cv.wait(lock);
          if (pending) {
            pending = false;
            break;
          }
        }
      }

      extracted = c.extract(&myBuf);
      EXPECT_LE(extracted, kCapacity);
      for (int i = 0; i < extracted; i++) {
        int64_t next = myBuf[i];
        if (last != -1 && last + 1 != next) {
          // Ordering violation: grab the mutex so the history dump is
          // consistent, then fail with full diagnostic context.
          std::lock_guard<std::mutex> lock(m);
          EXPECT_EQ(last + 1, next)
              << "After pulling " << extracted << " elements, value at offset "
              << i << " is incorrect: expected " << (last + 1) << " but got "
              << next << "." << std::endl
              << testing::PrintToString(history)
              // totalOps + 1 because this call to extract() isn't counted yet
              << " Total operations since start: " << (totalOps + 1)
              << std::endl
              << "Note: most recent push may not be included in the history, "
              << "most recent pull is definitely not included (but indicated "
              << "in the first sentence above)." << std::endl;
          // The history is unlikely to have the most recent push operation
          // because the consumer thread runs freely until it tries to acquire
          // the mutex to add to the history. In other words, it may have pushed
          // any time between after we unblock from wait() and reach here, but
          // hasn't added it to the history yet.
        }
        last = next;
      }
      // Keep the buffer's capacity but drop its contents for the next round.
      myBuf.resize(0);
    }
  });

  producerThread.join();
  consumerThread.join();

  EXPECT_EQ(0, q.size());
}
497