Skip to content

Commit bb301af

Browse files
committed
implement watch
1 parent 3d85a0e commit bb301af

3 files changed

Lines changed: 270 additions & 0 deletions

File tree

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
#ifndef DICE_TEMPLATELIBRARY_WATCH_HPP
2+
#define DICE_TEMPLATELIBRARY_WATCH_HPP
3+
4+
#include <atomic>
5+
#include <cassert>
6+
#include <condition_variable>
7+
#include <cstddef>
8+
#include <mutex>
9+
#include <optional>
10+
11+
namespace dice::template_library {
12+
13+
/**
14+
* A multi producer, multi consumer channel
15+
* that only retains the last sent value.
16+
*/
17+
template<typename T>
18+
struct watch {
19+
using value_type = T;
20+
using size_type = size_t;
21+
using difference_type = std::ptrdiff_t;
22+
using reference = value_type &;
23+
using const_reference = value_type const &;
24+
using pointer = T *;
25+
using const_pointer = T const *;
26+
27+
private:
28+
std::mutex value_mutex_; ///< mutex for value_
29+
std::optional<T> value_; ///< the last value that was sent
30+
31+
std::atomic_flag closed_ = ATOMIC_FLAG_INIT; ///< true if this channel is closed
32+
std::condition_variable has_value_; ///< condvar for value_.has_value()
33+
34+
public:
35+
watch() = default;
36+
37+
// there is no way to safely implement these with concurrent access
38+
watch(watch const &other) = delete;
39+
watch(watch &&other) = delete;
40+
watch &operator=(watch const &other) = delete;
41+
watch &operator=(watch &&other) noexcept = delete;
42+
43+
~watch() noexcept = default;
44+
45+
/**
46+
* Close the channel.
47+
* After calling close calls to push() will return false
48+
* and calls to try_pop will return std::nullopt once the already present elements are exhausted
49+
*/
50+
void close() noexcept {
51+
{
52+
// "Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread."
53+
// - https://en.cppreference.com/w/cpp/thread/condition_variable
54+
//
55+
// Here closed_ is the shared variable used by queue_not_empty_ in another thread (the one waiting in try_pop)
56+
std::lock_guard lock{value_mutex_};
57+
closed_.test_and_set(std::memory_order_release);
58+
}
59+
has_value_.notify_all(); // notify pop() so that it does not get stuck
60+
}
61+
62+
/**
63+
* @return true if this channel is closed
64+
*/
65+
[[nodiscard]] bool closed() const noexcept {
66+
return closed_.test(std::memory_order_acquire);
67+
}
68+
69+
/**
70+
* Emplace an element into the channel, replaces the current element in the channel if there is one.
71+
*
72+
* @param args constructor args
73+
* @return true if emplacing the element succeeded because the channel is not yet closed
74+
*/
75+
template<typename ...Args>
76+
bool emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v<value_type, decltype(std::forward<Args>(args))...>) {
77+
if (closed_.test(std::memory_order_acquire)) [[unlikely]] {
78+
return false;
79+
}
80+
81+
{
82+
std::unique_lock lock{value_mutex_};
83+
if (closed_.test(std::memory_order_relaxed)) [[unlikely]] {
84+
// relaxed is enough because we hold the lock
85+
// wait was exited because closed_ was true (channel closed)
86+
return false;
87+
}
88+
89+
value_.emplace(std::forward<Args>(args)...);
90+
}
91+
92+
has_value_.notify_one();
93+
return true;
94+
}
95+
96+
/**
97+
* Push a single element into the channel, replaces the current element in the channel if there is one.
98+
*
99+
* @param value the element to push
100+
* @return true if pushing the element succeeded because the channel is not yet closed
101+
*/
102+
bool push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v<value_type>) {
103+
return emplace(value);
104+
}
105+
106+
/**
107+
* Push a single element into the channel, replaces the current element in the channel if there is one.
108+
*
109+
* @param value the element to push
110+
* @return true if pushing the element succeeded because the channel is not yet closed
111+
*/
112+
bool push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v<value_type>) {
113+
return emplace(std::move(value));
114+
}
115+
116+
/**
117+
* Try to get a (previously pushed) element from the channel.
118+
* If there is no element available, blocks until there is one available or the channel is closed.
119+
*
120+
* @return std::nullopt if the channel was closed, an element otherwise
121+
*/
122+
[[nodiscard]] std::optional<value_type> pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
123+
std::unique_lock lock{value_mutex_};
124+
has_value_.wait(lock, [this]() noexcept { return value_.has_value() || closed_.test(std::memory_order_relaxed); });
125+
126+
if (!value_.has_value()) [[unlikely]] {
127+
// implies closed_ == true
128+
return std::nullopt;
129+
}
130+
131+
return *std::exchange(value_, std::nullopt);
132+
}
133+
134+
/**
135+
* Try to get a (previously pushed) element from the channel.
136+
* Unlike pop(), if there is no element available, returns std::nullopt immediatly.
137+
*/
138+
[[nodiscard]] std::optional<value_type> try_pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
139+
std::unique_lock lock{value_mutex_};
140+
if (!value_.has_value()) {
141+
return std::nullopt;
142+
}
143+
144+
return *std::exchange(value_, std::nullopt);
145+
}
146+
147+
struct iterator {
148+
using watch_type = watch;
149+
using value_type = T;
150+
using difference_type = std::ptrdiff_t;
151+
using reference = T &;
152+
using const_reference = T const &;
153+
using pointer = typename watch_type::pointer;
154+
using const_pointer = typename watch_type::const_pointer;
155+
using iterator_category = std::input_iterator_tag;
156+
157+
private:
158+
watch_type *watch_;
159+
mutable std::optional<value_type> buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator
160+
161+
void advance() noexcept {
162+
buf_ = watch_->pop();
163+
}
164+
165+
public:
166+
explicit iterator(watch_type *watch) noexcept : watch_{watch} {
167+
advance();
168+
}
169+
170+
iterator(iterator const &other) noexcept(std::is_nothrow_copy_constructible_v<value_type>)
171+
: watch_{other.watch_},
172+
buf_{other.buf_} {
173+
}
174+
175+
iterator &operator=(iterator const &other) noexcept(std::is_nothrow_copy_assignable_v<value_type>) {
176+
if (this == &other) {
177+
return *this;
178+
}
179+
180+
watch_ = other.watch_;
181+
buf_ = other.buf_;
182+
return *this;
183+
}
184+
185+
iterator(iterator &&other) noexcept(std::is_nothrow_move_constructible_v<value_type>)
186+
: watch_{other.watch_},
187+
buf_{std::move(other.buf_)} {
188+
}
189+
190+
iterator &operator=(iterator &&other) noexcept(std::is_nothrow_swappable_v<value_type>) {
191+
assert(this != &other);
192+
std::swap(watch_, other.watch_);
193+
std::swap(buf_, other.buf_);
194+
return *this;
195+
}
196+
197+
reference operator*() noexcept {
198+
return *buf_;
199+
}
200+
201+
reference operator*() const noexcept {
202+
return *buf_;
203+
}
204+
205+
pointer operator->() noexcept {
206+
return &*buf_;
207+
}
208+
209+
pointer operator->() const noexcept {
210+
return &*buf_;
211+
}
212+
213+
iterator &operator++() noexcept {
214+
advance();
215+
return *this;
216+
}
217+
218+
void operator++(int) noexcept {
219+
advance();
220+
}
221+
222+
bool operator==(std::default_sentinel_t) const noexcept {
223+
return !buf_.has_value();
224+
}
225+
};
226+
227+
using sentinel = std::default_sentinel_t;
228+
229+
/**
230+
* @return an iterator over all present and future elements of this channel
231+
* @note iterator == end() is true once the channel is closed
232+
*/
233+
[[nodiscard]] iterator begin() noexcept {
234+
return iterator{this};
235+
}
236+
237+
[[nodiscard]] sentinel end() const noexcept {
238+
return std::default_sentinel;
239+
}
240+
};
241+
242+
} // namespace dice::template_library
243+
244+
#endif // DICE_TEMPLATELIBRARY_WATCH_HPP

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,6 @@ custom_add_test(tests_dbg)
120120

121121
add_executable(tests_opt_minmax tests_opt_minmax.cpp)
122122
custom_add_test(tests_opt_minmax)
123+
124+
add_executable(tests_watch tests_watch.cpp)
125+
custom_add_test(tests_watch)

tests/tests_watch.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
2+
#include <doctest/doctest.h>
3+
4+
#include <dice/template-library/watch.hpp>
5+
6+
TEST_SUITE("watch") {
7+
TEST_CASE("sanity check") {
8+
dice::template_library::watch<int> w;
9+
10+
w.push(1);
11+
CHECK_EQ(w.pop(), 1);
12+
CHECK_FALSE(w.try_pop().has_value());
13+
14+
w.push(2);
15+
w.push(3);
16+
CHECK_EQ(w.pop(), 3);
17+
CHECK_FALSE(w.try_pop().has_value());
18+
19+
w.close();
20+
CHECK_FALSE(w.pop().has_value());
21+
CHECK_FALSE(w.try_pop().has_value());
22+
}
23+
}

0 commit comments

Comments
 (0)