Skip to content

Commit 6d79dd6

Browse files
authored
feat: Async framework submit on strand/ctx (#2751)
1 parent d6ab2cc commit 6d79dd6

File tree

14 files changed

+296
-110
lines changed

14 files changed

+296
-110
lines changed

src/util/Assert.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "util/SourceLocation.hpp"
2323

2424
#include <boost/log/core/core.hpp>
25+
#include <fmt/base.h>
2526

2627
#include <functional>
2728
#include <string_view>

src/util/async/AnyExecutionContext.hpp

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ class AnyExecutionContext {
8585
[[nodiscard]] auto
8686
execute(SomeHandlerWithoutStopToken auto&& fn)
8787
{
88-
using RetType = std::decay_t<decltype(fn())>;
88+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
8989
static_assert(not std::is_same_v<RetType, std::any>);
9090

91-
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> std::any {
91+
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
9292
if constexpr (std::is_void_v<RetType>) {
93-
fn();
93+
std::invoke(std::forward<decltype(fn)>(fn));
9494
return {};
9595
} else {
96-
return std::make_any<RetType>(fn());
96+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn)));
9797
}
9898
}));
9999
}
@@ -109,17 +109,19 @@ class AnyExecutionContext {
109109
[[nodiscard]] auto
110110
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
111111
{
112-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
112+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
113113
static_assert(not std::is_same_v<RetType, std::any>);
114114

115-
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
116-
if constexpr (std::is_void_v<RetType>) {
117-
fn(std::move(stopToken));
118-
return {};
119-
} else {
120-
return std::make_any<RetType>(fn(std::move(stopToken)));
121-
}
122-
}));
115+
return AnyOperation<RetType>(
116+
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
117+
if constexpr (std::is_void_v<RetType>) {
118+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
119+
return {};
120+
} else {
121+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
122+
}
123+
})
124+
);
123125
}
124126

125127
/**
@@ -134,16 +136,16 @@ class AnyExecutionContext {
134136
[[nodiscard]] auto
135137
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
136138
{
137-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
139+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
138140
static_assert(not std::is_same_v<RetType, std::any>);
139141

140142
return AnyOperation<RetType>(pimpl_->execute(
141-
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
143+
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
142144
if constexpr (std::is_void_v<RetType>) {
143-
fn(std::move(stopToken));
145+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
144146
return {};
145147
} else {
146-
return std::make_any<RetType>(fn(std::move(stopToken)));
148+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
147149
}
148150
},
149151
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
@@ -162,17 +164,17 @@ class AnyExecutionContext {
162164
[[nodiscard]] auto
163165
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken> auto&& fn)
164166
{
165-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
167+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
166168
static_assert(not std::is_same_v<RetType, std::any>);
167169

168170
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
169171
return AnyOperation<RetType>(
170-
pimpl_->scheduleAfter(millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
172+
pimpl_->scheduleAfter(millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
171173
if constexpr (std::is_void_v<RetType>) {
172-
fn(std::move(stopToken));
174+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
173175
return {};
174176
} else {
175-
return std::make_any<RetType>(fn(std::move(stopToken)));
177+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
176178
}
177179
})
178180
);
@@ -191,17 +193,19 @@ class AnyExecutionContext {
191193
[[nodiscard]] auto
192194
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken, bool> auto&& fn)
193195
{
194-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>(), true))>;
196+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken, bool>>;
195197
static_assert(not std::is_same_v<RetType, std::any>);
196198

197199
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
198200
return AnyOperation<RetType>(pimpl_->scheduleAfter(
199-
millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) -> std::any {
201+
millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) mutable -> std::any {
200202
if constexpr (std::is_void_v<RetType>) {
201-
fn(std::move(stopToken), cancelled);
203+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), cancelled);
202204
return {};
203205
} else {
204-
return std::make_any<RetType>(fn(std::move(stopToken), cancelled));
206+
return std::make_any<RetType>(
207+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), cancelled)
208+
);
205209
}
206210
}
207211
));
@@ -217,18 +221,30 @@ class AnyExecutionContext {
217221
[[nodiscard]] auto
218222
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
219223
{
220-
using RetType = std::decay_t<decltype(fn())>;
224+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
221225
static_assert(not std::is_same_v<RetType, std::any>);
222226

223227
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
224228
return AnyOperation<RetType>( //
225-
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] -> std::any {
226-
fn();
229+
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
230+
std::invoke(std::forward<decltype(fn)>(fn));
227231
return {};
228232
})
229233
);
230234
}
231235

236+
/**
237+
* @brief Schedule an operation on the execution context without expectations of a result
238+
* @note Exceptions are caught internally and `ASSERT`ed on
239+
*
240+
* @param fn The block of code to execute
241+
*/
242+
void
243+
submit(SomeHandlerWithoutStopToken auto&& fn)
244+
{
245+
pimpl_->submit(std::forward<decltype(fn)>(fn));
246+
}
247+
232248
/**
233249
* @brief Make a strand for this execution context
234250
*
@@ -276,6 +292,7 @@ class AnyExecutionContext {
276292
virtual impl::ErasedOperation
277293
scheduleAfter(std::chrono::milliseconds, std::function<std::any(AnyStopToken, bool)>) = 0;
278294
virtual impl::ErasedOperation executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
295+
virtual void submit(std::function<void()>) = 0;
279296
virtual AnyStrand
280297
makeStrand() = 0;
281298
virtual void
@@ -323,6 +340,12 @@ class AnyExecutionContext {
323340
return ctx.executeRepeatedly(interval, std::move(fn));
324341
}
325342

343+
void
344+
submit(std::function<void()> fn) override
345+
{
346+
return ctx.submit(std::move(fn));
347+
}
348+
326349
AnyStrand
327350
makeStrand() override
328351
{

src/util/async/AnyStrand.hpp

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ class AnyStrand {
6464
[[nodiscard]] auto
6565
execute(SomeHandlerWithoutStopToken auto&& fn)
6666
{
67-
using RetType = std::decay_t<decltype(fn())>;
67+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
6868
static_assert(not std::is_same_v<RetType, std::any>);
6969

7070
return AnyOperation<RetType>( //
71-
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> std::any {
71+
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
7272
if constexpr (std::is_void_v<RetType>) {
73-
fn();
73+
std::invoke(std::forward<decltype(fn)>(fn));
7474
return {};
7575
} else {
76-
return std::make_any<RetType>(fn());
76+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn)));
7777
}
7878
})
7979
);
@@ -88,16 +88,16 @@ class AnyStrand {
8888
[[nodiscard]] auto
8989
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
9090
{
91-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
91+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
9292
static_assert(not std::is_same_v<RetType, std::any>);
9393

9494
return AnyOperation<RetType>( //
95-
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
95+
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
9696
if constexpr (std::is_void_v<RetType>) {
97-
fn(std::move(stopToken));
97+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
9898
return {};
9999
} else {
100-
return std::make_any<RetType>(fn(std::move(stopToken)));
100+
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
101101
}
102102
})
103103
);
@@ -113,17 +113,19 @@ class AnyStrand {
113113
[[nodiscard]] auto
114114
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
115115
{
116-
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
116+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
117117
static_assert(not std::is_same_v<RetType, std::any>);
118118

119119
return AnyOperation<RetType>( //
120120
pimpl_->execute(
121-
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
121+
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
122122
if constexpr (std::is_void_v<RetType>) {
123-
fn(std::move(stopToken));
123+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
124124
return {};
125125
} else {
126-
return std::make_any<RetType>(fn(std::move(stopToken)));
126+
return std::make_any<RetType>(
127+
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken))
128+
);
127129
}
128130
},
129131
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
@@ -141,18 +143,30 @@ class AnyStrand {
141143
[[nodiscard]] auto
142144
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
143145
{
144-
using RetType = std::decay_t<decltype(fn())>;
146+
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
145147
static_assert(not std::is_same_v<RetType, std::any>);
146148

147149
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
148150
return AnyOperation<RetType>( //
149-
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] -> std::any {
150-
fn();
151+
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
152+
std::invoke(std::forward<decltype(fn)>(fn));
151153
return {};
152154
})
153155
);
154156
}
155157

158+
/**
159+
* @brief Schedule an operation on the execution context without expectations of a result
160+
* @note Exceptions are caught internally and `ASSERT`ed on
161+
*
162+
* @param fn The block of code to execute
163+
*/
164+
void
165+
submit(SomeHandlerWithoutStopToken auto&& fn)
166+
{
167+
pimpl_->submit(std::forward<decltype(fn)>(fn));
168+
}
169+
156170
private:
157171
struct Concept {
158172
virtual ~Concept() = default;
@@ -165,6 +179,7 @@ class AnyStrand {
165179
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) = 0;
166180
[[nodiscard]] virtual impl::ErasedOperation
167181
executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
182+
virtual void submit(std::function<void()>) = 0;
168183
};
169184

170185
template <typename StrandType>
@@ -194,6 +209,12 @@ class AnyStrand {
194209
{
195210
return strand.executeRepeatedly(interval, std::move(fn));
196211
}
212+
213+
void
214+
submit(std::function<void()> fn) override
215+
{
216+
return strand.submit(std::move(fn));
217+
}
197218
};
198219

199220
private:

src/util/async/README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,14 @@ Scheduled operations can be aborted by calling
9191

9292
### Error handling
9393

94-
By default, exceptions that happen during the execution of user-provided code are caught and returned in the error channel of `std::expected` as an instance of the `ExecutionError` struct. The user can then extract the error message by calling `what()` or directly accessing the `message` member.
94+
For APIs that return an Operation, by default, exceptions that happen during the execution of user-provided code are caught and returned in the error channel of `std::expected` as an instance of the `ExecutionError` struct. The user can then extract the error message by calling `what()` or directly accessing the `message` member.
95+
In the `submit` API however, exceptions are caught and `ASSERT`ed on.
9596

9697
### Returned value
9798

98-
If the user-provided lambda returns anything but `void`, the type and value will propagate through the operation object and can be received by calling `get` which will block until a value or an error is available.
99+
For `submit` API the return type is always `void`.
100+
101+
For other APIs, if the user-provided lambda returns anything but `void`, the type and value will propagate through the operation object and can be received by calling `get` which will block until a value or an error is available.
99102

100103
The `wait` member function can be used when the user just wants to wait for the value to become available but not necessarily getting at the value just yet.
101104

@@ -122,6 +125,12 @@ This section provides some examples. For more examples take a look at `Execution
122125

123126
### Regular operation
124127

128+
#### One shot tasks
129+
130+
```cpp
131+
ctx.submit([]() { /* do something */ });
132+
```
133+
125134
#### Awaiting and reading values
126135
127136
```cpp

0 commit comments

Comments
 (0)