#pragma once

/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <mutex>
#include <type_traits>
#include <utility>

namespace cuttlefish {
// Simple queue with Push and Pop capabilities.
// If the max_elements argument is passed to the constructor, and Push is called
// when the queue holds max_elements items, the max_elements_handler is called
// with a pointer to the internal QueueImpl. The call is made while holding
// the guarding mutex; operations on the QueueImpl will not interleave with
// other threads calling Push() or Pop().
// The QueueImpl type will be a SequenceContainer.
34 template <typename T> 35 class ThreadSafeQueue { 36 public: 37 using QueueImpl = std::deque<T>; 38 using QueueFullHandler = std::function<void(QueueImpl*)>; 39 40 ThreadSafeQueue() = default; ThreadSafeQueue(std::size_t max_elements,QueueFullHandler max_elements_handler)41 explicit ThreadSafeQueue(std::size_t max_elements, 42 QueueFullHandler max_elements_handler) 43 : max_elements_{max_elements}, 44 max_elements_handler_{std::move(max_elements_handler)} {} 45 Pop()46 T Pop() { 47 std::unique_lock<std::mutex> guard(m_); 48 while (items_.empty()) { 49 new_item_.wait(guard); 50 } 51 auto t = std::move(items_.front()); 52 items_.pop_front(); 53 return t; 54 } 55 PopAll()56 QueueImpl PopAll() { 57 std::unique_lock<std::mutex> guard(m_); 58 while (items_.empty()) { 59 new_item_.wait(guard); 60 } 61 return std::move(items_); 62 } 63 64 template <typename U> Push(U && u)65 bool Push(U&& u) { 66 static_assert(std::is_assignable_v<T, decltype(u)>); 67 std::lock_guard<std::mutex> guard(m_); 68 const bool has_room = DropItemsIfAtCapacity(); 69 if (!has_room) { 70 return false; 71 } 72 items_.push_back(std::forward<U>(u)); 73 new_item_.notify_one(); 74 return true; 75 } 76 IsEmpty()77 bool IsEmpty() { 78 std::lock_guard<std::mutex> guard(m_); 79 return items_.empty(); 80 } 81 IsFull()82 bool IsFull() { 83 std::lock_guard<std::mutex> guard(m_); 84 return items_.size() == max_elements_; 85 } 86 87 private: 88 // return whether there's room to push DropItemsIfAtCapacity()89 bool DropItemsIfAtCapacity() { 90 if (max_elements_ && max_elements_ == items_.size()) { 91 max_elements_handler_(&items_); 92 } 93 if (max_elements_ && max_elements_ == items_.size()) { 94 // handler intends to ignore the newly coming element or 95 // did not empty the room for whatever reason 96 return false; 97 } 98 return true; 99 } 100 101 std::mutex m_; 102 std::size_t max_elements_{}; 103 QueueFullHandler max_elements_handler_{}; 104 std::condition_variable new_item_; 105 QueueImpl items_; 106 }; 107 
} // namespace cuttlefish 108