1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <input/BlockingQueue.h>
18
19 #include <gtest/gtest.h>
20 #include <thread>
21
22 namespace android {
23
24 using std::chrono_literals::operator""ns;
25
26 // --- BlockingQueueTest ---
27
28 /**
29 * Validate basic pop and push operation.
30 */
TEST(BlockingQueueTest,Queue_AddAndRemove)31 TEST(BlockingQueueTest, Queue_AddAndRemove) {
32 constexpr size_t capacity = 10;
33 BlockingQueue<int> queue(capacity);
34
35 ASSERT_TRUE(queue.push(1));
36 ASSERT_EQ(queue.pop(), 1);
37
38 ASSERT_TRUE(queue.emplace(2));
39 ASSERT_EQ(queue.popWithTimeout(0ns), 2);
40
41 ASSERT_TRUE(queue.push(3));
42 ASSERT_EQ(queue.popWithTimeout(100ns), 3);
43
44 ASSERT_EQ(std::nullopt, queue.popWithTimeout(0ns));
45 }
46
47 /**
48 * Make sure the queue has strict capacity limits.
49 */
TEST(BlockingQueueTest,Queue_ReachesCapacity)50 TEST(BlockingQueueTest, Queue_ReachesCapacity) {
51 constexpr size_t capacity = 3;
52 BlockingQueue<int> queue(capacity);
53
54 // First 3 elements should be added successfully
55 ASSERT_TRUE(queue.push(1));
56 ASSERT_TRUE(queue.push(2));
57 ASSERT_TRUE(queue.push(3));
58 ASSERT_FALSE(queue.push(4)) << "Queue should reach capacity at size " << capacity;
59 }
60
61 /**
62 * Make sure the queue maintains FIFO order.
63 * Add elements and remove them, and check the order.
64 */
TEST(BlockingQueueTest,Queue_isFIFO)65 TEST(BlockingQueueTest, Queue_isFIFO) {
66 constexpr size_t capacity = 10;
67 BlockingQueue<int> queue(capacity);
68
69 for (size_t i = 0; i < capacity; i++) {
70 ASSERT_TRUE(queue.push(static_cast<int>(i)));
71 }
72 for (size_t i = 0; i < capacity; i++) {
73 ASSERT_EQ(queue.pop(), static_cast<int>(i));
74 }
75 }
76
/**
 * Make sure clear() discards every element that was queued before the call.
 */
TEST(BlockingQueueTest, Queue_Clears) {
    constexpr size_t capacity = 2;
    BlockingQueue<int> queue(capacity);

    // Check every push so that a rejected element is reported at the call site.
    ASSERT_TRUE(queue.push(1));
    ASSERT_TRUE(queue.push(2));
    queue.clear();
    // The queue was filled to capacity above; this push is expected to fit only
    // because clear() removed the earlier elements.
    ASSERT_TRUE(queue.push(3));
    // Should no longer receive elements 1 and 2
    ASSERT_EQ(3, queue.pop());
    // Nothing else may be left behind once the single new element is consumed.
    ASSERT_EQ(std::nullopt, queue.popWithTimeout(0ns));
}
88
/**
 * Make sure erase_if() removes exactly the elements matching the predicate and
 * keeps the remaining ones in their original order.
 */
TEST(BlockingQueueTest, Queue_Erases) {
    constexpr size_t capacity = 4;
    BlockingQueue<int> queue(capacity);

    // Check every push: a silently rejected element would leave fewer elements
    // than expected and make a later blocking pop() wait forever.
    ASSERT_TRUE(queue.push(1));
    ASSERT_TRUE(queue.push(2));
    ASSERT_TRUE(queue.push(3));
    ASSERT_TRUE(queue.push(4));
    // Erase elements 2 and 4
    queue.erase_if([](int element) { return element == 2 || element == 4; });
    // Should no longer receive elements 2 and 4
    ASSERT_EQ(1, queue.pop());
    ASSERT_EQ(3, queue.pop());
    // Element 4 was queued after 3, so popping 1 and 3 alone does not show that
    // it was erased. The queue must be empty at this point.
    ASSERT_EQ(std::nullopt, queue.popWithTimeout(0ns));
}
103
104 // --- BlockingQueueTest - Multiple threads ---
105
/**
 * Push from one thread while popping from another, and check that the elements
 * still arrive in the order in which they were pushed.
 */
TEST(BlockingQueueTest, Queue_AllowsMultipleThreads) {
    constexpr size_t capacity = 100; // large capacity to increase likelihood that threads overlap
    BlockingQueue<int> queue(capacity);

    // Fill queue from a different thread
    std::thread fillQueue([&queue]() {
        for (size_t i = 0; i < capacity; i++) {
            ASSERT_TRUE(queue.push(static_cast<int>(i)));
        }
    });

    // Make sure all elements are received in correct order.
    // EXPECT_EQ (non-fatal) is used on purpose: a fatal ASSERT_EQ would return
    // from the test body before join() below, and destroying a joinable
    // std::thread calls std::terminate, aborting the whole test binary instead
    // of reporting a failure.
    for (size_t i = 0; i < capacity; i++) {
        EXPECT_EQ(queue.pop(), static_cast<int>(i));
    }

    fillQueue.join();
}
124
125 /**
126 * When the queue has no elements, and pop is called, it should block
127 * the current thread until an element is added to the queue (from another thread).
128 * Here we create a separate thread and call pop on an empty queue. Next,
129 * we check that the thread is blocked.
130 */
TEST(BlockingQueueTest,Queue_BlocksWhileWaitingForElements)131 TEST(BlockingQueueTest, Queue_BlocksWhileWaitingForElements) {
132 constexpr size_t capacity = 1;
133 BlockingQueue<int> queue(capacity);
134
135 std::atomic_bool hasReceivedElement = false;
136
137 // fill queue from a different thread
138 std::thread waitUntilHasElements([&queue, &hasReceivedElement]() {
139 queue.pop(); // This should block until an element has been added
140 hasReceivedElement = true;
141 });
142
143 ASSERT_FALSE(hasReceivedElement);
144 queue.push(1);
145 waitUntilHasElements.join();
146 ASSERT_TRUE(hasReceivedElement);
147 }
148
/**
 * popWithTimeout on a default-constructed queue that never receives an element
 * must give up and return std::nullopt.
 */
TEST(BlockingQueueTest, Queue_TimesOut) {
    BlockingQueue<int> emptyQueue;
    const auto popped = emptyQueue.popWithTimeout(1ns);
    ASSERT_EQ(std::nullopt, popped);
}
153
154 } // namespace android
155