/* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "os/queue.h" #include #include #include #include #include #include "common/bind.h" #include "gtest/gtest.h" #include "os/reactor.h" using namespace std::chrono_literals; namespace bluetooth { namespace os { namespace { constexpr int kQueueSize = 10; constexpr int kHalfOfQueueSize = kQueueSize / 2; constexpr int kDoubleOfQueueSize = kQueueSize * 2; constexpr int kQueueSizeOne = 1; class QueueTest : public ::testing::Test { protected: void SetUp() override { enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL); enqueue_handler_ = new Handler(enqueue_thread_); dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL); dequeue_handler_ = new Handler(dequeue_thread_); } void TearDown() override { enqueue_handler_->Clear(); delete enqueue_handler_; delete enqueue_thread_; dequeue_handler_->Clear(); delete dequeue_handler_; delete dequeue_thread_; enqueue_handler_ = nullptr; enqueue_thread_ = nullptr; dequeue_handler_ = nullptr; dequeue_thread_ = nullptr; } Thread* enqueue_thread_; Handler* enqueue_handler_; Thread* dequeue_thread_; Handler* dequeue_handler_; void sync_enqueue_handler() { log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr"); log::assert_that( enqueue_thread_->GetReactor()->WaitForIdle(2s), "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)"); } }; class TestEnqueueEnd { public: explicit TestEnqueueEnd(Queue* queue, Handler* handler) : count(0), handler_(handler), queue_(queue), delay_(0) {} ~TestEnqueueEnd() {} void RegisterEnqueue(std::unordered_map>* promise_map) { promise_map_ = promise_map; handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this))); } void UnregisterEnqueue() { std::promise promise; auto future = promise.get_future(); handler_->Post( common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise))); future.wait(); } std::unique_ptr EnqueueCallbackForTest() { if (delay_ != 0) { std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); } count++; std::unique_ptr data = std::move(buffer_.front()); buffer_.pop(); std::string copy = *data; if (buffer_.empty()) { queue_->UnregisterEnqueue(); } auto key = buffer_.size(); auto node = promise_map_->extract(key); if (node) { node.mapped().set_value(key); } return data; } void setDelay(int value) { delay_ = value; } std::queue> buffer_; int count; private: Handler* handler_; Queue* queue_; std::unordered_map>* promise_map_; int delay_; void handle_register_enqueue() { queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this))); } void handle_unregister_enqueue(std::promise promise) { queue_->UnregisterEnqueue(); promise.set_value(); } }; class TestDequeueEnd { public: explicit TestDequeueEnd(Queue* queue, Handler* handler, int capacity) : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {} ~TestDequeueEnd() {} void RegisterDequeue(std::unordered_map>* promise_map) { promise_map_ = promise_map; handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this))); } void UnregisterDequeue() { std::promise promise; auto future = promise.get_future(); handler_->Post( common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise))); future.wait(); } void DequeueCallbackForTest() { if (delay_ != 0) { std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); } count++; std::unique_ptr data = queue_->TryDequeue(); buffer_.push(std::move(data)); if (buffer_.size() == (size_t)capacity_) { queue_->UnregisterDequeue(); } auto key = buffer_.size(); auto node = promise_map_->extract(key); if (node) { node.mapped().set_value(key); } } void setDelay(int value) { delay_ = value; } std::queue> buffer_; int count; private: Handler* handler_; Queue* queue_; std::unordered_map>* promise_map_; int capacity_; int delay_; void handle_register_dequeue() { queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this))); } void handle_unregister_dequeue(std::promise promise) { queue_->UnregisterDequeue(); promise.set_value(); } }; // Enqueue end level : 0 -> queue is full, 1 - > queue isn't full // Dequeue end level : 0 -> queue is empty, 1 - > queue isn't empty // Test 1 : Queue is empty // Enqueue end level : 1 // Dequeue end level : 0 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full TEST_F(QueueTest, register_enqueue_with_empty_queue) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); // Push kQueueSize data to enqueue_end buffer for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); // Register enqueue and expect data move to Queue std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); std::this_thread::sleep_for(std::chrono::milliseconds(20)); } // Enqueue end level : 1 // Dequeue end level : 0 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty TEST_F(QueueTest, register_dequeue_with_empty_queue) { Queue queue(kQueueSize); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); // Register dequeue, DequeueCallback shouldn't be invoked std::unordered_map> dequeue_promise_map; test_dequeue_end.RegisterDequeue(&dequeue_promise_map); std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_dequeue_end.count, 0); test_dequeue_end.UnregisterDequeue(); } // Test 2 : Queue is full // Enqueue end level : 0 // Dequeue end level : 1 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full TEST_F(QueueTest, register_enqueue_with_full_queue) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); // make Queue full for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // push some data to enqueue_end buffer and register enqueue; for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // EnqueueCallback shouldn't be invoked std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize); EXPECT_EQ(test_enqueue_end.count, kQueueSize); test_enqueue_end.UnregisterEnqueue(); } // Enqueue end level : 0 // Dequeue end level : 1 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty TEST_F(QueueTest, register_dequeue_with_full_queue) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); // make Queue full for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // Register dequeue and expect data move to dequeue end buffer std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kQueueSize); test_dequeue_end.UnregisterDequeue(); } // Test 3 : Queue is non-empty and non-full // Enqueue end level : 1 // Dequeue end level : 1 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked TEST_F(QueueTest, register_enqueue_with_half_empty_queue) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); // make Queue half empty for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // push some data to enqueue_end buffer and register enqueue; for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Register enqueue and expect data move to Queue enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); sync_enqueue_handler(); } // Enqueue end level : 1 // Dequeue end level : 1 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked TEST_F(QueueTest, register_dequeue_with_half_empty_queue) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); // make Queue half empty for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // Register dequeue and expect data move to dequeue end buffer std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace( std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); test_dequeue_end.UnregisterDequeue(); } // Dynamic level test // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked // Enqueue end level : 1 -> 0 // Dequeue end level : 1 // Test 4-1 Queue becomes full due to only register EnqueueCallback TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); // push double of kQueueSize to enqueue end buffer for (int i = 0; i < kDoubleOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Register enqueue and expect kQueueSize data move to Queue std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[kQueueSize].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), kQueueSize); // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); EXPECT_EQ(test_enqueue_end.count, kQueueSize); test_enqueue_end.UnregisterEnqueue(); } // Enqueue end level : 1 -> 0 // Dequeue end level : 1 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize); // push double of kQueueSize to enqueue end buffer for (int i = 0; i < kDoubleOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Register dequeue std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace( std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // Register enqueue std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace( std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Dequeue end will unregister when buffer size is kHalfOfQueueSize dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize); std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize); EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize); test_enqueue_end.UnregisterEnqueue(); } // Enqueue end level : 1 -> 0 // Dequeue end level : 1 // Test 4-3 Queue becomes full due to DequeueCallback is slower TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); // push double of kDoubleOfQueueSize to enqueue end buffer for (int i = 0; i < kDoubleOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Set 20 ms delay for callback and register dequeue std::unordered_map> dequeue_promise_map; test_dequeue_end.setDelay(20); auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // Register enqueue std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Wait for enqueue buffer empty and expect queue is full enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); EXPECT_GE(test_dequeue_end.buffer_.size(), (size_t)(kQueueSize - 1)); test_dequeue_end.UnregisterDequeue(); } // Enqueue end level : 0 -> 1 // Dequeue end level : 1 -> 0 // Test 5 Queue becomes full and non empty at same time. TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) { Queue queue(kQueueSizeOne); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); // push double of kQueueSize to enqueue end buffer for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Register dequeue std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // Register enqueue std::unordered_map> enqueue_promise_map; auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Wait for all data move from enqueue end buffer to dequeue end buffer dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kQueueSize); test_dequeue_end.UnregisterDequeue(); } // Enqueue end level : 1 -> 0 // Dequeue end level : 1 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked TEST_F(QueueTest, queue_becomes_non_full_during_test) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3); // make Queue full for (int i = 0; i < kDoubleOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[kQueueSize].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), kQueueSize); // Expect kQueueSize data block in enqueue end buffer std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize); // Register dequeue std::unordered_map> dequeue_promise_map; test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // Expect enqueue end will empty enqueue_future = enqueue_promise_map[0].get_future(); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); test_dequeue_end.UnregisterDequeue(); } // Enqueue end level : 0 -> 1 // Dequeue end level : 1 -> 0 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5) TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) { Queue queue(kQueueSizeOne); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize); // push double of kQueueSize to enqueue end buffer for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } // Register dequeue std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // Register enqueue std::unordered_map> enqueue_promise_map; auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Wait for all data move from enqueue end buffer to dequeue end buffer dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kQueueSize); test_dequeue_end.UnregisterDequeue(); } // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked // Enqueue end level : 1 // Dequeue end level : 1 -> 0 // Test 8-1 Queue becomes empty due to only register DequeueCallback TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize); // make Queue half empty for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace( std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize); // Expect DequeueCallback should stop to be invoked std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize); } // Enqueue end level : 1 // Dequeue end level : 1 -> 0 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); // make Queue half empty for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple()); auto enqueue_future = enqueue_promise_map[0].get_future(); test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); enqueue_future.wait(); EXPECT_EQ(enqueue_future.get(), 0); // push kHalfOfQueueSize to enqueue end buffer and register enqueue. for (int i = 0; i < kHalfOfQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Register dequeue, expect kQueueSize move to dequeue end buffer std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kQueueSize); // Expect DequeueCallback should stop to be invoked std::this_thread::sleep_for(std::chrono::milliseconds(20)); EXPECT_EQ(test_dequeue_end.count, kQueueSize); } // Enqueue end level : 1 // Dequeue end level : 0 -> 1 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked TEST_F(QueueTest, queue_becomes_non_empty_during_test) { Queue queue(kQueueSize); TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_); TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize); // Register dequeue std::unordered_map> dequeue_promise_map; dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple()); test_dequeue_end.RegisterDequeue(&dequeue_promise_map); // push kQueueSize data to enqueue end buffer and register enqueue for (int i = 0; i < kQueueSize; i++) { std::unique_ptr data = std::make_unique(std::to_string(i)); test_enqueue_end.buffer_.push(std::move(data)); } std::unordered_map> enqueue_promise_map; test_enqueue_end.RegisterEnqueue(&enqueue_promise_map); // Expect kQueueSize data move to dequeue end buffer auto dequeue_future = dequeue_promise_map[kQueueSize].get_future(); dequeue_future.wait(); EXPECT_EQ(dequeue_future.get(), kQueueSize); } TEST_F(QueueTest, pass_smart_pointer_and_unregister) { Queue* queue = new Queue(kQueueSize); // Enqueue a string std::string valid = "Valid String"; std::shared_ptr shared = std::make_shared(valid); queue->RegisterEnqueue( enqueue_handler_, common::Bind( [](Queue* queue, std::shared_ptr shared) { queue->UnregisterEnqueue(); return std::make_unique(*shared); }, common::Unretained(queue), shared)); // Dequeue the string queue->RegisterDequeue( dequeue_handler_, common::Bind( [](Queue* queue, std::string valid) { queue->UnregisterDequeue(); auto answer = *queue->TryDequeue(); ASSERT_EQ(answer, valid); }, common::Unretained(queue), valid)); // Wait for both handlers to finish and delete the Queue std::promise promise; auto future = promise.get_future(); enqueue_handler_->Post(common::BindOnce( [](os::Handler* dequeue_handler, Queue* queue, std::promise* promise) { dequeue_handler->Post(common::BindOnce( [](Queue* queue, std::promise* promise) { delete queue; promise->set_value(); }, common::Unretained(queue), common::Unretained(promise))); }, common::Unretained(dequeue_handler_), common::Unretained(queue), common::Unretained(&promise))); future.wait(); } std::unique_ptr sleep_and_enqueue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; return std::make_unique("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait) { Queue queue(10); int* indicator = new int(100); queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterEnqueue(); EXPECT_EQ(*indicator, 101); delete indicator; } std::unique_ptr sleep_and_enqueue_callback_and_unregister( int* to_increase, Queue* queue, std::atomic_bool* is_registered) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; if (is_registered->exchange(false)) { queue->UnregisterEnqueue(); } return std::make_unique("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) { Queue queue(10); int* indicator = new int(100); std::atomic_bool is_registered = true; queue.RegisterEnqueue( enqueue_handler_, common::Bind( &sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator), common::Unretained(&queue), common::Unretained(&is_registered))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); if (is_registered.exchange(false)) { queue.UnregisterEnqueue(); } EXPECT_EQ(*indicator, 101); delete indicator; } void sleep_and_dequeue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; } TEST_F(QueueTest, unregister_dequeue_and_wait) { int* indicator = new int(100); Queue queue(10); queue.RegisterEnqueue( enqueue_handler_, common::Bind( [](Queue* queue) { queue->UnregisterEnqueue(); return std::make_unique("Hello"); }, common::Unretained(&queue))); queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterDequeue(); EXPECT_EQ(*indicator, 101); delete indicator; } // Create all threads for death tests in the function that dies class QueueDeathTest : public ::testing::Test { public: void RegisterEnqueueAndDelete() { Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL); Handler* enqueue_handler = new Handler(enqueue_thread); Queue* queue = new Queue(kQueueSizeOne); queue->RegisterEnqueue( enqueue_handler, common::Bind([]() { return std::make_unique("A string to fill the queue"); })); delete queue; } void RegisterDequeueAndDelete() { Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL); Handler* dequeue_handler = new Handler(dequeue_thread); Queue* queue = new Queue(kQueueSizeOne); queue->RegisterDequeue( dequeue_handler, common::Bind([](Queue* queue) { queue->TryDequeue(); }, common::Unretained(queue))); delete queue; } }; TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) { EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue"); } TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) { EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue"); } class MockIQueueEnqueue : public IQueueEnqueue { public: void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override { EXPECT_FALSE(registered_); registered_ = true; handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback)); } void handle_register_enqueue(EnqueueCallback callback) { if (dont_handle_register_enqueue_) { return; } while (registered_) { std::unique_ptr front = callback.Run(); queue_.push(*front); } } void UnregisterEnqueue() override { EXPECT_TRUE(registered_); registered_ = false; } bool dont_handle_register_enqueue_ = false; bool registered_ = false; std::queue queue_; }; class EnqueueBufferTest : public ::testing::Test { protected: void SetUp() override { thread_ = new Thread("test_thread", Thread::Priority::NORMAL); handler_ = new Handler(thread_); } void TearDown() override { handler_->Clear(); delete handler_; delete thread_; } void SynchronizeHandler() { std::promise promise; auto future = promise.get_future(); handler_->Post(common::BindOnce([](std::promise promise) { promise.set_value(); }, std::move(promise))); future.wait(); } MockIQueueEnqueue enqueue_; EnqueueBuffer enqueue_buffer_{&enqueue_}; Thread* thread_; Handler* handler_; }; TEST_F(EnqueueBufferTest, enqueue) { int num_items = 10; for (int i = 0; i < num_items; i++) { enqueue_buffer_.Enqueue(std::make_unique(i), handler_); } SynchronizeHandler(); for (int i = 0; i < num_items; i++) { ASSERT_EQ(enqueue_.queue_.front(), i); enqueue_.queue_.pop(); } ASSERT_FALSE(enqueue_.registered_); } TEST_F(EnqueueBufferTest, clear) { enqueue_.dont_handle_register_enqueue_ = true; int num_items = 10; for (int i = 0; i < num_items; i++) { enqueue_buffer_.Enqueue(std::make_unique(i), handler_); } ASSERT_TRUE(enqueue_.registered_); enqueue_buffer_.Clear(); ASSERT_FALSE(enqueue_.registered_); } TEST_F(EnqueueBufferTest, delete_when_in_callback) { Queue* queue = new Queue(kQueueSize); EnqueueBuffer* enqueue_buffer = new EnqueueBuffer(queue); int num_items = 10; for (int i = 0; i < num_items; i++) { enqueue_buffer->Enqueue(std::make_unique(i), handler_); } delete enqueue_buffer; delete queue; } } // namespace } // namespace os } // namespace bluetooth