Skip to content

Commit 1889e8e

Browse files
authored
merge develop
2 parents 66aaab3 + 62a72b4 commit 1889e8e

7 files changed

Lines changed: 427 additions & 1 deletion

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.24)
22

33
project(
44
dice-template-library
5-
VERSION 2.6.0
5+
VERSION 2.7.0
66
DESCRIPTION
77
"This template library is a collection of template-oriented code that we, the Data Science Group at UPB, found pretty handy. It contains: `switch_cases` (Use runtime values in compile-time context), `integral_template_tuple` (Create a tuple-like structure that instantiates a template for a range of values), `integral_template_variant` (A wrapper type for `std::variant` guarantees to only contain variants of the form `T<IX>` and `for_{types,values,range}` (Compile time for loops for types, values or ranges))."
88
HOMEPAGE_URL "https://dice-research.org/")

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ It contains:
1919
- `tuple_algorithms`: Some algorithms for iterating tuples
2020
- `fmt_join`: A helper to join elements of a range with a separator for use with `std::format` alike [fmt::join](https://fmt.dev/latest/api/#range-and-tuple-formatting)
2121
- `channel`: A single producer, single consumer queue
22+
- `exchange_channel`: Like `channel`, but retains only the most recently sent value (unread values are overwritten)
2223
- `variant2`: Like `std::variant` but optimized for exactly two types
2324
- `mutex`/`shared_mutex`: Rust inspired mutex interfaces that hold their data instead of living next to it
2425
- `static_string`: A string type that is smaller than `std::string` for use cases where you do not need to resize the string
@@ -149,6 +150,11 @@ statically known or a runtime variable depending on the `extent`/`max_extent` te
149150
A single-producer, single-consume queue. This can be used to communicate between threads in a more high level
150151
fashion than a mutex+container would allow.
151152

153+
### `exchange_channel`
154+
Like `channel`, but only the most recently sent value is retained: pushing overwrites any unread value, so a slow
155+
consumer skips intermediate updates and only ever sees the latest state. Read it with `pop()` (blocking) or
156+
`try_pop()` (non-blocking), or iterate it as a range until it is `close()`d.
157+
152158
### `variant2`
153159
Like `std::variant` but specifically optimized for usage with two types/variants.
154160
The internal representation is a `union` of the two types plus a 1 byte (3 state) discriminant.

examples/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ target_link_libraries(example_channel
7878
dice-template-library::dice-template-library
7979
)
8080

81+
add_executable(example_exchange_channel
82+
example_exchange_channel.cpp)
83+
target_link_libraries(example_exchange_channel
84+
PRIVATE
85+
dice-template-library::dice-template-library
86+
)
87+
8188
add_executable(example_variant2
8289
example_variant2.cpp)
8390
target_link_libraries(example_variant2
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include <dice/template-library/exchange_channel.hpp>
2+
3+
#include <cassert>
4+
#include <cstddef>
5+
#include <iostream>
6+
#include <thread>
7+
8+
9+
int main() {
10+
dice::template_library::exchange_channel<int> chan;
11+
12+
// only the most recently pushed value is kept
13+
chan.push(1);
14+
chan.push(2);
15+
chan.push(3);
16+
assert(chan.pop() == 3);
17+
assert(!chan.try_pop().has_value());
18+
19+
std::jthread producer{[&chan]() {
20+
for (int x = 0; x < 1000; ++x) {
21+
chan.push(x);
22+
}
23+
chan.close(); // don't forget to close
24+
}};
25+
26+
// consumer only sees the latest value at each read, never an older one
27+
int last = -1;
28+
std::size_t observed = 0;
29+
for (int x : chan) {
30+
assert(x >= last);
31+
last = x;
32+
++observed;
33+
}
34+
35+
std::cout << "consumer observed " << observed << " of 1000 pushed values, last = " << last << '\n';
36+
}
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
#ifndef DICE_TEMPLATELIBRARY_EXCHANGECHANNEL_HPP
2+
#define DICE_TEMPLATELIBRARY_EXCHANGECHANNEL_HPP
3+
4+
#include <atomic>
5+
#include <cassert>
6+
#include <condition_variable>
7+
#include <cstddef>
8+
#include <iterator>
9+
#include <mutex>
10+
#include <optional>
11+
#include <type_traits>
12+
#include <utility>
13+
14+
namespace dice::template_library {
15+
16+
/**
17+
* A multi producer, multi consumer channel
18+
* that only retains the last sent value.
19+
*/
20+
template<typename T>
21+
struct exchange_channel {
22+
using value_type = T;
23+
using size_type = size_t;
24+
using difference_type = std::ptrdiff_t;
25+
using reference = value_type &;
26+
using const_reference = value_type const &;
27+
using pointer = T *;
28+
using const_pointer = T const *;
29+
30+
private:
31+
std::mutex value_mutex_; ///< mutex for value_
32+
std::optional<T> value_; ///< the last value that was sent
33+
34+
std::atomic_flag closed_ = ATOMIC_FLAG_INIT; ///< true if this channel is closed
35+
std::condition_variable has_value_; ///< condvar for value_.has_value()
36+
37+
public:
38+
exchange_channel() = default;
39+
40+
// there is no way to safely implement these with concurrent access
41+
exchange_channel(exchange_channel const &other) = delete;
42+
exchange_channel(exchange_channel &&other) = delete;
43+
exchange_channel &operator=(exchange_channel const &other) = delete;
44+
exchange_channel &operator=(exchange_channel &&other) noexcept = delete;
45+
46+
~exchange_channel() noexcept = default;
47+
48+
/**
49+
* Close the channel.
50+
* After calling close calls to push() will return false
51+
* and calls to try_pop will return std::nullopt once the already present elements are exhausted
52+
*/
53+
void close() noexcept {
54+
{
55+
// "Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread."
56+
// - https://en.cppreference.com/w/cpp/thread/condition_variable
57+
//
58+
// Here closed_ is the shared variable used by has_value_ in another thread (the one waiting in pop)
59+
std::lock_guard lock{value_mutex_};
60+
closed_.test_and_set(std::memory_order_release);
61+
}
62+
has_value_.notify_all(); // notify pop() so that it does not get stuck
63+
}
64+
65+
/**
66+
* @return true if this channel is closed
67+
*/
68+
[[nodiscard]] bool closed() const noexcept {
69+
return closed_.test(std::memory_order_acquire);
70+
}
71+
72+
/**
73+
* Emplace an element into the channel, replaces the current element in the channel if there is one.
74+
*
75+
* @param args constructor args
76+
* @return true if emplacing the element succeeded because the channel is not yet closed
77+
*/
78+
template<typename... Args>
79+
bool emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v<value_type, decltype(std::forward<Args>(args))...>) {
80+
if (closed_.test(std::memory_order_acquire)) [[unlikely]] {
81+
return false;
82+
}
83+
84+
{
85+
std::unique_lock lock{value_mutex_};
86+
if (closed_.test(std::memory_order_relaxed)) [[unlikely]] {
87+
// relaxed is enough because we hold the lock
88+
return false;
89+
}
90+
91+
value_.emplace(std::forward<Args>(args)...);
92+
}
93+
94+
has_value_.notify_one();
95+
return true;
96+
}
97+
98+
/**
99+
* Push a single element into the channel, replaces the current element in the channel if there is one.
100+
*
101+
* @param value the element to push
102+
* @return true if pushing the element succeeded because the channel is not yet closed
103+
*/
104+
bool push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v<value_type>) {
105+
return emplace(value);
106+
}
107+
108+
/**
109+
* Push a single element into the channel, replaces the current element in the channel if there is one.
110+
*
111+
* @param value the element to push
112+
* @return true if pushing the element succeeded because the channel is not yet closed
113+
*/
114+
bool push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v<value_type>) {
115+
return emplace(std::move(value));
116+
}
117+
118+
/**
119+
* Try to get a (previously pushed) element from the channel.
120+
* If there is no element available, blocks until there is one available or the channel is closed.
121+
*
122+
* @return std::nullopt if the channel was closed, an element otherwise
123+
*/
124+
[[nodiscard]] std::optional<value_type> pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
125+
std::unique_lock lock{value_mutex_};
126+
has_value_.wait(lock, [this]() noexcept {
127+
return value_.has_value() || closed_.test(std::memory_order_relaxed);
128+
});
129+
130+
if (!value_.has_value()) [[unlikely]] {
131+
// implies closed_ == true
132+
return std::nullopt;
133+
}
134+
135+
return *std::exchange(value_, std::nullopt);
136+
}
137+
138+
/**
139+
* Try to get a (previously pushed) element from the channel.
140+
* Unlike pop(), if there is no element available, returns std::nullopt immediatly.
141+
*/
142+
[[nodiscard]] std::optional<value_type> try_pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
143+
std::unique_lock lock{value_mutex_};
144+
if (!value_.has_value()) {
145+
return std::nullopt;
146+
}
147+
148+
return *std::exchange(value_, std::nullopt);
149+
}
150+
151+
struct iterator {
152+
using channel_type = exchange_channel;
153+
using value_type = T;
154+
using difference_type = std::ptrdiff_t;
155+
using reference = T &;
156+
using const_reference = T const &;
157+
using pointer = typename exchange_channel::pointer;
158+
using const_pointer = typename exchange_channel::const_pointer;
159+
using iterator_category = std::input_iterator_tag;
160+
161+
private:
162+
exchange_channel *chan_;
163+
mutable std::optional<value_type> buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator
164+
165+
void advance() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
166+
buf_ = chan_->pop();
167+
}
168+
169+
public:
170+
explicit iterator(channel_type *chan) noexcept(std::is_nothrow_move_constructible_v<value_type>)
171+
: chan_{chan} {
172+
advance();
173+
}
174+
175+
iterator(iterator const &other) noexcept(std::is_nothrow_copy_constructible_v<value_type>)
176+
: chan_{other.chan_},
177+
buf_{other.buf_} {
178+
}
179+
180+
iterator &operator=(iterator const &other) noexcept(std::is_nothrow_copy_assignable_v<value_type>) {
181+
if (this == &other) {
182+
return *this;
183+
}
184+
185+
chan_ = other.chan_;
186+
buf_ = other.buf_;
187+
return *this;
188+
}
189+
190+
iterator(iterator &&other) noexcept(std::is_nothrow_move_constructible_v<value_type>)
191+
: chan_{other.chan_},
192+
buf_{std::move(other.buf_)} {
193+
}
194+
195+
iterator &operator=(iterator &&other) noexcept(std::is_nothrow_swappable_v<value_type>) {
196+
assert(this != &other);
197+
std::swap(chan_, other.chan_);
198+
std::swap(buf_, other.buf_);
199+
return *this;
200+
}
201+
202+
/// intentionally returns a reference instead of a const_reference to be able to extract it
203+
reference operator*() noexcept {
204+
return *buf_;
205+
}
206+
207+
reference operator*() const noexcept {
208+
return *buf_;
209+
}
210+
211+
/// intentionally returns a pointer instead of a const_pointer to be able to extract it
212+
pointer operator->() noexcept {
213+
return &*buf_;
214+
}
215+
216+
pointer operator->() const noexcept {
217+
return &*buf_;
218+
}
219+
220+
iterator &operator++() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
221+
advance();
222+
return *this;
223+
}
224+
225+
void operator++(int) noexcept(std::is_nothrow_move_constructible_v<value_type>) {
226+
advance();
227+
}
228+
229+
bool operator==(std::default_sentinel_t) const noexcept {
230+
return !buf_.has_value();
231+
}
232+
};
233+
234+
using sentinel = std::default_sentinel_t;
235+
236+
/**
237+
* @return an iterator over all present and future elements of this channel
238+
* @note iterator == end() is true once the channel is closed
239+
*/
240+
[[nodiscard]] iterator begin() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
241+
return iterator{this};
242+
}
243+
244+
[[nodiscard]] sentinel end() const noexcept {
245+
return std::default_sentinel;
246+
}
247+
};
248+
249+
} // namespace dice::template_library
250+
251+
#endif // DICE_TEMPLATELIBRARY_EXCHANGECHANNEL_HPP

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,6 @@ custom_add_test(tests_dbg)
120120

121121
add_executable(tests_opt_minmax tests_opt_minmax.cpp)
122122
custom_add_test(tests_opt_minmax)
123+
124+
add_executable(tests_exchange_channel tests_exchange_channel.cpp)
125+
custom_add_test(tests_exchange_channel)

0 commit comments

Comments
 (0)