@@ -229,15 +229,21 @@ struct invoke_variant_dispatcher {
229
229
template <>
230
230
struct invoke_variant_dispatcher <1 > {
231
231
template <typename F, typename T, typename Arg>
232
- static auto invoke_ (F&& f, T& t) {
233
- if constexpr (std::is_same<Arg, void >::value) {
234
- return ;
235
- } else if constexpr (std::is_same<Arg, detail::avoid_>::value) {
236
- return std::forward<F>(f)();
237
- } else {
238
- return std::forward<F>(f)(std::move (stlab::get<Arg>(std::get<0 >(t))));
239
- }
232
+ static auto invoke_ (F&&, T&) -> std::enable_if_t<std::is_same<Arg, void>::value, void> {
233
+ return ;
240
234
}
235
+ template <typename F, typename T, typename Arg>
236
+ static auto invoke_ (F&& f, T&) -> std::enable_if_t<std::is_same<Arg, detail::avoid_>::value,
237
+ decltype(std::forward<F>(f)())> {
238
+ return std::forward<F>(f)();
239
+ }
240
+ template <typename F, typename T, typename Arg>
241
+ static auto invoke_ (F&& f, T& t) -> std::enable_if_t<
242
+ !(std::is_same<Arg, detail::avoid_>::value || std::is_same<Arg, detail::avoid_>::value),
243
+ decltype(std::forward<F>(f)(std::move(stlab::get<Arg>(std::get<0 >(t)))))> {
244
+ return std::forward<F>(f)(std::move (stlab::get<Arg>(std::get<0 >(t))));
245
+ }
246
+
241
247
template <typename F, typename T, typename ... Args>
242
248
static auto invoke (F&& f, T& t) {
243
249
return invoke_<F, T, first_t <Args...>>(std::forward<F>(f), t);
@@ -699,7 +705,7 @@ struct downstream<
699
705
_data = std::forward<F>(f);
700
706
}
701
707
702
- void clear () { _data = nullopt; }
708
+ void clear () { _data = stlab:: nullopt; }
703
709
704
710
std::size_t size () const { return 1 ; }
705
711
@@ -828,7 +834,7 @@ struct shared_process
828
834
if (do_final) {
829
835
std::unique_lock<std::mutex> lock (_downstream_mutex);
830
836
_downstream.clear (); // This will propagate the close to anything downstream
831
- _process = nullopt;
837
+ _process = stlab:: nullopt;
832
838
}
833
839
}
834
840
@@ -910,8 +916,14 @@ struct shared_process
910
916
return bool (message);
911
917
}
912
918
919
+ /*
920
+ REVISIT (sparent) : Next two cases are nearly identical, complicated by the need to
921
+ remove constexpr if to support C++14.
922
+ */
923
+
913
924
template <typename U>
914
- auto step () -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>>> {
925
+ auto step () -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
926
+ !has_process_await_v<unwrap_reference_t<T>, Args...>> {
915
927
// in case that the timeout function is just been executed then we have to re-schedule
916
928
// the current run
917
929
lock_t lock (_timeout_function_control, std::try_to_lock);
@@ -926,12 +938,90 @@ struct shared_process
926
938
is done on yield()
927
939
*/
928
940
try {
929
- if constexpr (has_process_await_v<unwrap_reference_t <T>, Args...>) {
930
- while (get_process_state (_process).first == process_state::await) {
931
- if (!dequeue ()) break ;
932
- }
941
+ if (get_process_state (_process).first == process_state::await) return ;
942
+
943
+ // Workaround until we can use structured bindings
944
+ auto tmp = get_process_state (_process);
945
+ const auto & state = tmp.first ;
946
+ const auto & duration = tmp.second ;
947
+
948
+ /*
949
+ Once we hit yield, go ahead and call it. If the yield is delayed then schedule it.
950
+ This process will be considered running until it executes.
951
+ */
952
+ if (state == process_state::yield) {
953
+ if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
954
+ std::chrono::nanoseconds::min ())
955
+ broadcast (unwrap (*_process).yield ());
956
+ else
957
+ execute_at (duration,
958
+ _executor)([_weak_this = make_weak_ptr (this ->shared_from_this ())] {
959
+ auto _this = _weak_this.lock ();
960
+ if (!_this) return ;
961
+ _this->try_broadcast ();
962
+ });
963
+ }
964
+
965
+ /*
966
+ We are in an await state and the queue is empty.
967
+
968
+ If we await forever then task_done() leaving us in an await state.
969
+ else if we await with an expired timeout then go ahead and yield now.
970
+ else schedule a timeout when we will yield if not canceled by intervening await.
971
+ */
972
+ else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
973
+ std::chrono::nanoseconds::max ()) {
974
+ task_done ();
975
+ } else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
976
+ std::chrono::nanoseconds::min ()) {
977
+ broadcast (unwrap (*_process).yield ());
933
978
} else {
934
- if (get_process_state (_process).first == process_state::await) return ;
979
+ /* Schedule a timeout. */
980
+ _timeout_function_active = true ;
981
+ execute_at (duration,
982
+ _executor)([_weak_this = make_weak_ptr (this ->shared_from_this ())] {
983
+ auto _this = _weak_this.lock ();
984
+ // It may be that the complete channel is gone in the meanwhile
985
+ if (!_this) return ;
986
+
987
+ // try_lock can fail spuriously
988
+ while (true ) {
989
+ lock_t lock (_this->_timeout_function_control , std::try_to_lock);
990
+ if (!lock) continue ;
991
+
992
+ // we were cancelled
993
+ if (get_process_state (_this->_process ).first != process_state::yield) {
994
+ _this->try_broadcast ();
995
+ _this->_timeout_function_active = false ;
996
+ }
997
+ return ;
998
+ }
999
+ });
1000
+ }
1001
+ } catch (...) { // this catches exceptions during _process.await() and _process.yield()
1002
+ broadcast (std::move (std::current_exception ()));
1003
+ }
1004
+ }
1005
+
1006
+ template <typename U>
1007
+ auto step () -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
1008
+ has_process_await_v<unwrap_reference_t<T>, Args...>> {
1009
+ // in case that the timeout function is just been executed then we have to re-schedule
1010
+ // the current run
1011
+ lock_t lock (_timeout_function_control, std::try_to_lock);
1012
+ if (!lock) {
1013
+ run ();
1014
+ return ;
1015
+ }
1016
+ _timeout_function_active = false ;
1017
+
1018
+ /*
1019
+ While we are waiting we will flush the queue. The assumption here is that work
1020
+ is done on yield()
1021
+ */
1022
+ try {
1023
+ while (get_process_state (_process).first == process_state::await) {
1024
+ if (!dequeue ()) break ;
935
1025
}
936
1026
937
1027
// Workaround until we can use structured bindings
@@ -944,7 +1034,8 @@ struct shared_process
944
1034
This process will be considered running until it executes.
945
1035
*/
946
1036
if (state == process_state::yield) {
947
- if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <= std::chrono::nanoseconds::min ())
1037
+ if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1038
+ std::chrono::nanoseconds::min ())
948
1039
broadcast (unwrap (*_process).yield ());
949
1040
else
950
1041
execute_at (duration,
@@ -962,14 +1053,17 @@ struct shared_process
962
1053
else if we await with an expired timeout then go ahead and yield now.
963
1054
else schedule a timeout when we will yield if not canceled by intervening await.
964
1055
*/
965
- else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) == std::chrono::nanoseconds::max ()) {
1056
+ else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
1057
+ std::chrono::nanoseconds::max ()) {
966
1058
task_done ();
967
- } else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <= std::chrono::nanoseconds::min ()) {
1059
+ } else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1060
+ std::chrono::nanoseconds::min ()) {
968
1061
broadcast (unwrap (*_process).yield ());
969
1062
} else {
970
1063
/* Schedule a timeout. */
971
1064
_timeout_function_active = true ;
972
- execute_at (duration, _executor)([_weak_this = make_weak_ptr (this ->shared_from_this ())] {
1065
+ execute_at (duration,
1066
+ _executor)([_weak_this = make_weak_ptr (this ->shared_from_this ())] {
973
1067
auto _this = _weak_this.lock ();
974
1068
// It may be that the complete channel is gone in the meanwhile
975
1069
if (!_this) return ;
@@ -1367,7 +1461,7 @@ detail::annotated_process<F> operator&(detail::annotated_process<F>&& a, buffer_
1367
1461
/* *************************************************************************************************/
1368
1462
1369
1463
template <typename T>
1370
- class [[nodiscard]] receiver {
1464
+ class STLAB_NODISCARD () receiver {
1371
1465
using ptr_t = std::shared_ptr<detail::shared_process_receiver<T>>;
1372
1466
1373
1467
ptr_t _p;
0 commit comments