1 #pragma once
2 
3 /*
4  * Copyright (C) 2016 The Android Open Source Project
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <mutex>
#include <type_traits>
#include <utility>
25 
26 namespace cuttlefish {
27 // Simple queue with Push and Pop capabilities.
28 // If the max_elements argument is passed to the constructor, and Push is called
29 // when the queue holds max_elements items, the max_elements_handler is called
30 // with a pointer to the internal QueueImpl. The call is made while holding
31 // the guarding mutex; operations on the QueueImpl will not interleave with
32 // other threads calling Push() or Pop().
33 // The QueueImpl type will be a SequenceContainer.
template <typename T>
class ThreadSafeQueue {
 public:
  // Underlying container. Handed to the full-queue handler and returned by
  // PopAll().
  using QueueImpl = std::deque<T>;
  // Callback invoked (with the internal mutex held) when Push() finds the
  // queue at capacity. It may remove elements from the container to make room.
  using QueueFullHandler = std::function<void(QueueImpl*)>;

  // Creates an unbounded queue (max_elements_ stays 0, meaning "no limit").
  ThreadSafeQueue() = default;

  // Creates a bounded queue.
  //   max_elements: capacity; 0 means unbounded.
  //   max_elements_handler: called from Push() when the queue holds exactly
  //     max_elements items. If it is empty, a Push() on a full queue simply
  //     rejects the new element.
  explicit ThreadSafeQueue(std::size_t max_elements,
                           QueueFullHandler max_elements_handler)
      : max_elements_{max_elements},
        max_elements_handler_{std::move(max_elements_handler)} {}

  // Blocks until at least one element is available, then removes and returns
  // the oldest element.
  T Pop() {
    std::unique_lock<std::mutex> guard(m_);
    new_item_.wait(guard, [this] { return !items_.empty(); });
    T front = std::move(items_.front());
    items_.pop_front();
    return front;
  }

  // Blocks until at least one element is available, then removes and returns
  // all queued elements in FIFO order.
  QueueImpl PopAll() {
    std::unique_lock<std::mutex> guard(m_);
    new_item_.wait(guard, [this] { return !items_.empty(); });
    // Swap in a fresh container instead of returning a moved-from member, so
    // items_ is guaranteed to be empty afterwards.
    return std::exchange(items_, QueueImpl{});
  }

  // Appends u to the queue and wakes one waiting consumer.
  // Returns false (and does not enqueue) if the queue is at capacity and the
  // full-queue handler did not make room.
  template <typename U>
  bool Push(U&& u) {
    // push_back needs an implicit conversion from U to T. (is_assignable<T, U>
    // would test assignment to an rvalue T, which is false for scalar types.)
    static_assert(std::is_convertible_v<U&&, T>,
                  "pushed value must be implicitly convertible to T");
    std::lock_guard<std::mutex> guard(m_);
    if (!DropItemsIfAtCapacity()) {
      return false;
    }
    items_.push_back(std::forward<U>(u));
    new_item_.notify_one();
    return true;
  }

  // Returns whether the queue currently holds no elements.
  bool IsEmpty() const {
    std::lock_guard<std::mutex> guard(m_);
    return items_.empty();
  }

  // Returns whether a bounded queue currently holds exactly max_elements
  // items. An unbounded queue (max_elements == 0) is never full.
  bool IsFull() const {
    std::lock_guard<std::mutex> guard(m_);
    return max_elements_ != 0 && items_.size() == max_elements_;
  }

 private:
  // Returns whether there's room to push. Must be called with m_ held.
  // When the queue is at capacity, gives the handler (if any) one chance to
  // free space before deciding.
  bool DropItemsIfAtCapacity() {
    if (max_elements_ == 0) {
      return true;  // unbounded
    }
    if (items_.size() == max_elements_ && max_elements_handler_) {
      max_elements_handler_(&items_);
    }
    // Still at capacity: the handler intends to ignore the incoming element,
    // did not free any room, or no handler was provided.
    return items_.size() != max_elements_;
  }

  mutable std::mutex m_;  // mutable so const observers can lock
  std::size_t max_elements_{};
  QueueFullHandler max_elements_handler_{};
  std::condition_variable new_item_;
  QueueImpl items_;
};
107 }  // namespace cuttlefish
108