Skip to content

Commit

Permalink
add concept coke::IsCokeAwaitable, remove coke::GenericAwaiter<T>
Browse files Browse the repository at this point in the history
The main purpose is to report compilation errors when other coroutines
are mixed with coke.

Since all awaitable objects must be movable, coke::GenericAwaiter<T>
that does not meet this condition has been removed, users should use
coke::BasicAwaiter<T> directly.
  • Loading branch information
kedixa committed Oct 19, 2024
1 parent dd2954f commit d510742
Show file tree
Hide file tree
Showing 24 changed files with 75 additions and 277 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ cc_library(
"include/coke/deque.h",
"include/coke/fileio.h",
"include/coke/future.h",
"include/coke/generic_awaiter.h",
"include/coke/global.h",
"include/coke/go.h",
"include/coke/latch.h",
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cc_library(
hdrs = [
"bench_common.h",
],
deps = ["//:tools"],
deps = ["//:tools", "//:common"],
)

create_benchmark_target("bench_exception")
Expand Down
14 changes: 14 additions & 0 deletions benchmark/bench_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@
#include <vector>

#include "coke/tools/option_parser.h"
#include "coke/basic_awaiter.h"
#include "workflow/WFTaskFactory.h"

class RepeaterAwaiter : public coke::BasicAwaiter<void> {
public:
RepeaterAwaiter(WFRepeaterTask *task) {
task->set_callback([info = this->get_info()](void *) {
auto *awaiter = info->get_awaiter<RepeaterAwaiter>();
awaiter->done();
});

this->set_task(task);
}
};

inline long long current_msec() {
auto dur = std::chrono::steady_clock::now().time_since_epoch();
Expand Down
8 changes: 2 additions & 6 deletions benchmark/bench_go.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ coke::Task<> bench_wf_go_name(int max) {
return nullptr;
};

coke::GenericAwaiter<void> g;
WFRepeaterTask *task = WFTaskFactory::create_repeater_task(creater,
[&g](WFRepeaterTask *) { g.done(); }
);
g.take_over(task);
co_await g;
auto *rep = WFTaskFactory::create_repeater_task(creater, nullptr);
co_await RepeaterAwaiter(rep);
}

coke::Task<> bench_go_name(int max) {
Expand Down
8 changes: 2 additions & 6 deletions benchmark/bench_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,8 @@ coke::Task<> do_bench_wf(Creater &&creater) {
return nullptr;
};

coke::GenericAwaiter<void> g;
auto callback = [&g] (WFRepeaterTask *) { g.done(); };
auto *rep = WFTaskFactory::create_repeater_task(create, callback);
g.take_over(rep);

co_await g;
auto *rep = WFTaskFactory::create_repeater_task(create, nullptr);
co_await RepeaterAwaiter(rep);
}

// benchmarks
Expand Down
19 changes: 8 additions & 11 deletions benchmark/bench_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ bool next(long long &cur) {

template<typename Awaiter>
coke::Task<> detach(Awaiter awaiter) {
co_await awaiter;
co_await std::move(awaiter);
}

template<typename Awaiter>
Expand All @@ -72,20 +72,16 @@ coke::Task<> bench_wf_repeat() {
std::mt19937_64 mt(current_msec());
long long i;

coke::GenericAwaiter<void> g;
auto create = [&] (WFRepeaterTask *) -> SubTask * {
if (next(i)) {
int nsec = dist(mt) * 1000;
return WFTaskFactory::create_timer_task(0, nsec, nullptr);
}
return nullptr;
};
auto callback = [&] (WFRepeaterTask *) { g.done(); };

auto *rep = WFTaskFactory::create_repeater_task(create, callback);
g.take_over(rep);

co_await g;
auto *rep = WFTaskFactory::create_repeater_task(create, nullptr);
co_await RepeaterAwaiter(rep);
}

coke::Task<> bench_default_timer() {
Expand Down Expand Up @@ -136,7 +132,7 @@ coke::Task<> bench_cancel_by_name() {
name = std::to_string(i);
auto awaiter = coke::sleep(name, microseconds(dist(mt)));
coke::cancel_sleep_by_name(name);
co_await awaiter;
co_await std::move(awaiter);
}
}

Expand Down Expand Up @@ -241,7 +237,7 @@ coke::Task<> bench_cancel_by_id() {
id = coke::get_unique_id();
auto awaiter = coke::sleep(id, microseconds(dist(mt)));
coke::cancel_sleep_by_id(id);
co_await awaiter;
co_await std::move(awaiter);
}
}

Expand Down Expand Up @@ -455,9 +451,10 @@ int main(int argc, char *argv[]) {
DO_BENCHMARK(timer_by_addr);
DO_BENCHMARK(cancel_by_id);
DO_BENCHMARK(detach_by_id);
DO_BENCHMARK(detach3_by_id);
// disable this test case, it always make workflow deadlock
//DO_BENCHMARK(detach3_by_id);
DO_BENCHMARK(detach_inf_by_id);
DO_BENCHMARK(detach3_inf_by_id);
//DO_BENCHMARK(detach3_inf_by_id);
DO_BENCHMARK(one_id);
DO_BENCHMARK(two_id);
DO_BENCHMARK(ten_id);
Expand Down
1 change: 0 additions & 1 deletion include/coke/coke.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "coke/sleep.h"
#include "coke/qps_pool.h"
#include "coke/wait.h"
#include "coke/generic_awaiter.h"
#include "coke/series.h"
#include "coke/semaphore.h"
#include "coke/task.h"
Expand Down
6 changes: 3 additions & 3 deletions include/coke/dag.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class DagContextBase {
DagContextBase(const DagContextBase &) = delete;
~DagContextBase() = default;

auto operator co_await() { return lt.wait(); }
auto wait() { return lt.wait(); }

void count_down() { lt.count_down(); }

Expand Down Expand Up @@ -259,7 +259,7 @@ class DagGraph : public DagGraphBase<T> {
Task<> run(T &data) {
detail::DagContext<T> ctx(Base::nodes.size(), data);
Base::start(ctx);
co_await ctx;
co_await ctx.wait();
}

private:
Expand All @@ -284,7 +284,7 @@ class DagGraph<void> : public DagGraphBase<void> {
Task<> run() {
detail::DagContext<void> ctx(Base::nodes.size());
Base::start(ctx);
co_await ctx;
co_await ctx.wait();
}

private:
Expand Down
2 changes: 2 additions & 0 deletions include/coke/detail/awaiter_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class AwaiterBase {
static void *create_series(SubTask *first);

public:
constexpr static bool __is_coke_awaitable_type = true;

AwaiterBase() = default;

/**
Expand Down
6 changes: 6 additions & 0 deletions include/coke/detail/basic_concept.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ concept Queueable = (
template<typename T>
concept IsCokePromise = T::__is_coke_promise_type;

template<typename T>
constexpr bool is_coke_awaitable_v = false;

template<typename T>
concept IsCokeAwaitable = T::__is_coke_awaitable_type || is_coke_awaitable_v<T>;

} // namespace coke

#endif // COKE_DETAIL_CONCEPT_H
6 changes: 3 additions & 3 deletions include/coke/detail/future_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ struct FutureStateBase {
co_return st;

lk.unlock();
int ret = co_await s;
int ret = co_await std::move(s);
lk.lock();

st = get_state();
Expand Down Expand Up @@ -212,11 +212,11 @@ template<Cokeable T>
Task<void> detach_task(coke::Promise<T> promise, Task<T> task) {
try {
if constexpr (std::is_void_v<T>) {
co_await task;
co_await std::move(task);
promise.set_value();
}
else {
promise.set_value(co_await task);
promise.set_value(co_await std::move(task));
}
}
catch (...) {
Expand Down
14 changes: 14 additions & 0 deletions include/coke/detail/task_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ class PromiseBase {
std::rethrow_exception(std::move(eptr));
}

template<typename T>
requires IsCokeAwaitable<std::decay_t<T>>
auto await_transform(T &&t) const {
static_assert(std::is_rvalue_reference_v<T&&>, "Awaitable must be rvalue reference");
return std::forward<T>(t);
}

template<typename T>
requires (!IsCokeAwaitable<std::decay_t<T>>)
auto await_transform(T &&) const {
static_assert(!std::is_same_v<T, T>, "This type cannot be co awaited in coke::Task");
}

protected:
std::exception_ptr eptr{nullptr};

Expand Down Expand Up @@ -151,6 +164,7 @@ class [[nodiscard]] Task {
public:
using promise_type = CoPromise<T>;
using handle_type = std::coroutine_handle<promise_type>;
constexpr static bool __is_coke_awaitable_type = true;

Task() { }
Task(Task &&that) : hdl(that.hdl) { that.hdl = nullptr; }
Expand Down
12 changes: 6 additions & 6 deletions include/coke/detail/wait_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ struct MValueHelper<void> {
template<Cokeable T, typename L>
Task<> coke_wait_helper(Task<T> task, ValueHelper<T> &v, L &lt) {
if constexpr (std::is_same_v<T, void>)
co_await task;
co_await std::move(task);
else
v.set_value(co_await task);
v.set_value(co_await std::move(task));

lt.count_down();
}
Expand All @@ -116,9 +116,9 @@ template<Cokeable T, typename L>
Task<> coke_wait_helper(Task<T> task, MValueHelper<T> &v, std::size_t i, L &lt)
{
if constexpr (std::is_same_v<T, void>)
co_await task;
co_await std::move(task);
else
v.set_value(i, co_await task);
v.set_value(i, co_await std::move(task));

lt.count_down();
}
Expand All @@ -134,7 +134,7 @@ auto async_wait_helper(std::vector<Task<T>> tasks)
for (std::size_t i = 0; i < n; i++)
coke_wait_helper(std::move(tasks[i]), v, i, lt).detach();

co_await lt;
co_await lt.wait();
co_return v.get_value();
}

Expand All @@ -151,7 +151,7 @@ using AwaiterResult = std::remove_cvref_t<

template<AwaitableType A>
auto make_task_from_awaitable_helper(A a) -> Task<AwaiterResult<A>> {
co_return co_await a;
co_return co_await std::move(a);
}

} // namespace coke::detail
Expand Down
Loading

0 comments on commit d510742

Please sign in to comment.