1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/queue.h"
18 
19 #include <sys/eventfd.h>
20 
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25 
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29 
30 using namespace std::chrono_literals;
31 
32 namespace bluetooth {
33 namespace os {
34 namespace {
35 
// Capacity of the Queue under test in most cases, plus the derived amounts the
// tests stage in the enqueue/dequeue end buffers.
constexpr int kQueueSize{10};
constexpr int kHalfOfQueueSize{kQueueSize / 2};
constexpr int kDoubleOfQueueSize{kQueueSize * 2};
// Capacity of the single-slot Queue used by the "at same time" tests.
constexpr int kQueueSizeOne{1};
40 
41 class QueueTest : public ::testing::Test {
42  protected:
SetUp()43   void SetUp() override {
44     enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45     enqueue_handler_ = new Handler(enqueue_thread_);
46     dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47     dequeue_handler_ = new Handler(dequeue_thread_);
48   }
TearDown()49   void TearDown() override {
50     enqueue_handler_->Clear();
51     delete enqueue_handler_;
52     delete enqueue_thread_;
53     dequeue_handler_->Clear();
54     delete dequeue_handler_;
55     delete dequeue_thread_;
56     enqueue_handler_ = nullptr;
57     enqueue_thread_ = nullptr;
58     dequeue_handler_ = nullptr;
59     dequeue_thread_ = nullptr;
60   }
61 
62   Thread* enqueue_thread_;
63   Handler* enqueue_handler_;
64   Thread* dequeue_thread_;
65   Handler* dequeue_handler_;
66 
sync_enqueue_handler()67   void sync_enqueue_handler() {
68     log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr");
69     log::assert_that(
70         enqueue_thread_->GetReactor()->WaitForIdle(2s),
71         "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)");
72   }
73 };
74 
75 class TestEnqueueEnd {
76  public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)77   explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
78       : count(0), handler_(handler), queue_(queue), delay_(0) {}
79 
~TestEnqueueEnd()80   ~TestEnqueueEnd() {}
81 
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)82   void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
83     promise_map_ = promise_map;
84     handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
85   }
86 
UnregisterEnqueue()87   void UnregisterEnqueue() {
88     std::promise<void> promise;
89     auto future = promise.get_future();
90 
91     handler_->Post(
92         common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
93     future.wait();
94   }
95 
EnqueueCallbackForTest()96   std::unique_ptr<std::string> EnqueueCallbackForTest() {
97     if (delay_ != 0) {
98       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
99     }
100 
101     count++;
102     std::unique_ptr<std::string> data = std::move(buffer_.front());
103     buffer_.pop();
104     std::string copy = *data;
105     if (buffer_.empty()) {
106       queue_->UnregisterEnqueue();
107     }
108 
109     auto key = buffer_.size();
110     auto node = promise_map_->extract(key);
111     if (node) {
112       node.mapped().set_value(key);
113     }
114 
115     return data;
116   }
117 
setDelay(int value)118   void setDelay(int value) {
119     delay_ = value;
120   }
121 
122   std::queue<std::unique_ptr<std::string>> buffer_;
123   int count;
124 
125  private:
126   Handler* handler_;
127   Queue<std::string>* queue_;
128   std::unordered_map<int, std::promise<int>>* promise_map_;
129   int delay_;
130 
handle_register_enqueue()131   void handle_register_enqueue() {
132     queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
133   }
134 
handle_unregister_enqueue(std::promise<void> promise)135   void handle_unregister_enqueue(std::promise<void> promise) {
136     queue_->UnregisterEnqueue();
137     promise.set_value();
138   }
139 };
140 
141 class TestDequeueEnd {
142  public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)143   explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
144       : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
145 
~TestDequeueEnd()146   ~TestDequeueEnd() {}
147 
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)148   void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
149     promise_map_ = promise_map;
150     handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
151   }
152 
UnregisterDequeue()153   void UnregisterDequeue() {
154     std::promise<void> promise;
155     auto future = promise.get_future();
156 
157     handler_->Post(
158         common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
159     future.wait();
160   }
161 
DequeueCallbackForTest()162   void DequeueCallbackForTest() {
163     if (delay_ != 0) {
164       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
165     }
166 
167     count++;
168     std::unique_ptr<std::string> data = queue_->TryDequeue();
169     buffer_.push(std::move(data));
170 
171     if (buffer_.size() == (size_t)capacity_) {
172       queue_->UnregisterDequeue();
173     }
174 
175     auto key = buffer_.size();
176     auto node = promise_map_->extract(key);
177     if (node) {
178       node.mapped().set_value(key);
179     }
180   }
181 
setDelay(int value)182   void setDelay(int value) {
183     delay_ = value;
184   }
185 
186   std::queue<std::unique_ptr<std::string>> buffer_;
187   int count;
188 
189  private:
190   Handler* handler_;
191   Queue<std::string>* queue_;
192   std::unordered_map<int, std::promise<int>>* promise_map_;
193   int capacity_;
194   int delay_;
195 
handle_register_dequeue()196   void handle_register_dequeue() {
197     queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
198   }
199 
handle_unregister_dequeue(std::promise<void> promise)200   void handle_unregister_dequeue(std::promise<void> promise) {
201     queue_->UnregisterDequeue();
202     promise.set_value();
203   }
204 };
205 
// Enqueue end level : 0 -> queue is full, 1 -> queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 -> queue isn't empty
208 
209 // Test 1 : Queue is empty
210 
211 // Enqueue end level : 1
212 // Dequeue end level : 0
213 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Stage kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // The promise keyed by 0 is fulfilled once the staging buffer is empty,
  // i.e. once every string has been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // NOTE(review): presumably lets the handler thread return from the last
  // callback before the locals above are destroyed - confirm.
  std::this_thread::sleep_for(20ms);
}
234 
235 // Enqueue end level : 1
236 // Dequeue end level : 0
237 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // No promise is installed: the callback is expected never to run while the
  // queue is empty.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Give a stray callback time to fire, then check that none did.
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_dequeue_end.count, 0);

  test_dequeue_end.UnregisterDequeue();
}
250 
251 // Test 2 : Queue is full
252 
253 // Enqueue end level : 0
254 // Dequeue end level : 1
255 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Fill the queue to capacity: stage kQueueSize strings and wait until the
  // enqueue end reports an empty staging buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage more data and register again while the queue is still full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // The callback must not run: the staged data stays put and the invocation
  // count is unchanged from the initial fill.
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
286 
287 // Enqueue end level : 0
288 // Dequeue end level : 1
289 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Fill the queue to capacity through the enqueue end.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait until it has received all kQueueSize
  // elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> all_received = dequeue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
317 
318 // Test 3 : Queue is non-empty and non-full
319 
320 // Enqueue end level : 1
321 // Dequeue end level : 1
322 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Bring the queue to half capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another half capacity worth of strings.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register again with a fresh promise for key 0 (the previous entry was
  // consumed by the callback) and expect the second batch to drain as well.
  buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  sync_enqueue_handler();
}
353 
354 // Enqueue end level : 1
355 // Dequeue end level : 1
356 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Bring the queue to half capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait until it has received the
  // kHalfOfQueueSize queued elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> half_received = dequeue_promise_map.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
385 
386 // Dynamic level test
387 
388 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
389 
390 // Enqueue end level : 1 -> 0
391 // Dequeue end level : 1
392 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Stage twice the queue capacity in the enqueue end buffer.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register and wait until only kQueueSize strings are left staged, i.e.
  // kQueueSize strings have been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> queue_filled = enqueue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_filled.wait();
  EXPECT_EQ(queue_filled.get(), kQueueSize);

  // With the queue full the callback must stop: the staged amount and the
  // invocation count both stay at kQueueSize.
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
418 
419 // Enqueue end level : 1 -> 0
420 // Dequeue end level : 1
421 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Stage twice the queue capacity in the enqueue end buffer.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end first; it unregisters itself after receiving
  // kHalfOfQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> half_received = dequeue_promise_map.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register the enqueue end; its promise fires when kHalfOfQueueSize strings
  // are left staged.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> half_left = enqueue_promise_map.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // The dequeue end stops once its buffer holds kHalfOfQueueSize elements.
  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // After that the queue fills up, so the enqueue callback must stop with
  // kHalfOfQueueSize strings still staged.
  half_left.wait();
  EXPECT_EQ(half_left.get(), kHalfOfQueueSize);
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
460 
461 // Enqueue end level : 1 -> 0
462 // Dequeue end level : 1
463 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kDoubleOfQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Slow the dequeue callback down by 20 ms per invocation, then register it.
  // The kHalfOfQueueSize future is created but never waited on.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.setDelay(20);
  auto dequeue_future = dequeue_promise_map.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register the enqueue end.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Once the staging buffer is empty, the slow consumer must already have
  // taken at least kQueueSize - 1 elements, because the queue itself can hold
  // no more than kQueueSize.
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  EXPECT_GE(test_dequeue_end.buffer_.size(), static_cast<size_t>(kQueueSize - 1));

  test_dequeue_end.UnregisterDequeue();
}
494 
// Enqueue end level : 1 -> 0
// Dequeue end level : 0 -> 1
497 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  // A single-slot queue is full as soon as it is non-empty.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end; its promise fires after kQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> all_received = dequeue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register the enqueue end. The key-0 future is created but never waited on.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto enqueue_future = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait until every string has travelled from the enqueue end buffer to the
  // dequeue end buffer.
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
526 
// Enqueue end level : 0 -> 1
528 // Dequeue end level : 1
529 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // Stage twice the queue capacity so the queue fills up and kQueueSize
  // strings are left waiting in the enqueue end buffer.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Take BOTH futures before any callback can run. The enqueue callback
  // extracts entries from this map on the handler thread, so touching the map
  // from the test thread after registration would race with it; if the
  // callback consumed key 0 first, a later operator[] would create a fresh
  // promise that nobody fulfills and the test would hang.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto queue_full_future = enqueue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  auto buffer_empty_future = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_full_future.wait();
  EXPECT_EQ(queue_full_future.get(), kQueueSize);

  // Expect kQueueSize strings to stay blocked in the enqueue end buffer.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // Register dequeue; draining the queue makes room for the blocked strings.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect the enqueue end buffer to become empty.
  buffer_empty_future.wait();
  EXPECT_EQ(buffer_empty_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}
563 
564 // Enqueue end level : 0 -> 1
565 // Dequeue end level : 1 -> 0
566 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  // A single-slot queue is empty as soon as it is no longer full.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end; its promise fires after kQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> all_received = dequeue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register the enqueue end. The key-0 future is created but never waited on.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto enqueue_future = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait until every string has travelled from the enqueue end buffer to the
  // dequeue end buffer.
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
595 
596 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
597 
598 // Enqueue end level : 1
599 // Dequeue end level : 1 -> 0
600 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Bring the queue to half capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait for it to receive the kHalfOfQueueSize
  // queued elements.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> half_received = dequeue_promise_map.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // The queue is empty now, so the callback count must not grow any further.
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
}
631 
632 // Enqueue end level : 1
633 // Dequeue end level : 1 -> 0
634 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Bring the queue to half capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  std::future<int> buffer_drained = enqueue_promise_map.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another kHalfOfQueueSize strings and register the enqueue end again;
  // it unregisters itself once this batch is handed over.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Register the dequeue end and wait for all kQueueSize elements to arrive
  // in its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  std::future<int> all_received = dequeue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  // The queue is empty now, so the callback count must not grow any further.
  std::this_thread::sleep_for(20ms);
  EXPECT_EQ(test_dequeue_end.count, kQueueSize);
}
671 
672 // Enqueue end level : 1
673 // Dequeue end level : 0 -> 1
674 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Take the future BEFORE registering. The dequeue callback extracts entries
  // from this map on the handler thread, so touching the map from the test
  // thread once callbacks can run would race with it; if the callback consumed
  // the kQueueSize entry first, a later operator[] would create a fresh
  // promise that nobody fulfills and the test would hang.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  auto dequeue_future = dequeue_promise_map.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Push kQueueSize strings to the enqueue end buffer and register enqueue;
  // the queue becomes non-empty and the dequeue callback should start running.
  for (int i = 0; i < kQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect all kQueueSize strings to reach the dequeue end buffer.
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}
698 
TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  // Heap-allocated because the queue is deleted from a posted task below.
  auto* queue = new Queue<std::string>(kQueueSize);

  // Producer: unregisters itself, then hands over a copy of the string held
  // by the bound shared_ptr.
  std::string valid = "Valid String";
  auto shared = std::make_shared<std::string>(valid);
  queue->RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::shared_ptr<std::string> payload) {
            q->UnregisterEnqueue();
            return std::make_unique<std::string>(*payload);
          },
          common::Unretained(queue),
          shared));

  // Consumer: unregisters itself, then checks the dequeued string against the
  // bound expected value.
  queue->RegisterDequeue(
      dequeue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::string expected) {
            q->UnregisterDequeue();
            auto received = *q->TryDequeue();
            ASSERT_EQ(received, expected);
          },
          common::Unretained(queue),
          valid));

  // Chain a task through the enqueue handler and then the dequeue handler;
  // the inner task deletes the queue and releases the test thread.
  std::promise<void> queue_deleted;
  auto queue_deleted_future = queue_deleted.get_future();

  enqueue_handler_->Post(common::BindOnce(
      [](os::Handler* next_handler, Queue<std::string>* q, std::promise<void>* done) {
        next_handler->Post(common::BindOnce(
            [](Queue<std::string>* q_inner, std::promise<void>* done_inner) {
              delete q_inner;
              done_inner->set_value();
            },
            common::Unretained(q),
            common::Unretained(done)));
      },
      common::Unretained(dequeue_handler_),
      common::Unretained(queue),
      common::Unretained(&queue_deleted)));
  queue_deleted_future.wait();
}
746 
// Enqueue callback for the unregister tests: sleeps for 100 ms, bumps |*to_increase| by one and then returns
// a newly allocated string "Hello".
std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  constexpr std::chrono::milliseconds kCallbackDelay{100};
  std::this_thread::sleep_for(kCallbackDelay);
  ++*to_increase;
  auto greeting = std::make_unique<std::string>("Hello");
  return greeting;
}
752 
// Calls UnregisterEnqueue() roughly 50 ms after registering a callback that sleeps 100 ms before incrementing
// the indicator. The indicator is expected to be 101 as soon as UnregisterEnqueue() returns, i.e. the test
// only passes if unregistering waits for the callback that is already running.
TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(kQueueSize);
  // Automatic storage is sufficient: the callback is no longer registered once UnregisterEnqueue() returns,
  // which happens before this variable goes out of scope.
  int indicator = 100;
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(&indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  EXPECT_EQ(indicator, 101);
}
762 
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)763 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
764     int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
765   std::this_thread::sleep_for(std::chrono::milliseconds(100));
766   (*to_increase)++;
767   if (is_registered->exchange(false)) {
768     queue->UnregisterEnqueue();
769   }
770   return std::make_unique<std::string>("Hello");
771 }
772 
// Same scenario as unregister_enqueue_and_wait, except that the callback may also unregister itself. The
// atomic flag |is_registered| decides who calls UnregisterEnqueue(): whichever side swaps it from true to
// false. Either way the indicator is expected to be 101 afterwards.
TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  Queue<std::string> queue(kQueueSize);
  // Automatic storage instead of new/delete: the callback only touches the indicator before it reaches its
  // own unregister step.
  int indicator = 100;
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          &sleep_and_enqueue_callback_and_unregister,
          common::Unretained(&indicator),
          common::Unretained(&queue),
          common::Unretained(&is_registered)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(indicator, 101);
}
791 
// Dequeue callback for the unregister tests: sleeps for 100 ms and then adds one to |*to_increase|. It does
// not take anything out of a queue.
void sleep_and_dequeue_callback(int* to_increase) {
  constexpr std::chrono::milliseconds kCallbackDelay{100};
  std::this_thread::sleep_for(kCallbackDelay);
  *to_increase += 1;
}
796 
// Calls UnregisterDequeue() roughly 50 ms after registering a dequeue callback that sleeps 100 ms before
// incrementing the indicator. The indicator is expected to be 101 as soon as UnregisterDequeue() returns,
// i.e. the test only passes if unregistering waits for the callback that is already running.
TEST_F(QueueTest, unregister_dequeue_and_wait) {
  // Automatic storage instead of new/delete; declared before the queue so it outlives it.
  int indicator = 100;
  Queue<std::string> queue(kQueueSize);
  // The enqueue callback unregisters itself before returning its single "Hello" item.
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* queue) {
            queue->UnregisterEnqueue();
            return std::make_unique<std::string>("Hello");
          },
          common::Unretained(&queue)));
  // NOTE(review): the dequeue callback is registered on enqueue_handler_ rather than dequeue_handler_ -
  // confirm this is intended.
  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(&indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(indicator, 101);
}
814 
815 // Create all threads for death tests in the function that dies
816 class QueueDeathTest : public ::testing::Test {
817  public:
RegisterEnqueueAndDelete()818   void RegisterEnqueueAndDelete() {
819     Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
820     Handler* enqueue_handler = new Handler(enqueue_thread);
821     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
822     queue->RegisterEnqueue(
823         enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
824     delete queue;
825   }
826 
RegisterDequeueAndDelete()827   void RegisterDequeueAndDelete() {
828     Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
829     Handler* dequeue_handler = new Handler(dequeue_thread);
830     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
831     queue->RegisterDequeue(
832         dequeue_handler,
833         common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
834     delete queue;
835   }
836 };
837 
// Deleting a Queue whose enqueue callback is still registered is expected to kill the process with output
// matching "nqueue".
TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  EXPECT_DEATH({ RegisterEnqueueAndDelete(); }, "nqueue");
}
841 
// Deleting a Queue whose dequeue callback is still registered is expected to kill the process with output
// matching "equeue".
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH({ RegisterDequeueAndDelete(); }, "equeue");
}
845 
846 class MockIQueueEnqueue : public IQueueEnqueue<int> {
847  public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)848   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
849     EXPECT_FALSE(registered_);
850     registered_ = true;
851     handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
852   }
853 
handle_register_enqueue(EnqueueCallback callback)854   void handle_register_enqueue(EnqueueCallback callback) {
855     if (dont_handle_register_enqueue_) {
856       return;
857     }
858     while (registered_) {
859       std::unique_ptr<int> front = callback.Run();
860       queue_.push(*front);
861     }
862   }
863 
UnregisterEnqueue()864   void UnregisterEnqueue() override {
865     EXPECT_TRUE(registered_);
866     registered_ = false;
867   }
868 
869   bool dont_handle_register_enqueue_ = false;
870   bool registered_ = false;
871   std::queue<int> queue_;
872 };
873 
874 class EnqueueBufferTest : public ::testing::Test {
875  protected:
SetUp()876   void SetUp() override {
877     thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
878     handler_ = new Handler(thread_);
879   }
880 
TearDown()881   void TearDown() override {
882     handler_->Clear();
883     delete handler_;
884     delete thread_;
885   }
886 
SynchronizeHandler()887   void SynchronizeHandler() {
888     std::promise<void> promise;
889     auto future = promise.get_future();
890     handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
891     future.wait();
892   }
893 
894   MockIQueueEnqueue enqueue_;
895   EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
896   Thread* thread_;
897   Handler* handler_;
898 };
899 
// Enqueues the ints 0..9 through the EnqueueBuffer and checks that the mock received them in the same order
// and that the mock is no longer registered afterwards.
TEST_F(EnqueueBufferTest, enqueue) {
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
  }
  // Wait for a task posted after the enqueues to run on the handler before inspecting the mock.
  SynchronizeHandler();
  for (int i = 0; i < num_items; i++) {
    // front()/pop() on an empty std::queue is undefined behavior; if fewer items than expected arrived, fail
    // with a clean assertion instead.
    ASSERT_FALSE(enqueue_.queue_.empty());
    ASSERT_EQ(enqueue_.queue_.front(), i);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}
912 
// Checks that EnqueueBuffer::Clear() unregisters from the underlying enqueue end while items are pending.
TEST_F(EnqueueBufferTest, clear) {
  // With this flag set the mock's posted task returns immediately, so nothing pulls the items out and the
  // mock stays registered.
  enqueue_.dont_handle_register_enqueue_ = true;
  constexpr int kItemCount = 10;
  for (int item = 0; item != kItemCount; ++item) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(item), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}
923 
// Enqueues ten ints into an EnqueueBuffer backed by a real Queue<int> and then destroys the buffer followed
// by the queue right away, without waiting for the handler. The test has no explicit expectations.
TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  auto queue = std::make_unique<Queue<int>>(kQueueSize);
  auto enqueue_buffer = std::make_unique<EnqueueBuffer<int>>(queue.get());
  constexpr int kItemCount = 10;
  for (int item = 0; item != kItemCount; ++item) {
    enqueue_buffer->Enqueue(std::make_unique<int>(item), handler_);
  }

  // Destroy explicitly and in this order: the buffer first, then the queue it points to.
  enqueue_buffer.reset();
  queue.reset();
}
935 
936 }  // namespace
937 }  // namespace os
938 }  // namespace bluetooth
939