1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/queue.h"
18
19 #include <sys/eventfd.h>
20
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29
30 using namespace std::chrono_literals;
31
32 namespace bluetooth {
33 namespace os {
34 namespace {
35
36 constexpr int kQueueSize = 10;
37 constexpr int kHalfOfQueueSize = kQueueSize / 2;
38 constexpr int kDoubleOfQueueSize = kQueueSize * 2;
39 constexpr int kQueueSizeOne = 1;
40
41 class QueueTest : public ::testing::Test {
42 protected:
SetUp()43 void SetUp() override {
44 enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45 enqueue_handler_ = new Handler(enqueue_thread_);
46 dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47 dequeue_handler_ = new Handler(dequeue_thread_);
48 }
TearDown()49 void TearDown() override {
50 enqueue_handler_->Clear();
51 delete enqueue_handler_;
52 delete enqueue_thread_;
53 dequeue_handler_->Clear();
54 delete dequeue_handler_;
55 delete dequeue_thread_;
56 enqueue_handler_ = nullptr;
57 enqueue_thread_ = nullptr;
58 dequeue_handler_ = nullptr;
59 dequeue_thread_ = nullptr;
60 }
61
62 Thread* enqueue_thread_;
63 Handler* enqueue_handler_;
64 Thread* dequeue_thread_;
65 Handler* dequeue_handler_;
66
sync_enqueue_handler()67 void sync_enqueue_handler() {
68 log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr");
69 log::assert_that(
70 enqueue_thread_->GetReactor()->WaitForIdle(2s),
71 "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)");
72 }
73 };
74
75 class TestEnqueueEnd {
76 public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)77 explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
78 : count(0), handler_(handler), queue_(queue), delay_(0) {}
79
~TestEnqueueEnd()80 ~TestEnqueueEnd() {}
81
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)82 void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
83 promise_map_ = promise_map;
84 handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
85 }
86
UnregisterEnqueue()87 void UnregisterEnqueue() {
88 std::promise<void> promise;
89 auto future = promise.get_future();
90
91 handler_->Post(
92 common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
93 future.wait();
94 }
95
EnqueueCallbackForTest()96 std::unique_ptr<std::string> EnqueueCallbackForTest() {
97 if (delay_ != 0) {
98 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
99 }
100
101 count++;
102 std::unique_ptr<std::string> data = std::move(buffer_.front());
103 buffer_.pop();
104 std::string copy = *data;
105 if (buffer_.empty()) {
106 queue_->UnregisterEnqueue();
107 }
108
109 auto key = buffer_.size();
110 auto node = promise_map_->extract(key);
111 if (node) {
112 node.mapped().set_value(key);
113 }
114
115 return data;
116 }
117
setDelay(int value)118 void setDelay(int value) {
119 delay_ = value;
120 }
121
122 std::queue<std::unique_ptr<std::string>> buffer_;
123 int count;
124
125 private:
126 Handler* handler_;
127 Queue<std::string>* queue_;
128 std::unordered_map<int, std::promise<int>>* promise_map_;
129 int delay_;
130
handle_register_enqueue()131 void handle_register_enqueue() {
132 queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
133 }
134
handle_unregister_enqueue(std::promise<void> promise)135 void handle_unregister_enqueue(std::promise<void> promise) {
136 queue_->UnregisterEnqueue();
137 promise.set_value();
138 }
139 };
140
141 class TestDequeueEnd {
142 public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)143 explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
144 : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
145
~TestDequeueEnd()146 ~TestDequeueEnd() {}
147
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)148 void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
149 promise_map_ = promise_map;
150 handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
151 }
152
UnregisterDequeue()153 void UnregisterDequeue() {
154 std::promise<void> promise;
155 auto future = promise.get_future();
156
157 handler_->Post(
158 common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
159 future.wait();
160 }
161
DequeueCallbackForTest()162 void DequeueCallbackForTest() {
163 if (delay_ != 0) {
164 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
165 }
166
167 count++;
168 std::unique_ptr<std::string> data = queue_->TryDequeue();
169 buffer_.push(std::move(data));
170
171 if (buffer_.size() == (size_t)capacity_) {
172 queue_->UnregisterDequeue();
173 }
174
175 auto key = buffer_.size();
176 auto node = promise_map_->extract(key);
177 if (node) {
178 node.mapped().set_value(key);
179 }
180 }
181
setDelay(int value)182 void setDelay(int value) {
183 delay_ = value;
184 }
185
186 std::queue<std::unique_ptr<std::string>> buffer_;
187 int count;
188
189 private:
190 Handler* handler_;
191 Queue<std::string>* queue_;
192 std::unordered_map<int, std::promise<int>>* promise_map_;
193 int capacity_;
194 int delay_;
195
handle_register_dequeue()196 void handle_register_dequeue() {
197 queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
198 }
199
handle_unregister_dequeue(std::promise<void> promise)200 void handle_unregister_dequeue(std::promise<void> promise) {
201 queue_->UnregisterDequeue();
202 promise.set_value();
203 }
204 };
205
206 // Enqueue end level : 0 -> queue is full, 1 - > queue isn't full
207 // Dequeue end level : 0 -> queue is empty, 1 - > queue isn't empty
208
209 // Test 1 : Queue is empty
210
211 // Enqueue end level : 1
212 // Dequeue end level : 0
213 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest,register_enqueue_with_empty_queue)214 TEST_F(QueueTest, register_enqueue_with_empty_queue) {
215 Queue<std::string> queue(kQueueSize);
216 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
217
218 // Push kQueueSize data to enqueue_end buffer
219 for (int i = 0; i < kQueueSize; i++) {
220 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
221 test_enqueue_end.buffer_.push(std::move(data));
222 }
223 EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
224
225 // Register enqueue and expect data move to Queue
226 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
227 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
228 auto enqueue_future = enqueue_promise_map[0].get_future();
229 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
230 enqueue_future.wait();
231 EXPECT_EQ(enqueue_future.get(), 0);
232 std::this_thread::sleep_for(std::chrono::milliseconds(20));
233 }
234
235 // Enqueue end level : 1
236 // Dequeue end level : 0
237 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest,register_dequeue_with_empty_queue)238 TEST_F(QueueTest, register_dequeue_with_empty_queue) {
239 Queue<std::string> queue(kQueueSize);
240 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
241
242 // Register dequeue, DequeueCallback shouldn't be invoked
243 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
244 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
245 std::this_thread::sleep_for(std::chrono::milliseconds(20));
246 EXPECT_EQ(test_dequeue_end.count, 0);
247
248 test_dequeue_end.UnregisterDequeue();
249 }
250
251 // Test 2 : Queue is full
252
253 // Enqueue end level : 0
254 // Dequeue end level : 1
255 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest,register_enqueue_with_full_queue)256 TEST_F(QueueTest, register_enqueue_with_full_queue) {
257 Queue<std::string> queue(kQueueSize);
258 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
259
260 // make Queue full
261 for (int i = 0; i < kQueueSize; i++) {
262 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
263 test_enqueue_end.buffer_.push(std::move(data));
264 }
265 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
266 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
267 auto enqueue_future = enqueue_promise_map[0].get_future();
268 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
269 enqueue_future.wait();
270 EXPECT_EQ(enqueue_future.get(), 0);
271
272 // push some data to enqueue_end buffer and register enqueue;
273 for (int i = 0; i < kHalfOfQueueSize; i++) {
274 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
275 test_enqueue_end.buffer_.push(std::move(data));
276 }
277 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
278
279 // EnqueueCallback shouldn't be invoked
280 std::this_thread::sleep_for(std::chrono::milliseconds(20));
281 EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
282 EXPECT_EQ(test_enqueue_end.count, kQueueSize);
283
284 test_enqueue_end.UnregisterEnqueue();
285 }
286
287 // Enqueue end level : 0
288 // Dequeue end level : 1
289 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest,register_dequeue_with_full_queue)290 TEST_F(QueueTest, register_dequeue_with_full_queue) {
291 Queue<std::string> queue(kQueueSize);
292 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
293 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
294
295 // make Queue full
296 for (int i = 0; i < kQueueSize; i++) {
297 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
298 test_enqueue_end.buffer_.push(std::move(data));
299 }
300 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
301 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
302 auto enqueue_future = enqueue_promise_map[0].get_future();
303 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
304 enqueue_future.wait();
305 EXPECT_EQ(enqueue_future.get(), 0);
306
307 // Register dequeue and expect data move to dequeue end buffer
308 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
309 dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
310 auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
311 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
312 dequeue_future.wait();
313 EXPECT_EQ(dequeue_future.get(), kQueueSize);
314
315 test_dequeue_end.UnregisterDequeue();
316 }
317
318 // Test 3 : Queue is non-empty and non-full
319
320 // Enqueue end level : 1
321 // Dequeue end level : 1
322 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest,register_enqueue_with_half_empty_queue)323 TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
324 Queue<std::string> queue(kQueueSize);
325 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
326
327 // make Queue half empty
328 for (int i = 0; i < kHalfOfQueueSize; i++) {
329 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
330 test_enqueue_end.buffer_.push(std::move(data));
331 }
332 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
333 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
334 auto enqueue_future = enqueue_promise_map[0].get_future();
335 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
336 enqueue_future.wait();
337 EXPECT_EQ(enqueue_future.get(), 0);
338
339 // push some data to enqueue_end buffer and register enqueue;
340 for (int i = 0; i < kHalfOfQueueSize; i++) {
341 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
342 test_enqueue_end.buffer_.push(std::move(data));
343 }
344
345 // Register enqueue and expect data move to Queue
346 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
347 enqueue_future = enqueue_promise_map[0].get_future();
348 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
349 enqueue_future.wait();
350 EXPECT_EQ(enqueue_future.get(), 0);
351 sync_enqueue_handler();
352 }
353
354 // Enqueue end level : 1
355 // Dequeue end level : 1
356 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest,register_dequeue_with_half_empty_queue)357 TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
358 Queue<std::string> queue(kQueueSize);
359 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
360 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
361
362 // make Queue half empty
363 for (int i = 0; i < kHalfOfQueueSize; i++) {
364 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
365 test_enqueue_end.buffer_.push(std::move(data));
366 }
367 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
368 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
369 auto enqueue_future = enqueue_promise_map[0].get_future();
370 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
371 enqueue_future.wait();
372 EXPECT_EQ(enqueue_future.get(), 0);
373
374 // Register dequeue and expect data move to dequeue end buffer
375 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
376 dequeue_promise_map.emplace(
377 std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
378 auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
379 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
380 dequeue_future.wait();
381 EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
382
383 test_dequeue_end.UnregisterDequeue();
384 }
385
386 // Dynamic level test
387
388 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
389
390 // Enqueue end level : 1 -> 0
391 // Dequeue end level : 1
392 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest,queue_becomes_full_enqueue_callback_only)393 TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
394 Queue<std::string> queue(kQueueSize);
395 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
396
397 // push double of kQueueSize to enqueue end buffer
398 for (int i = 0; i < kDoubleOfQueueSize; i++) {
399 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
400 test_enqueue_end.buffer_.push(std::move(data));
401 }
402
403 // Register enqueue and expect kQueueSize data move to Queue
404 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
405 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
406 auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
407 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
408 enqueue_future.wait();
409 EXPECT_EQ(enqueue_future.get(), kQueueSize);
410
411 // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize
412 std::this_thread::sleep_for(std::chrono::milliseconds(20));
413 EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
414 EXPECT_EQ(test_enqueue_end.count, kQueueSize);
415
416 test_enqueue_end.UnregisterEnqueue();
417 }
418
419 // Enqueue end level : 1 -> 0
420 // Dequeue end level : 1
421 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_unregister)422 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
423 Queue<std::string> queue(kQueueSize);
424 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
425 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
426
427 // push double of kQueueSize to enqueue end buffer
428 for (int i = 0; i < kDoubleOfQueueSize; i++) {
429 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
430 test_enqueue_end.buffer_.push(std::move(data));
431 }
432
433 // Register dequeue
434 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
435 dequeue_promise_map.emplace(
436 std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
437 auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
438 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
439
440 // Register enqueue
441 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
442 enqueue_promise_map.emplace(
443 std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
444 auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future();
445 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
446
447 // Dequeue end will unregister when buffer size is kHalfOfQueueSize
448 dequeue_future.wait();
449 EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
450
451 // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize
452 enqueue_future.wait();
453 EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize);
454 std::this_thread::sleep_for(std::chrono::milliseconds(20));
455 EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
456 EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);
457
458 test_enqueue_end.UnregisterEnqueue();
459 }
460
461 // Enqueue end level : 1 -> 0
462 // Dequeue end level : 1
463 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_slower)464 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
465 Queue<std::string> queue(kQueueSize);
466 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
467 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
468
469 // push double of kDoubleOfQueueSize to enqueue end buffer
470 for (int i = 0; i < kDoubleOfQueueSize; i++) {
471 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
472 test_enqueue_end.buffer_.push(std::move(data));
473 }
474
475 // Set 20 ms delay for callback and register dequeue
476 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
477 test_dequeue_end.setDelay(20);
478 auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
479 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
480
481 // Register enqueue
482 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
483 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
484 auto enqueue_future = enqueue_promise_map[0].get_future();
485 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
486
487 // Wait for enqueue buffer empty and expect queue is full
488 enqueue_future.wait();
489 EXPECT_EQ(enqueue_future.get(), 0);
490 EXPECT_GE(test_dequeue_end.buffer_.size(), (size_t)(kQueueSize - 1));
491
492 test_dequeue_end.UnregisterDequeue();
493 }
494
495 // Enqueue end level : 0 -> 1
496 // Dequeue end level : 1 -> 0
497 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest,queue_becomes_full_and_non_empty_at_same_time)498 TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
499 Queue<std::string> queue(kQueueSizeOne);
500 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
501 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
502
503 // push double of kQueueSize to enqueue end buffer
504 for (int i = 0; i < kQueueSize; i++) {
505 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
506 test_enqueue_end.buffer_.push(std::move(data));
507 }
508
509 // Register dequeue
510 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
511 dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
512 auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
513 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
514
515 // Register enqueue
516 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
517 auto enqueue_future = enqueue_promise_map[0].get_future();
518 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
519
520 // Wait for all data move from enqueue end buffer to dequeue end buffer
521 dequeue_future.wait();
522 EXPECT_EQ(dequeue_future.get(), kQueueSize);
523
524 test_dequeue_end.UnregisterDequeue();
525 }
526
527 // Enqueue end level : 1 -> 0
528 // Dequeue end level : 1
529 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_full_during_test)530 TEST_F(QueueTest, queue_becomes_non_full_during_test) {
531 Queue<std::string> queue(kQueueSize);
532 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
533 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);
534
535 // make Queue full
536 for (int i = 0; i < kDoubleOfQueueSize; i++) {
537 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
538 test_enqueue_end.buffer_.push(std::move(data));
539 }
540 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
541 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
542 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
543 auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
544 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
545 enqueue_future.wait();
546 EXPECT_EQ(enqueue_future.get(), kQueueSize);
547
548 // Expect kQueueSize data block in enqueue end buffer
549 std::this_thread::sleep_for(std::chrono::milliseconds(20));
550 EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
551
552 // Register dequeue
553 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
554 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
555
556 // Expect enqueue end will empty
557 enqueue_future = enqueue_promise_map[0].get_future();
558 enqueue_future.wait();
559 EXPECT_EQ(enqueue_future.get(), 0);
560
561 test_dequeue_end.UnregisterDequeue();
562 }
563
564 // Enqueue end level : 0 -> 1
565 // Dequeue end level : 1 -> 0
566 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest,queue_becomes_non_full_and_empty_at_same_time)567 TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
568 Queue<std::string> queue(kQueueSizeOne);
569 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
570 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
571
572 // push double of kQueueSize to enqueue end buffer
573 for (int i = 0; i < kQueueSize; i++) {
574 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
575 test_enqueue_end.buffer_.push(std::move(data));
576 }
577
578 // Register dequeue
579 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
580 dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
581 auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
582 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
583
584 // Register enqueue
585 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
586 auto enqueue_future = enqueue_promise_map[0].get_future();
587 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
588
589 // Wait for all data move from enqueue end buffer to dequeue end buffer
590 dequeue_future.wait();
591 EXPECT_EQ(dequeue_future.get(), kQueueSize);
592
593 test_dequeue_end.UnregisterDequeue();
594 }
595
596 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
597
598 // Enqueue end level : 1
599 // Dequeue end level : 1 -> 0
600 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest,queue_becomes_empty_dequeue_callback_only)601 TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
602 Queue<std::string> queue(kQueueSize);
603 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
604 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
605
606 // make Queue half empty
607 for (int i = 0; i < kHalfOfQueueSize; i++) {
608 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
609 test_enqueue_end.buffer_.push(std::move(data));
610 }
611 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
612 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
613 auto enqueue_future = enqueue_promise_map[0].get_future();
614 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
615 enqueue_future.wait();
616 EXPECT_EQ(enqueue_future.get(), 0);
617
618 // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer
619 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
620 dequeue_promise_map.emplace(
621 std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
622 auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
623 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
624 dequeue_future.wait();
625 EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
626
627 // Expect DequeueCallback should stop to be invoked
628 std::this_thread::sleep_for(std::chrono::milliseconds(20));
629 EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
630 }
631
632 // Enqueue end level : 1
633 // Dequeue end level : 1 -> 0
634 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest,queue_becomes_empty_enqueue_callback_unregister)635 TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
636 Queue<std::string> queue(kQueueSize);
637 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
638 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
639
640 // make Queue half empty
641 for (int i = 0; i < kHalfOfQueueSize; i++) {
642 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
643 test_enqueue_end.buffer_.push(std::move(data));
644 }
645 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
646 enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
647 auto enqueue_future = enqueue_promise_map[0].get_future();
648 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
649 enqueue_future.wait();
650 EXPECT_EQ(enqueue_future.get(), 0);
651
652 // push kHalfOfQueueSize to enqueue end buffer and register enqueue.
653 for (int i = 0; i < kHalfOfQueueSize; i++) {
654 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
655 test_enqueue_end.buffer_.push(std::move(data));
656 }
657 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
658
659 // Register dequeue, expect kQueueSize move to dequeue end buffer
660 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
661 dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
662 auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
663 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
664 dequeue_future.wait();
665 EXPECT_EQ(dequeue_future.get(), kQueueSize);
666
667 // Expect DequeueCallback should stop to be invoked
668 std::this_thread::sleep_for(std::chrono::milliseconds(20));
669 EXPECT_EQ(test_dequeue_end.count, kQueueSize);
670 }
671
672 // Enqueue end level : 1
673 // Dequeue end level : 0 -> 1
674 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_empty_during_test)675 TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
676 Queue<std::string> queue(kQueueSize);
677 TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
678 TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
679
680 // Register dequeue
681 std::unordered_map<int, std::promise<int>> dequeue_promise_map;
682 dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
683 test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
684
685 // push kQueueSize data to enqueue end buffer and register enqueue
686 for (int i = 0; i < kQueueSize; i++) {
687 std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
688 test_enqueue_end.buffer_.push(std::move(data));
689 }
690 std::unordered_map<int, std::promise<int>> enqueue_promise_map;
691 test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
692
693 // Expect kQueueSize data move to dequeue end buffer
694 auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
695 dequeue_future.wait();
696 EXPECT_EQ(dequeue_future.get(), kQueueSize);
697 }
698
TEST_F(QueueTest,pass_smart_pointer_and_unregister)699 TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
700 Queue<std::string>* queue = new Queue<std::string>(kQueueSize);
701
702 // Enqueue a string
703 std::string valid = "Valid String";
704 std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid);
705 queue->RegisterEnqueue(
706 enqueue_handler_,
707 common::Bind(
708 [](Queue<std::string>* queue, std::shared_ptr<std::string> shared) {
709 queue->UnregisterEnqueue();
710 return std::make_unique<std::string>(*shared);
711 },
712 common::Unretained(queue),
713 shared));
714
715 // Dequeue the string
716 queue->RegisterDequeue(
717 dequeue_handler_,
718 common::Bind(
719 [](Queue<std::string>* queue, std::string valid) {
720 queue->UnregisterDequeue();
721 auto answer = *queue->TryDequeue();
722 ASSERT_EQ(answer, valid);
723 },
724 common::Unretained(queue),
725 valid));
726
727 // Wait for both handlers to finish and delete the Queue
728 std::promise<void> promise;
729 auto future = promise.get_future();
730
731 enqueue_handler_->Post(common::BindOnce(
732 [](os::Handler* dequeue_handler, Queue<std::string>* queue, std::promise<void>* promise) {
733 dequeue_handler->Post(common::BindOnce(
734 [](Queue<std::string>* queue, std::promise<void>* promise) {
735 delete queue;
736 promise->set_value();
737 },
738 common::Unretained(queue),
739 common::Unretained(promise)));
740 },
741 common::Unretained(dequeue_handler_),
742 common::Unretained(queue),
743 common::Unretained(&promise)));
744 future.wait();
745 }
746
sleep_and_enqueue_callback(int * to_increase)747 std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
748 std::this_thread::sleep_for(std::chrono::milliseconds(100));
749 (*to_increase)++;
750 return std::make_unique<std::string>("Hello");
751 }
752
TEST_F(QueueTest,unregister_enqueue_and_wait)753 TEST_F(QueueTest, unregister_enqueue_and_wait) {
754 Queue<std::string> queue(10);
755 int* indicator = new int(100);
756 queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
757 std::this_thread::sleep_for(std::chrono::milliseconds(50));
758 queue.UnregisterEnqueue();
759 EXPECT_EQ(*indicator, 101);
760 delete indicator;
761 }
762
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)763 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
764 int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
765 std::this_thread::sleep_for(std::chrono::milliseconds(100));
766 (*to_increase)++;
767 if (is_registered->exchange(false)) {
768 queue->UnregisterEnqueue();
769 }
770 return std::make_unique<std::string>("Hello");
771 }
772
TEST_F(QueueTest,unregister_enqueue_and_wait_maybe_unregistered)773 TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
774 Queue<std::string> queue(10);
775 int* indicator = new int(100);
776 std::atomic_bool is_registered = true;
777 queue.RegisterEnqueue(
778 enqueue_handler_,
779 common::Bind(
780 &sleep_and_enqueue_callback_and_unregister,
781 common::Unretained(indicator),
782 common::Unretained(&queue),
783 common::Unretained(&is_registered)));
784 std::this_thread::sleep_for(std::chrono::milliseconds(50));
785 if (is_registered.exchange(false)) {
786 queue.UnregisterEnqueue();
787 }
788 EXPECT_EQ(*indicator, 101);
789 delete indicator;
790 }
791
sleep_and_dequeue_callback(int * to_increase)792 void sleep_and_dequeue_callback(int* to_increase) {
793 std::this_thread::sleep_for(std::chrono::milliseconds(100));
794 (*to_increase)++;
795 }
796
TEST_F(QueueTest,unregister_dequeue_and_wait)797 TEST_F(QueueTest, unregister_dequeue_and_wait) {
798 int* indicator = new int(100);
799 Queue<std::string> queue(10);
800 queue.RegisterEnqueue(
801 enqueue_handler_,
802 common::Bind(
803 [](Queue<std::string>* queue) {
804 queue->UnregisterEnqueue();
805 return std::make_unique<std::string>("Hello");
806 },
807 common::Unretained(&queue)));
808 queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
809 std::this_thread::sleep_for(std::chrono::milliseconds(50));
810 queue.UnregisterDequeue();
811 EXPECT_EQ(*indicator, 101);
812 delete indicator;
813 }
814
815 // Create all threads for death tests in the function that dies
816 class QueueDeathTest : public ::testing::Test {
817 public:
RegisterEnqueueAndDelete()818 void RegisterEnqueueAndDelete() {
819 Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
820 Handler* enqueue_handler = new Handler(enqueue_thread);
821 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
822 queue->RegisterEnqueue(
823 enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
824 delete queue;
825 }
826
RegisterDequeueAndDelete()827 void RegisterDequeueAndDelete() {
828 Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
829 Handler* dequeue_handler = new Handler(dequeue_thread);
830 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
831 queue->RegisterDequeue(
832 dequeue_handler,
833 common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
834 delete queue;
835 }
836 };
837
TEST_F(QueueDeathTest,die_if_enqueue_not_unregistered)838 TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
839 EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue");
840 }
841
TEST_F(QueueDeathTest,die_if_dequeue_not_unregistered)842 TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
843 EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue");
844 }
845
846 class MockIQueueEnqueue : public IQueueEnqueue<int> {
847 public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)848 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
849 EXPECT_FALSE(registered_);
850 registered_ = true;
851 handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
852 }
853
handle_register_enqueue(EnqueueCallback callback)854 void handle_register_enqueue(EnqueueCallback callback) {
855 if (dont_handle_register_enqueue_) {
856 return;
857 }
858 while (registered_) {
859 std::unique_ptr<int> front = callback.Run();
860 queue_.push(*front);
861 }
862 }
863
UnregisterEnqueue()864 void UnregisterEnqueue() override {
865 EXPECT_TRUE(registered_);
866 registered_ = false;
867 }
868
869 bool dont_handle_register_enqueue_ = false;
870 bool registered_ = false;
871 std::queue<int> queue_;
872 };
873
874 class EnqueueBufferTest : public ::testing::Test {
875 protected:
SetUp()876 void SetUp() override {
877 thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
878 handler_ = new Handler(thread_);
879 }
880
TearDown()881 void TearDown() override {
882 handler_->Clear();
883 delete handler_;
884 delete thread_;
885 }
886
SynchronizeHandler()887 void SynchronizeHandler() {
888 std::promise<void> promise;
889 auto future = promise.get_future();
890 handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
891 future.wait();
892 }
893
894 MockIQueueEnqueue enqueue_;
895 EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
896 Thread* thread_;
897 Handler* handler_;
898 };
899
TEST_F(EnqueueBufferTest,enqueue)900 TEST_F(EnqueueBufferTest, enqueue) {
901 int num_items = 10;
902 for (int i = 0; i < num_items; i++) {
903 enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
904 }
905 SynchronizeHandler();
906 for (int i = 0; i < num_items; i++) {
907 ASSERT_EQ(enqueue_.queue_.front(), i);
908 enqueue_.queue_.pop();
909 }
910 ASSERT_FALSE(enqueue_.registered_);
911 }
912
TEST_F(EnqueueBufferTest,clear)913 TEST_F(EnqueueBufferTest, clear) {
914 enqueue_.dont_handle_register_enqueue_ = true;
915 int num_items = 10;
916 for (int i = 0; i < num_items; i++) {
917 enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
918 }
919 ASSERT_TRUE(enqueue_.registered_);
920 enqueue_buffer_.Clear();
921 ASSERT_FALSE(enqueue_.registered_);
922 }
923
TEST_F(EnqueueBufferTest,delete_when_in_callback)924 TEST_F(EnqueueBufferTest, delete_when_in_callback) {
925 Queue<int>* queue = new Queue<int>(kQueueSize);
926 EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
927 int num_items = 10;
928 for (int i = 0; i < num_items; i++) {
929 enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
930 }
931
932 delete enqueue_buffer;
933 delete queue;
934 }
935
936 } // namespace
937 } // namespace os
938 } // namespace bluetooth
939