TeiaCareSDK  v0.1.0
TeiaCareSDK is a collection of reusable C++ components
Loading...
Searching...
No Matches
blocking_queue.hpp
1// Copyright 2024 TeiaCare
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#pragma once
16
17#include <teiacare/sdk/non_copyable.hpp>
18#include <teiacare/sdk/non_moveable.hpp>
19
20#include <algorithm>
21#include <condition_variable>
22#include <mutex>
23#include <optional>
24#include <queue>
25
26namespace tc::sdk
27{
28/*!
29 * \class blocking_queue
30 * \brief Thread safe, blocking queue
31 * \tparam T Queue items type
32 *
33 * The queue has a fixed capacity (i.e. the maximum number of items that it can hold).
34 * When the queue is full and a new item needs to be inserted via blocking_queue::push(), the calling thread blocks until an item is popped.
35 * Vice versa, when the queue is empty and an item is requested via blocking_queue::pop(), the calling thread blocks until an item is pushed.
36 */
37template <typename T>
39{
40public:
41 /*!
42 * \brief Constructor
43 * \param capacity set the maximum number of items the queue can hold
44 *
45 * Creates a tc::sdk::blocking_queue instance.
46 */
47 explicit blocking_queue(size_t capacity)
48 : _capacity(std::clamp(capacity, size_t{1}, std::numeric_limits<size_t>::max()))
49 {
50 }
51
52 /*!
53 * \brief Destructor
54 *
55 * Defaulted: destroys the members of the queue, including any items still stored in it.
56 */
57 ~blocking_queue() = default;
58
59 /*!
60 * \brief Insert an item into the queue
61 * \tparam item The item to be inserted
62 *
63 * If this method is called when the queue is full the calling thread is blocked until an item is popped from the queue.
64 */
65 void push(const T& item)
66 {
67 std::unique_lock lock(_mutex);
68 if (is_full())
69 _last_item_popped.wait(lock, [this] { return !is_full(); });
70
71 _queue.push(item);
72 push_impl(std::move(lock));
73 }
74
75 /*!
76 * \brief Insert an item into the queue
77 * \tparam item The item to be inserted
78 *
79 * If this method is called when the queue is full the calling thread is blocked until an item is popped from the queue.
80 * This is an overload of blocking_queue::push(const T&) which emplaces the item instead of using a const ref.
81 */
82 void push(T&& item)
83 {
84 std::unique_lock lock(_mutex);
85 if (is_full())
86 _last_item_popped.wait(lock, [this] { return !is_full(); });
87
88 _queue.emplace(item);
89 push_impl(std::move(lock));
90 }
91
92 /*!
93 * \brief Try to insert an item into the queue
94 * \tparam item The item to be inserted
95 * \return true if the item was inserted
96 *
97 * If this method is called when the queue is full the calling thread is not blocked and false is returned.
98 * Otherwise the thread is locked until the item is inserted, then true is returned.
99 */
100 bool try_push(const T& item)
101 {
102 std::unique_lock lock(_mutex);
103 if (is_full())
104 return false;
105
106 _queue.push(item);
107 push_impl(std::move(lock));
108
109 return true;
110 }
111
112 /*!
113 * \brief Try to insert an item into the queue
114 * \tparam item The item to be inserted
115 * \return true if the item was inserted
116 *
117 * If this method is called when the queue is full the calling thread is not blocked and false is returned.
118 * Otherwise the thread is locked until the item is inserted, then true is returned.
119 * This is an overload of blocking_queue::try_push(const T&) which emplaces the item instead of using a const ref.
120 */
121 bool try_push(T&& item)
122 {
123 std::unique_lock lock(_mutex);
124 if (is_full())
125 return false;
126
127 _queue.emplace(item);
128 push_impl(std::move(lock));
129
130 return true;
131 }
132
133 /*!
134 * \brief Retrieve an item from the queue
135 * \return T value
136 *
137 * If this method is called when the queue is full the calling thread is blocked until an item is pushed in the queue.
138 */
139 T pop()
140 {
141 std::unique_lock lock(_mutex);
142 if (is_empty())
143 _first_item_pushed.wait(lock, [this] { return !is_empty(); });
144
145 T item = _queue.front();
146 _queue.pop();
147 pop_impl(std::move(lock));
148
149 return item;
150 }
151
152 /*!
153 * \brief Try to retrieve an item from the queue
154 * \return std::optional<T> value
155 *
156 * If this method is called when the queue is empty the calling thread is not blocked and std::nullopt is returned.
157 * Otherwise the thread is locked until the item is removed, then std::optional<T> is returned.
158 */
159 std::optional<T> try_pop()
160 {
161 std::unique_lock lock(_mutex);
162 if (is_empty())
163 return std::nullopt;
164
165 T item = _queue.front();
166 _queue.pop();
167 pop_impl(std::move(lock));
168
169 return std::move(std::optional(item));
170 }
171
172 /*!
173 * \brief Get the number of items currently in the queue
174 * \return queue size
175 */
176 [[nodiscard]] size_t size() const
177 {
178 std::lock_guard lock(_mutex);
179 return _queue.size();
180 }
181
182 /*!
183 * \brief Get the maximum number of items that the queue can hold
184 * \return queue capacity
185 */
186 [[nodiscard]] size_t capacity() const
187 {
188 std::lock_guard lock(_mutex);
189 return _capacity;
190 }
191
192private:
193 std::queue<T> _queue; ///< Stored items, in insertion order
194 mutable std::mutex _mutex; ///< Locked by the public methods before accessing _queue; mutable so const methods can lock it
195 std::condition_variable _last_item_popped; ///< Waited on by push() while the queue is full, notified by pop_impl()
196 std::condition_variable _first_item_pushed; ///< Waited on by pop() while the queue is empty, notified by push_impl()
197 const size_t _capacity; ///< Maximum number of items, fixed by the constructor
198
199 inline void push_impl(std::unique_lock<std::mutex>&& lock)
200 {
201 const bool is_first_item_pushed = _queue.size() == 1;
202 lock.unlock();
203
205 _first_item_pushed.notify_all();
206 }
207
208 inline void pop_impl(std::unique_lock<std::mutex>&& lock)
209 {
210 const bool is_last_item_popped = _queue.size() == _capacity - 1;
211 lock.unlock();
212
213 if (is_last_item_popped)
214 _last_item_popped.notify_all();
215 }
216
217 inline bool is_empty() const
218 {
219 return _queue.empty();
220 }
221 inline bool is_full() const
222 {
223 return _queue.size() >= _capacity;
224 }
225};
226
227}
Thread safe, blocking queue.
std::optional< T > try_pop()
Try to retrieve an item from the queue.
bool try_push(T &&item)
Try to insert an item into the queue.
blocking_queue(size_t capacity)
Constructor.
size_t capacity() const
Get the maximum number of items that the queue can hold.
void push(T &&item)
Insert an item into the queue.
T pop()
Retrieve an item from the queue.
bool try_push(const T &item)
Try to insert an item into the queue.
void push(const T &item)
Insert an item into the queue.
~blocking_queue()=default
Destructor.
size_t size() const
Get the number of items currently in the queue.
Utility class expected to be used as a base class to define non-copyable classes.
Utility class expected to be used as a base class to define non-moveable classes.