Skip to content

Commit a9787b1

Browse files
godexsoftkuznetsss
andauthored
feat: Basic support for channels (#2859)
This PR implements go-like channels wrapper (on top of asio experimental channels). In the future this will be integrated into the AsyncFramework. --------- Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
1 parent 9f76eab commit a9787b1

File tree

3 files changed

+1141
-0
lines changed

3 files changed

+1141
-0
lines changed

src/util/Channel.hpp

Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#pragma once
21+
22+
#include <boost/asio/any_io_executor.hpp>
23+
#include <boost/asio/experimental/channel.hpp>
24+
#include <boost/asio/experimental/concurrent_channel.hpp>
25+
#include <boost/asio/spawn.hpp>
26+
#include <boost/system/detail/error_code.hpp>
27+
28+
#include <concepts>
29+
#include <cstddef>
30+
#include <memory>
31+
#include <optional>
32+
#include <type_traits>
33+
#include <utility>
34+
35+
namespace util {
36+
37+
#ifdef __clang__
38+
namespace detail {
39+
// Forward declaration for compile-time check
40+
template <typename T>
41+
struct ChannelInstantiated;
42+
} // namespace detail
43+
#endif
44+
45+
/**
46+
* @brief Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pipe.
47+
* @note Use INSTANTIATE_CHANNEL_FOR_CLANG macro when using this class. See docs at the bottom of the file for more
48+
* details.
49+
*
50+
* @tparam T The type of data the channel transfers
51+
*/
52+
template <typename T>
53+
class Channel {
54+
private:
55+
class ControlBlock {
56+
using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
57+
boost::asio::any_io_executor executor_;
58+
InternalChannelType ch_;
59+
60+
public:
61+
ControlBlock(auto&& context, std::size_t capacity) : executor_(context.get_executor()), ch_(context, capacity)
62+
{
63+
}
64+
65+
[[nodiscard]] InternalChannelType&
66+
channel()
67+
{
68+
return ch_;
69+
}
70+
71+
void
72+
close()
73+
{
74+
if (not isClosed()) {
75+
ch_.close();
76+
// Workaround for Boost bug: close() alone doesn't cancel pending async operations.
77+
// We must call cancel() to unblock them. The bug also causes cancel() to return
78+
// error_code 0 instead of channel_cancelled, so async operations must check
79+
// isClosed() to detect this case.
80+
// https://github.com/chriskohlhoff/asio/issues/1575
81+
ch_.cancel();
82+
}
83+
}
84+
85+
[[nodiscard]] bool
86+
isClosed() const
87+
{
88+
return not ch_.is_open();
89+
}
90+
};
91+
92+
/**
93+
* @brief This is used to close the channel once either all Senders or all Receivers are destroyed
94+
*/
95+
struct Guard {
96+
std::shared_ptr<ControlBlock> shared;
97+
98+
~Guard()
99+
{
100+
shared->close();
101+
}
102+
};
103+
104+
/**
105+
* @brief The sending end of a channel.
106+
*
107+
* Sender is copyable and movable. The channel remains open as long as at least one Sender exists.
108+
* When all Sender instances are destroyed, the channel is closed and receivers will receive std::nullopt.
109+
*/
110+
class Sender {
111+
std::shared_ptr<ControlBlock> shared_;
112+
std::shared_ptr<Guard> guard_;
113+
114+
public:
115+
/**
116+
* @brief Constructs a Sender from a shared control block.
117+
* @param shared The shared control block managing the channel state
118+
*/
119+
Sender(std::shared_ptr<ControlBlock> shared)
120+
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
121+
122+
Sender(Sender&&) = default;
123+
Sender(Sender const&) = default;
124+
Sender&
125+
operator=(Sender&&) = default;
126+
Sender&
127+
operator=(Sender const&) = default;
128+
129+
/**
130+
* @brief Asynchronously sends data through the channel using a coroutine.
131+
*
132+
* Blocks the coroutine until the data is sent or the channel is closed.
133+
*
134+
* @tparam D The type of data to send (must be convertible to T)
135+
* @param data The data to send
136+
* @param yield The Boost.Asio yield context for coroutine suspension
137+
* @return true if the data was sent successfully, false if the channel is closed
138+
*/
139+
template <typename D>
140+
bool
141+
asyncSend(D&& data, boost::asio::yield_context yield)
142+
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
143+
{
144+
boost::system::error_code ecIn, ecOut;
145+
shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
146+
147+
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
148+
if (not ecOut and shared_->isClosed())
149+
return false;
150+
151+
return not ecOut;
152+
}
153+
154+
/**
155+
* @brief Asynchronously sends data through the channel using a callback.
156+
*
157+
* The callback is invoked when the send operation completes.
158+
*
159+
* @tparam D The type of data to send (must be convertible to T)
160+
* @param data The data to send
161+
* @param fn Callback function invoked with true if successful, false if the channel is closed
162+
*/
163+
template <typename D>
164+
void
165+
asyncSend(D&& data, std::invocable<bool> auto&& fn)
166+
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
167+
{
168+
boost::system::error_code ecIn;
169+
shared_->channel().async_send(
170+
ecIn,
171+
std::forward<D>(data),
172+
[fn = std::forward<decltype(fn)>(fn), shared = shared_](boost::system::error_code ec) mutable {
173+
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
174+
if (not ec and shared->isClosed()) {
175+
fn(false);
176+
return;
177+
}
178+
179+
fn(not ec);
180+
}
181+
);
182+
}
183+
184+
/**
185+
* @brief Attempts to send data through the channel without blocking.
186+
*
187+
* @tparam D The type of data to send (must be convertible to T)
188+
* @param data The data to send
189+
* @return true if the data was sent successfully, false if the channel is full or closed
190+
*/
191+
template <typename D>
192+
bool
193+
trySend(D&& data)
194+
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
195+
{
196+
boost::system::error_code ec;
197+
return shared_->channel().try_send(ec, std::forward<D>(data));
198+
}
199+
};
200+
201+
/**
202+
* @brief The receiving end of a channel.
203+
*
204+
* Receiver is copyable and movable. Multiple receivers can consume from the same channel concurrently.
205+
* When all Receiver instances are destroyed, the channel is closed and senders will fail to send.
206+
*/
207+
class Receiver {
208+
std::shared_ptr<ControlBlock> shared_;
209+
std::shared_ptr<Guard> guard_;
210+
211+
public:
212+
/**
213+
* @brief Constructs a Receiver from a shared control block.
214+
* @param shared The shared control block managing the channel state
215+
*/
216+
Receiver(std::shared_ptr<ControlBlock> shared)
217+
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
218+
219+
Receiver(Receiver&&) = default;
220+
Receiver(Receiver const&) = default;
221+
Receiver&
222+
operator=(Receiver&&) = default;
223+
Receiver&
224+
operator=(Receiver const&) = default;
225+
226+
/**
227+
* @brief Attempts to receive data from the channel without blocking.
228+
*
229+
* @return std::optional containing the received value, or std::nullopt if the channel is empty or closed
230+
*/
231+
std::optional<T>
232+
tryReceive()
233+
{
234+
std::optional<T> result;
235+
shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
236+
if (not ec)
237+
result = std::forward<decltype(value)>(value);
238+
});
239+
240+
return result;
241+
}
242+
243+
/**
244+
* @brief Asynchronously receives data from the channel using a coroutine.
245+
*
246+
* Blocks the coroutine until data is available or the channel is closed.
247+
*
248+
* @param yield The Boost.Asio yield context for coroutine suspension
249+
* @return std::optional containing the received value, or std::nullopt if the channel is closed
250+
*/
251+
[[nodiscard]] std::optional<T>
252+
asyncReceive(boost::asio::yield_context yield)
253+
{
254+
boost::system::error_code ec;
255+
auto value = shared_->channel().async_receive(yield[ec]);
256+
257+
if (ec)
258+
return std::nullopt;
259+
260+
return value;
261+
}
262+
263+
/**
264+
* @brief Asynchronously receives data from the channel using a callback.
265+
*
266+
* The callback is invoked when data is available or the channel is closed.
267+
*
268+
* @param fn Callback function invoked with std::optional containing the value, or std::nullopt if closed
269+
*/
270+
void
271+
asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
272+
{
273+
shared_->channel().async_receive(
274+
[fn = std::forward<decltype(fn)>(fn)](boost::system::error_code ec, T&& value) mutable {
275+
if (ec) {
276+
fn(std::optional<T>(std::nullopt));
277+
return;
278+
}
279+
280+
fn(std::make_optional<T>(std::move(value)));
281+
}
282+
);
283+
}
284+
285+
/**
286+
* @brief Checks if the channel is closed.
287+
*
288+
* A channel is closed when all Sender instances have been destroyed.
289+
*
290+
* @return true if the channel is closed, false otherwise
291+
*/
292+
[[nodiscard]] bool
293+
isClosed() const
294+
{
295+
return shared_->isClosed();
296+
}
297+
};
298+
299+
public:
300+
/**
301+
* @brief Factory function to create channel components.
302+
* @param context A supported context type (either io_context or thread_pool)
303+
* @param capacity Size of the internal buffer on the channel
304+
* @return A pair of Sender and Receiver
305+
*/
306+
static std::pair<Sender, Receiver>
307+
create(auto&& context, std::size_t capacity)
308+
{
309+
#ifdef __clang__
310+
static_assert(
311+
util::detail::ChannelInstantiated<T>::value,
312+
"When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
313+
"to one .cpp file. See documentation at the bottom of Channel.hpp for details."
314+
);
315+
#endif
316+
auto shared = std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
317+
auto sender = Sender{shared};
318+
auto receiver = Receiver{std::move(shared)};
319+
320+
return {std::move(sender), std::move(receiver)};
321+
}
322+
};
323+
324+
} // namespace util
325+
326+
// ================================================================================================
327+
// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
328+
// ================================================================================================
329+
//
330+
// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
331+
// to ONE .cpp file that uses Channel<T>:
332+
//
333+
// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
334+
//
335+
// Example:
336+
// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
337+
// #include "util/Channel.hpp"
338+
// INSTANTIATE_CHANNEL_FOR_CLANG(int)
339+
//
340+
// Why this is needed:
341+
// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
342+
// pending async operations. When using cancellation signals (which we do in our workaround),
343+
// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
344+
// doesn't provide the definitions, causing linker errors:
345+
//
346+
// Undefined symbols for architecture arm64:
347+
// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
348+
// "boost::asio::detail::cancellation_handler<...>::destroy()"
349+
//
350+
// This macro explicitly instantiates the required template specializations.
351+
//
352+
// See: https://github.com/chriskohlhoff/asio/issues/1575
353+
//
354+
#ifdef __clang__
355+
356+
#include <boost/asio/cancellation_signal.hpp>
357+
#include <boost/asio/experimental/channel_traits.hpp>
358+
#include <boost/asio/experimental/detail/channel_service.hpp>
359+
360+
namespace util::detail {
361+
// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
362+
template <typename T>
363+
struct ChannelInstantiated : std::false_type {};
364+
} // namespace util::detail
365+
366+
#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
367+
/* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
368+
template class boost::asio::detail::cancellation_handler< \
369+
boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
370+
op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
371+
namespace util::detail { \
372+
template <> \
373+
struct ChannelInstantiated<T> : std::true_type {}; \
374+
}
375+
376+
#else
377+
378+
// No workaround needed for non-Clang compilers
379+
#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
380+
381+
#endif

tests/unit/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ target_sources(
167167
util/AccountUtilsTests.cpp
168168
util/AssertTests.cpp
169169
util/BytesConverterTests.cpp
170+
util/ChannelTests.cpp
170171
util/CoroutineTest.cpp
171172
util/MoveTrackerTests.cpp
172173
util/ObservableValueTest.cpp

0 commit comments

Comments
 (0)