From d51074271e476db2eaf15e844c5669744129d4cf Mon Sep 17 00:00:00 2001 From: kedixa <1204837541@qq.com> Date: Sat, 19 Oct 2024 18:47:38 +0800 Subject: [PATCH] add concept coke::IsCokeAwaitable, remove coke::GenericAwaiter The main purpose is to report compilation errors when other coroutines are mixed with coke. Since all awaitable objects must be movable, coke::GenericAwaiter that does not meet this condition has been removed, users should use coke::BasicAwaiter directly. --- BUILD | 1 - benchmark/BUILD | 2 +- benchmark/bench_common.h | 14 +++ benchmark/bench_go.cpp | 8 +- benchmark/bench_graph.cpp | 8 +- benchmark/bench_timer.cpp | 19 ++-- include/coke/coke.h | 1 - include/coke/dag.h | 6 +- include/coke/detail/awaiter_base.h | 2 + include/coke/detail/basic_concept.h | 6 ++ include/coke/detail/future_base.h | 6 +- include/coke/detail/task_impl.h | 14 +++ include/coke/detail/wait_helper.h | 12 +-- include/coke/generic_awaiter.h | 153 ---------------------------- include/coke/latch.h | 7 -- src/condition.cpp | 4 +- src/mutex.cpp | 6 +- src/stop_token.cpp | 4 +- test/BUILD | 1 - test/CMakeLists.txt | 1 - test/test_exception.cpp | 4 +- test/test_generic_awaiter.cpp | 63 ------------ test/test_latch.cpp | 6 +- test/test_sleep.cpp | 4 +- 24 files changed, 75 insertions(+), 277 deletions(-) delete mode 100644 include/coke/generic_awaiter.h delete mode 100644 test/test_generic_awaiter.cpp diff --git a/BUILD b/BUILD index 51767b6..a46dcb6 100644 --- a/BUILD +++ b/BUILD @@ -41,7 +41,6 @@ cc_library( "include/coke/deque.h", "include/coke/fileio.h", "include/coke/future.h", - "include/coke/generic_awaiter.h", "include/coke/global.h", "include/coke/go.h", "include/coke/latch.h", diff --git a/benchmark/BUILD b/benchmark/BUILD index 2148a4d..4af612d 100644 --- a/benchmark/BUILD +++ b/benchmark/BUILD @@ -5,7 +5,7 @@ cc_library( hdrs = [ "bench_common.h", ], - deps = ["//:tools"], + deps = ["//:tools", "//:common"], ) create_benchmark_target("bench_exception") diff --git a/benchmark/bench_common.h b/benchmark/bench_common.h index b018a6c..603134a 100644 --- a/benchmark/bench_common.h +++ b/benchmark/bench_common.h @@ -26,6 +26,20 @@ #include #include "coke/tools/option_parser.h" +#include "coke/basic_awaiter.h" +#include "workflow/WFTaskFactory.h" + +class RepeaterAwaiter : public coke::BasicAwaiter { +public: + RepeaterAwaiter(WFRepeaterTask *task) { + task->set_callback([info = this->get_info()](void *) { + auto *awaiter = info->get_awaiter(); + awaiter->done(); + }); + + this->set_task(task); + } +}; inline long long current_msec() { auto dur = std::chrono::steady_clock::now().time_since_epoch(); diff --git a/benchmark/bench_go.cpp b/benchmark/bench_go.cpp index cba66d2..2ea4fa1 100644 --- a/benchmark/bench_go.cpp +++ b/benchmark/bench_go.cpp @@ -64,12 +64,8 @@ coke::Task<> bench_wf_go_name(int max) { return nullptr; }; - coke::GenericAwaiter g; - WFRepeaterTask *task = WFTaskFactory::create_repeater_task(creater, - [&g](WFRepeaterTask *) { g.done(); } - ); - g.take_over(task); - co_await g; + auto *rep = WFTaskFactory::create_repeater_task(creater, nullptr); + co_await RepeaterAwaiter(rep); } coke::Task<> bench_go_name(int max) { diff --git a/benchmark/bench_graph.cpp b/benchmark/bench_graph.cpp index 8ab5c9f..65ffc88 100644 --- a/benchmark/bench_graph.cpp +++ b/benchmark/bench_graph.cpp @@ -236,12 +236,8 @@ coke::Task<> do_bench_wf(Creater &&creater) { return nullptr; }; - coke::GenericAwaiter g; - auto callback = [&g] (WFRepeaterTask *) { g.done(); }; - auto *rep = WFTaskFactory::create_repeater_task(create, callback); - g.take_over(rep); - - co_await g; + auto *rep = WFTaskFactory::create_repeater_task(create, nullptr); + co_await RepeaterAwaiter(rep); } // benchmarks diff --git a/benchmark/bench_timer.cpp b/benchmark/bench_timer.cpp index 240736d..e491452 100644 --- a/benchmark/bench_timer.cpp +++ b/benchmark/bench_timer.cpp @@ -58,7 +58,7 @@ bool next(long long &cur) { template coke::Task<> detach(Awaiter awaiter) { - co_await awaiter; + co_await std::move(awaiter); } template @@ -72,7 +72,6 @@ coke::Task<> bench_wf_repeat() { std::mt19937_64 mt(current_msec()); long long i; - coke::GenericAwaiter g; auto create = [&] (WFRepeaterTask *) -> SubTask * { if (next(i)) { int nsec = dist(mt) * 1000; @@ -80,12 +79,9 @@ coke::Task<> bench_wf_repeat() { } return nullptr; }; - auto callback = [&] (WFRepeaterTask *) { g.done(); }; - auto *rep = WFTaskFactory::create_repeater_task(create, callback); - g.take_over(rep); - - co_await g; + auto *rep = WFTaskFactory::create_repeater_task(create, nullptr); + co_await RepeaterAwaiter(rep); } coke::Task<> bench_default_timer() { @@ -136,7 +132,7 @@ coke::Task<> bench_cancel_by_name() { name = std::to_string(i); auto awaiter = coke::sleep(name, microseconds(dist(mt))); coke::cancel_sleep_by_name(name); - co_await awaiter; + co_await std::move(awaiter); } } @@ -241,7 +237,7 @@ coke::Task<> bench_cancel_by_id() { id = coke::get_unique_id(); auto awaiter = coke::sleep(id, microseconds(dist(mt))); coke::cancel_sleep_by_id(id); - co_await awaiter; + co_await std::move(awaiter); } } @@ -455,9 +451,10 @@ int main(int argc, char *argv[]) { DO_BENCHMARK(timer_by_addr); DO_BENCHMARK(cancel_by_id); DO_BENCHMARK(detach_by_id); - DO_BENCHMARK(detach3_by_id); + // disable this test case, it always make workflow deadlock + //DO_BENCHMARK(detach3_by_id); DO_BENCHMARK(detach_inf_by_id); - DO_BENCHMARK(detach3_inf_by_id); + //DO_BENCHMARK(detach3_inf_by_id); DO_BENCHMARK(one_id); DO_BENCHMARK(two_id); DO_BENCHMARK(ten_id); diff --git a/include/coke/coke.h b/include/coke/coke.h index 6a3502c..11d0d4b 100644 --- a/include/coke/coke.h +++ b/include/coke/coke.h @@ -35,7 +35,6 @@ #include "coke/sleep.h" #include "coke/qps_pool.h" #include "coke/wait.h" -#include "coke/generic_awaiter.h" #include "coke/series.h" #include "coke/semaphore.h" #include "coke/task.h" diff --git a/include/coke/dag.h b/include/coke/dag.h index a988f9e..0199ff0 100644 --- a/include/coke/dag.h +++ b/include/coke/dag.h @@ -82,7 +82,7 @@ class DagContextBase { DagContextBase(const DagContextBase &) = delete; ~DagContextBase() = default; - auto operator co_await() { return lt.wait(); } + auto wait() { return lt.wait(); } void count_down() { lt.count_down(); } @@ -259,7 +259,7 @@ class DagGraph : public DagGraphBase { Task<> run(T &data) { detail::DagContext ctx(Base::nodes.size(), data); Base::start(ctx); - co_await ctx; + co_await ctx.wait(); } private: @@ -284,7 +284,7 @@ class DagGraph : public DagGraphBase { Task<> run() { detail::DagContext ctx(Base::nodes.size()); Base::start(ctx); - co_await ctx; + co_await ctx.wait(); } private: diff --git a/include/coke/detail/awaiter_base.h b/include/coke/detail/awaiter_base.h index c4e0039..03d33e6 100644 --- a/include/coke/detail/awaiter_base.h +++ b/include/coke/detail/awaiter_base.h @@ -37,6 +37,8 @@ class AwaiterBase { static void *create_series(SubTask *first); public: + constexpr static bool __is_coke_awaitable_type = true; + AwaiterBase() = default; /** diff --git a/include/coke/detail/basic_concept.h b/include/coke/detail/basic_concept.h index b78efa0..3f39746 100644 --- a/include/coke/detail/basic_concept.h +++ b/include/coke/detail/basic_concept.h @@ -58,6 +58,12 @@ concept Queueable = ( template concept IsCokePromise = T::__is_coke_promise_type; +template +constexpr bool is_coke_awaitable_v = false; + +template +concept IsCokeAwaitable = T::__is_coke_awaitable_type || is_coke_awaitable_v; + } // namespace coke #endif // COKE_DETAIL_CONCEPT_H diff --git a/include/coke/detail/future_base.h b/include/coke/detail/future_base.h index 121b5ae..93ee3c0 100644 --- a/include/coke/detail/future_base.h +++ b/include/coke/detail/future_base.h @@ -142,7 +142,7 @@ struct FutureStateBase { co_return st; lk.unlock(); - int ret = co_await s; + int ret = co_await std::move(s); lk.lock(); st = get_state(); @@ -212,11 +212,11 @@ template Task detach_task(coke::Promise promise, Task task) { try { if constexpr (std::is_void_v) { - co_await task; + co_await std::move(task); promise.set_value(); } else { - promise.set_value(co_await task); + promise.set_value(co_await std::move(task)); } } catch (...) { diff --git a/include/coke/detail/task_impl.h b/include/coke/detail/task_impl.h index 8726072..7f18f2c 100644 --- a/include/coke/detail/task_impl.h +++ b/include/coke/detail/task_impl.h @@ -71,6 +71,19 @@ class PromiseBase { std::rethrow_exception(std::move(eptr)); } + template + requires IsCokeAwaitable> + auto await_transform(T &&t) const { + static_assert(std::is_rvalue_reference_v, "Awaitable must be rvalue reference"); + return std::forward(t); + } + + template + requires (!IsCokeAwaitable>) + auto await_transform(T &&) const { + static_assert(!std::is_same_v, "This type cannot be co awaited in coke::Task"); + } + protected: std::exception_ptr eptr{nullptr}; @@ -151,6 +164,7 @@ class [[nodiscard]] Task { public: using promise_type = CoPromise; using handle_type = std::coroutine_handle; + constexpr static bool __is_coke_awaitable_type = true; Task() { } Task(Task &&that) : hdl(that.hdl) { that.hdl = nullptr; } diff --git a/include/coke/detail/wait_helper.h b/include/coke/detail/wait_helper.h index ea081ea..22739b6 100644 --- a/include/coke/detail/wait_helper.h +++ b/include/coke/detail/wait_helper.h @@ -105,9 +105,9 @@ struct MValueHelper { template Task<> coke_wait_helper(Task task, ValueHelper &v, L <) { if constexpr (std::is_same_v) - co_await task; + co_await std::move(task); else - v.set_value(co_await task); + v.set_value(co_await std::move(task)); lt.count_down(); } @@ -116,9 +116,9 @@ template Task<> coke_wait_helper(Task task, MValueHelper &v, std::size_t i, L <) { if constexpr (std::is_same_v) - co_await task; + co_await std::move(task); else - v.set_value(i, co_await task); + v.set_value(i, co_await std::move(task)); lt.count_down(); } @@ -134,7 +134,7 @@ auto async_wait_helper(std::vector> tasks) for (std::size_t i = 0; i < n; i++) coke_wait_helper(std::move(tasks[i]), v, i, lt).detach(); - co_await lt; + co_await lt.wait(); co_return v.get_value(); } @@ -151,7 +151,7 @@ using AwaiterResult = std::remove_cvref_t< template auto make_task_from_awaitable_helper(A a) -> Task> { - co_return co_await a; + co_return co_await std::move(a); } } // namespace coke::detail diff --git a/include/coke/generic_awaiter.h b/include/coke/generic_awaiter.h deleted file mode 100644 index acecab6..0000000 --- a/include/coke/generic_awaiter.h +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Copyright 2024 Coke Project (https://github.com/kedixa/coke) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Authors: kedixa (https://github.com/kedixa) -*/ - -#ifndef COKE_GENERIC_AWAITER_H -#define COKE_GENERIC_AWAITER_H - -#include -#include - -#include "coke/detail/awaiter_base.h" - -namespace coke { - -/** - * GenericAwaiter is used with task types under workflow framework but not - * currently supported by coke. For example: - * - * class MyTask : public SubTask { - * public: - * void set_callback(std::function); - * // ... other functions - * }; - * - * Then we can use GenericAwaiter like this: - * coke::Task<> f() { - * GenericAwaiter awaiter; - * - * MyTask *task = create_mytask(some args ...); - * task->set_callback([&awaiter](MyTask *task) { - * // do something and finally call set_result and done - * awaiter.set_result(task->get_int()); - * awaiter.done(); - * }); - * - * awaiter.take_over(task); - * - * int ret = co_await awaiter; - * } -*/ -template -class GenericAwaiter; - -template<> -class GenericAwaiter : public AwaiterBase { -public: - GenericAwaiter() = default; - - /** - * @brief GenericAwaiter can neither be copied nor moved. See BasicAwaiter - * if you need a movable awaiter. - */ - GenericAwaiter(GenericAwaiter &&) = delete; - - void await_resume() { } - - /** - * @brief Resume the coroutine, usually call it on task's callback. - * @attention GenericAwaiter::done should be called EXACTLY ONCE. - */ - void done() noexcept override { - AwaiterBase::done(); - } - - void set_result() { } - void emplace_result() { } - - /** - * @brief Take over ownership of `task`. Do not perform other operations on - * task after calling take_over. - * @attention If continue to operate the task after take_over, make sure - * you understand the life cycle of the workflow task. - */ - void take_over(SubTask *task) { - set_task(task, false); - } -}; - -template -class GenericAwaiter : public AwaiterBase { -public: - GenericAwaiter() = default; - - /** - * @brief GenericAwaiter can neither be copied nor moved. See BasicAwaiter - * if you need a movable awaiter. - */ - GenericAwaiter(GenericAwaiter &&) = delete; - - /** - * @brief Set result into GenericAwaiter. - * @attention Set or emplace result should be called EXACTLY ONCE. - */ - template - requires std::is_constructible_v - void emplace_result(ARGS&&... args) { - opt.emplace(std::forward(args)...); - } - - void set_result(T &&t) { - opt = std::move(t); - } - - void set_result(const T &t) { - opt = t; - } - - /** - * @brief Resume the coroutine, usually call it on task's callback. - * @attention GenericAwaiter::done should be called EXACTLY ONCE. - */ - void done() noexcept override { - AwaiterBase::done(); - } - - /** - * @brief Take over ownership of `task`. Do not perform other operations on - * task after calling take_over. - * @attention If continue to operate the task after take_over, make sure - * you understand the life cycle of the workflow task. - */ - virtual void take_over(SubTask *task) { - set_task(task, false); - } - - /** - * @brief Return the co_await result set by emplace_result/set_result. - */ - T await_resume() { - return std::move(opt.value()); - } - -protected: - std::optional opt; -}; - -} // namespace coke - -#endif // COKE_GENERIC_AWAITER_H diff --git a/include/coke/latch.h b/include/coke/latch.h index 9d608f0..17631d4 100644 --- a/include/coke/latch.h +++ b/include/coke/latch.h @@ -61,13 +61,6 @@ class Latch final { return this->expected >= 0; } - /** - * @brief Same as Latch::wait. - */ - LatchAwaiter operator co_await() { - return wait(); - } - /** * @brief Wait for the Latch to be counted to zero. * @return An awaiter that should be awaited immediately. diff --git a/src/condition.cpp b/src/condition.cpp index 6dc393a..012e966 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -30,7 +30,7 @@ Task Condition::wait_impl(std::unique_lock &lock, ++wait_cnt; lock.unlock(); - ret = co_await s; + ret = co_await std::move(s); lock.lock(); --wait_cnt; @@ -57,7 +57,7 @@ Task Condition::wait_impl(std::unique_lock &lock, insert_head = true; lock.unlock(); - ret = co_await s; + ret = co_await std::move(s); lock.lock(); --wait_cnt; diff --git a/src/mutex.cpp b/src/mutex.cpp index a9ad530..3d79ad8 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -42,7 +42,7 @@ Task Semaphore::acquire_impl(detail::TimedWaitHelper helper) { insert_head = true; lk.unlock(); - ret = co_await s; + ret = co_await std::move(s); lk.lock(); --waiting; @@ -105,7 +105,7 @@ Task SharedMutex::lock_impl(detail::TimedWaitHelper helper) { insert_head = true; lk.unlock(); - ret = co_await s; + ret = co_await std::move(s); lk.lock(); --write_waiting; @@ -143,7 +143,7 @@ Task SharedMutex::lock_shared_impl(detail::TimedWaitHelper helper) { insert_head = true; lk.unlock(); - ret = co_await s; + ret = co_await std::move(s); lk.lock(); --read_waiting; diff --git a/src/stop_token.cpp b/src/stop_token.cpp index c113432..e9e1c4e 100644 --- a/src/stop_token.cpp +++ b/src/stop_token.cpp @@ -26,7 +26,7 @@ Task StopToken::wait_finish_impl(detail::TimedWaitHelper helper) { SleepAwaiter s = sleep(get_finish_addr(), helper); lk.unlock(); - int ret = co_await s; + int ret = co_await std::move(s); lk.lock(); if (ret < 0 || ret == SLEEP_ABORTED) @@ -42,7 +42,7 @@ Task StopToken::wait_stop_impl(detail::TimedWaitHelper helper) { SleepAwaiter s = sleep(get_stop_addr(), helper); lk.unlock(); - int ret = co_await s; + int ret = co_await std::move(s); lk.lock(); if (ret < 0 || ret == SLEEP_ABORTED) diff --git a/test/BUILD b/test/BUILD index 26a4a8c..158db67 100644 --- a/test/BUILD +++ b/test/BUILD @@ -6,7 +6,6 @@ create_test_target("test_dag") create_test_target("test_exception") create_test_target("test_file") create_test_target("test_future") -create_test_target("test_generic_awaiter") create_test_target("test_go") create_test_target("test_http", ["//:http"]) create_test_target("test_latch") diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 094ad1b..4980837 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -12,7 +12,6 @@ set(ALL_TESTS test_exception test_file test_future - test_generic_awaiter test_go test_http test_latch diff --git a/test/test_exception.cpp b/test/test_exception.cpp index 4db4e9f..8ac40fb 100644 --- a/test/test_exception.cpp +++ b/test/test_exception.cpp @@ -32,12 +32,12 @@ coke::Task func(bool e, T x) { } template -coke::Task<> task_exception(bool e, coke::Task &&task, const T &x) { +coke::Task<> task_exception(bool e, coke::Task task, const T &x) { bool has_exception = false; T y; try { - y = co_await task; + y = co_await std::move(task); } catch (const T &exc) { y = exc; diff --git a/test/test_generic_awaiter.cpp b/test/test_generic_awaiter.cpp deleted file mode 100644 index 0675782..0000000 --- a/test/test_generic_awaiter.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright 2024 Coke Project (https://github.com/kedixa/coke) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Authors: kedixa (https://github.com/kedixa) -*/ - -#include -#include -#include - -#include "coke/coke.h" -#include "workflow/Workflow.h" -#include "workflow/WFTaskFactory.h" - -coke::Task<> generic() { - std::atomic count = 0; - auto func = [&count]() { - ++count; - }; - - ParallelWork *p = Workflow::create_parallel_work(nullptr); - for (int i = 0; i < 10; i++) { - auto *t = WFTaskFactory::create_go_task("", func); - auto *s = Workflow::create_series_work(t, nullptr); - p->add_series(s); - } - - coke::GenericAwaiter g; - p->set_callback([&g](const ParallelWork *) { - g.set_result(1234); - g.done(); - }); - - g.take_over(p); - - int ret = co_await g; - EXPECT_EQ(ret, 1234); - EXPECT_EQ(count.load(), 10); -} - -int main(int argc, char *argv[]) { - coke::GlobalSettings s; - s.poller_threads = 2; - s.handler_threads = 2; - s.compute_threads = 2; - coke::library_init(s); - - testing::InitGoogleTest(&argc, argv); - - return RUN_ALL_TESTS(); -} diff --git a/test/test_latch.cpp b/test/test_latch.cpp index a223bb8..a515ad9 100644 --- a/test/test_latch.cpp +++ b/test/test_latch.cpp @@ -34,7 +34,7 @@ coke::Task<> simple_latch() { co_return; }, t, lt).detach(); - co_await lt; + co_await lt.wait(); EXPECT_EQ(s, t); } @@ -50,7 +50,7 @@ coke::Task<> simple_latch() { lt.count_down(); }, t, lt).detach(); - co_await lt; + co_await lt.wait(); EXPECT_EQ(s, t); } } @@ -69,7 +69,7 @@ coke::Task<> ret_value() { ret = co_await lt.wait_for(std::chrono::milliseconds(10)); EXPECT_EQ(ret, coke::LATCH_TIMEOUT); - ret = co_await lt; + ret = co_await lt.wait(); EXPECT_EQ(ret, coke::LATCH_SUCCESS); } diff --git a/test/test_sleep.cpp b/test/test_sleep.cpp index 3e8cccd..9fdac64 100644 --- a/test/test_sleep.cpp +++ b/test/test_sleep.cpp @@ -52,14 +52,14 @@ coke::Task<> cancel_sleep() { uint64_t uid = coke::get_unique_id(); auto awaiter = coke::sleep(uid, std::chrono::milliseconds(0)); coke::cancel_sleep_by_id(uid); - int ret = co_await awaiter; + int ret = co_await std::move(awaiter); EXPECT_EQ(ret, coke::SLEEP_CANCELED); } coke::Task<> success_sleep() { uint64_t uid = coke::get_unique_id(); auto awaiter = coke::sleep(uid, std::chrono::milliseconds(0)); - int ret = co_await awaiter; + int ret = co_await std::move(awaiter); EXPECT_EQ(ret, coke::SLEEP_SUCCESS); }