diff --git a/packages/builtin/runtime_options.pony b/packages/builtin/runtime_options.pony index 1dd803f14b..3a0544348f 100644 --- a/packages/builtin/runtime_options.pony +++ b/packages/builtin/runtime_options.pony @@ -82,6 +82,13 @@ struct RuntimeOptions current value. This is a floating point value. Defaults to 2.0. """ + var ponymsgstilmute: U32 = 1 + """ + Number of messages sent to muted/overloaded/under pressure actors before + a muted sending actor gets unscheduled so it cannot continue to overwhelm + muted/overloaded/under pressure actors. Defaults to 1. + """ + var ponynoyield: Bool = false """ Do not yield the CPU when no work is available. diff --git a/src/libponyrt/actor/actor.c b/src/libponyrt/actor/actor.c index ad73a0e100..b121fa8c35 100644 --- a/src/libponyrt/actor/actor.c +++ b/src/libponyrt/actor/actor.c @@ -28,13 +28,14 @@ pony_static_assert((offsetof(pony_actor_t, gc) + sizeof(gc_t)) == static bool actor_noblock = false; +static uint32_t actor_msgs_til_mute = 0; + enum { FLAG_BLOCKED = 1 << 0, FLAG_BLOCKED_SENT = 1 << 1, FLAG_SYSTEM = 1 << 2, - FLAG_UNSCHEDULED = 1 << 3, - FLAG_CD_CONTACTED = 1 << 4, + FLAG_CD_CONTACTED = 1 << 3, }; enum @@ -43,6 +44,7 @@ enum SYNC_FLAG_OVERLOADED = 1 << 1, SYNC_FLAG_UNDER_PRESSURE = 1 << 2, SYNC_FLAG_MUTED = 1 << 3, + SYNC_FLAG_MUTED_UNSCHEDULED = 1 << 4, }; #ifdef USE_RUNTIMESTATS @@ -126,7 +128,7 @@ static void unset_internal_flag(pony_actor_t* actor, uint8_t flag) } // -// Mute/Unmute/Check mute status functions +// Mute/Unmute/unschedule/triggers muting functions // // For backpressure related muting and unmuting to work correctly, the following // rules have to be maintained. @@ -147,6 +149,11 @@ static void unset_internal_flag(pony_actor_t* actor, uint8_t flag) // // Our handling of atomic operations in `mute_actor` // and `unmute_actor` are to assure that both rules aren't violated. +static void unschedule_muted_actor(pony_actor_t* actor) +{ + set_sync_flag(actor, SYNC_FLAG_MUTED_UNSCHEDULED); + DTRACE1(ACTOR_MUTED_UNCHEDULED, (uintptr_t)actor); +} static void mute_actor(pony_actor_t* actor) { @@ -156,7 +163,7 @@ static void mute_actor(pony_actor_t* actor) void ponyint_unmute_actor(pony_actor_t* actor) { - unset_sync_flag(actor, SYNC_FLAG_MUTED); + unset_sync_flag(actor, SYNC_FLAG_MUTED | SYNC_FLAG_MUTED_UNSCHEDULED); DTRACE1(ACTOR_UNMUTED, (uintptr_t)actor); } @@ -173,15 +180,12 @@ static void actor_setoverloaded(pony_actor_t* actor) DTRACE1(ACTOR_OVERLOADED, (uintptr_t)actor); } -static void actor_unsetoverloaded(pony_actor_t* actor) +static void actor_unsetoverloaded(pony_ctx_t* ctx, pony_actor_t* actor) { - pony_ctx_t* ctx = pony_ctx(); unset_sync_flag(actor, SYNC_FLAG_OVERLOADED); DTRACE1(ACTOR_OVERLOADED_CLEARED, (uintptr_t)actor); if (!has_sync_flag(actor, SYNC_FLAG_UNDER_PRESSURE)) - { ponyint_sched_start_global_unmute(ctx->scheduler->index, actor); - } } static void maybe_mark_should_mute(pony_ctx_t* ctx, pony_actor_t* to) @@ -194,12 +198,21 @@ static void maybe_mark_should_mute(pony_ctx_t* ctx, pony_actor_t* to) // 2. the sender isn't overloaded or under pressure // AND // 3. we are sending to another actor (as compared to sending to self) - if(triggers_muting(to) && - !has_sync_flag_any(ctx->current, SYNC_FLAG_OVERLOADED | - SYNC_FLAG_UNDER_PRESSURE) && - ctx->current != to) + if(ctx->current != to && !has_sync_flag_any(ctx->current, + SYNC_FLAG_OVERLOADED | SYNC_FLAG_UNDER_PRESSURE) && triggers_muting(to)) { - ponyint_sched_mute(ctx, ctx->current, to); + ctx->should_mute_actor = true; + + if(ctx->current->muters == NULL) + { + ctx->current->muters = POOL_ALLOC(muteset_t); + ponyint_muteset_init(ctx->current->muters, 8); + } + + size_t index = HASHMAP_UNKNOWN; + pony_actor_t* r = ponyint_muteset_get(ctx->current->muters, to, &index); + if(r == NULL) + ponyint_muteset_putindex(ctx->current->muters, to, index); } } } @@ -509,58 +522,143 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor, } } -// return true if mute occurs -static bool maybe_should_mute(pony_actor_t* actor) +// return true if the actor should be unscheduled +static bool maybe_should_unschedule_actor(pony_ctx_t* ctx, pony_actor_t* actor) { - // if we become muted as a result of handling a message, bail out now. - // we aren't set to "muted" at this point. setting to muted during a - // a behavior can lead to race conditions that might result in a - // deadlock. - // Given that actor's are not run when they are muted, then when we - // started out batch, actor->muted would have been 0. If any of our - // message sends would result in the actor being muted, that value will - // have changed to greater than 0. + // we should mute as a result of handling a message and we maybe should + // also be unscheduled if we've hit the `actor_msgs_til_mute` threshold. // - // We will then set the actor to "muted". Once set, any actor sending - // a message to it will be also be muted unless said sender is marked - // as overloaded. + // We will set the actor to "muted" if it is not already set as muted. + // Once set, any actor sending a message to it will be also be muted unless + // said sender is marked as overloaded or under pressure. // // The key points here is that: - // 1. We can't set the actor to "muted" until after its finished running - // a behavior. - // 2. We should bail out from running the actor and return false so that - // it won't be rescheduled. - if(actor->muted > 0) - { + // 1. We should bail out from running the actor and return false so that + // it won't be rescheduled only if we've hit the threshold. + // 2. If we return `true`, we will stop processing message but will get + // rescheduled again later on. + if(!has_sync_flag(actor, SYNC_FLAG_MUTED)) mute_actor(actor); - return true; + + actor->muted++; + + // check if we passed the threshold and should be unscheduled + if(actor->muted >= actor_msgs_til_mute) + { + size_t size = ponyint_muteset_size(actor->muters); + + // set muted count to 0; it will get incremented again by + // `ponyint_sched_mute` for tracking when to reschedule actor + actor->muted = 0; + + // ask muters to unmute us before we get unscheduled or else + // we will never get rescheduled again + pony_assert(size > 0); + (void)size; + size_t i = HASHMAP_BEGIN; + + pony_actor_t* muter; + + while((muter = ponyint_muteset_next(actor->muters, &i)) != NULL) + { + if(triggers_muting(muter)) + ponyint_sched_mute(ctx, actor, muter); + + ponyint_muteset_clearindex(actor->muters, i); + } + + ponyint_muteset_optimize(actor->muters); + + if(actor->muted > 0) + { + unschedule_muted_actor(actor); + return true; + } } + // don't unschedule return false; } -static bool batch_limit_reached(pony_actor_t* actor, bool polling) +static void clear_muteset_and_unmute_actor(pony_actor_t* actor) +{ + size_t i = HASHMAP_BEGIN; + + pony_actor_t* muter; + + while((muter = ponyint_muteset_next(actor->muters, &i)) != NULL) + ponyint_muteset_clearindex(actor->muters, i); + + ponyint_muteset_optimize(actor->muters); + + pony_assert(ponyint_muteset_size(actor->muters) == 0); + + // we no longer need to be muted but we never got unscheduled + actor->muted = 0; + ponyint_unmute_actor(actor); +} + +static void optimistically_globally_unmute_actor(pony_ctx_t* ctx, pony_actor_t* actor) { - if(!has_sync_flag(actor, SYNC_FLAG_OVERLOADED) && !polling) + clear_muteset_and_unmute_actor(actor); + ponyint_sched_start_global_unmute(ctx->scheduler->index, actor); +} + +static void maybe_globally_unmute_actor(pony_ctx_t* ctx, pony_actor_t* actor) +{ + size_t i = HASHMAP_BEGIN; + + pony_actor_t* muter; + bool needs_optimize = false; + + while((muter = ponyint_muteset_next(actor->muters, &i)) != NULL) + { + if(!triggers_muting(muter)) + { + // remove muter if the muter no longer triggers muting + ponyint_muteset_clearindex(actor->muters, i); + needs_optimize = true; + } + } + + if(needs_optimize) + ponyint_muteset_optimize(actor->muters); + + if(ponyint_muteset_size(actor->muters) == 0) + { + // we no longer need to be muted but we never got unscheduled + actor->muted = 0; + ponyint_unmute_actor(actor); + ponyint_sched_start_global_unmute(ctx->scheduler->index, actor); + } +} + +static void maybe_set_overloaded(pony_actor_t* actor, bool polling) +{ + if(!has_sync_flag_any(actor, SYNC_FLAG_OVERLOADED | SYNC_FLAG_MUTED) && + !polling) { // If we hit our batch size, consider this actor to be overloaded - // only if we're not polling from C code. + // only if we're not muted and not polling from C code. // Overloaded actors are allowed to send to other overloaded actors // and to muted actors without being muted themselves. actor_setoverloaded(actor); } - - return true; } bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling) { - pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED)); + pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED_UNSCHEDULED)); ctx->current = actor; size_t batch = PONY_SCHED_BATCH; pony_msg_t* msg; size_t app = 0; + ctx->should_mute_actor = false; + + // check if we still need to be muted + if(actor->muters != NULL && ponyint_muteset_size(actor->muters) > 0) + maybe_globally_unmute_actor(ctx, actor); // If we have been scheduled, the head will not be marked as empty. pony_msg_t* head = atomic_load_explicit(&actor->q.head, memory_order_acquire); @@ -594,14 +692,31 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling) app++; try_gc(ctx, actor); - // maybe mute actor; returns true if mute occurs - if(maybe_should_mute(actor)) - return false; + // if actor should be muted because it sent a message to an actor that + // triggers muting + if(ctx->should_mute_actor) + { + // maybe unschedule muted actor; returns true if should be unscheduled + if(maybe_should_unschedule_actor(ctx, actor)) + return false; + else + // otherwise, stop processing messages in this run but don't unschedule + return true; + } // if we've reached our batch limit // or if we're polling where we want to stop after one app message if(app == batch || polling) - return batch_limit_reached(actor, polling); + { + maybe_set_overloaded(actor, polling); + + // unmute ourselves optimistically as we may never send a message + // to a muted/overload/under pressure actor again + if(actor->muters != NULL && ponyint_muteset_size(actor->muters) > 0) + optimistically_globally_unmute_actor(ctx, actor); + + return true; + } } else { @@ -621,7 +736,12 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling) // We didn't hit our app message batch limit. We now believe our queue to be // empty, but we may have received further messages. pony_assert(app < batch); - pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED)); + pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED_UNSCHEDULED)); + + // unmute ourselves optimistically as we may never send a message + // to a muted/overload/under pressure actor again + if(actor->muters != NULL && ponyint_muteset_size(actor->muters) > 0) + optimistically_globally_unmute_actor(ctx, actor); if(has_sync_flag(actor, SYNC_FLAG_OVERLOADED)) { @@ -630,7 +750,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling) // 1- sending to this actor is no longer grounds for an actor being muted // 2- this actor can no longer send to other actors free from muting should // the receiver be overloaded or muted - actor_unsetoverloaded(actor); + actor_unsetoverloaded(ctx, actor); } try_gc(ctx, actor); @@ -809,6 +929,11 @@ void ponyint_actor_destroy(pony_actor_t* actor) ponyint_messageq_destroy(&actor->q, false); ponyint_gc_destroy(&actor->gc); ponyint_heap_destroy(&actor->heap); + if(actor->muters != NULL) + { + ponyint_muteset_destroy(actor->muters); + POOL_FREE(muteset_t, actor->muters); + } #ifdef USE_RUNTIMESTATS pony_ctx_t* ctx = pony_ctx(); @@ -885,6 +1010,11 @@ bool ponyint_actor_getnoblock() return actor_noblock; } +void ponyint_actor_setmsgstilmute(uint32_t msgs_til_mute) +{ + actor_msgs_til_mute = msgs_til_mute; +} + PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type, bool orphaned) { @@ -1003,8 +1133,10 @@ PONY_API void pony_sendv(pony_ctx_t* ctx, pony_actor_t* to, pony_msg_t* first, #endif )) { - if(!has_sync_flag(to, SYNC_FLAG_MUTED)) + if(!has_sync_flag(to, SYNC_FLAG_MUTED_UNSCHEDULED)) { + // if the receiving actor is currently not unscheduled and + // muted, schedule it. ponyint_sched_add(ctx, to); } } @@ -1046,9 +1178,9 @@ PONY_API void pony_sendv_single(pony_ctx_t* ctx, pony_actor_t* to, #endif )) { - if(!has_sync_flag(to, SYNC_FLAG_MUTED)) + if(!has_sync_flag(to, SYNC_FLAG_MUTED_UNSCHEDULED)) { - // if the receiving actor is currently not unscheduled AND it's not + // if the receiving actor is currently not unscheduled and // muted, schedule it. ponyint_sched_add(ctx, to); } @@ -1186,6 +1318,12 @@ PONY_API void pony_poll(pony_ctx_t* ctx) PONY_API void pony_apply_backpressure() { pony_ctx_t* ctx = pony_ctx(); + + // actors shouldn't be both `muted` and `under pressure` + // unmute but don't globally unmute since we're under pressure + if(ctx->current->muters != NULL && ponyint_muteset_size(ctx->current->muters) > 0) + clear_muteset_and_unmute_actor(ctx->current); + set_sync_flag(ctx->current, SYNC_FLAG_UNDER_PRESSURE); DTRACE1(ACTOR_UNDER_PRESSURE, (uintptr_t)ctx->current); } @@ -1235,7 +1373,11 @@ size_t ponyint_actor_total_mem_size(pony_actor_t* actor) // actor gc total memory used + ponyint_gc_total_mem_size(actor, &actor->gc) // size of stub message when message_q is initialized - + sizeof(pony_msg_t); + + sizeof(pony_msg_t) + // size of muteset of muters + + (actor->muters == NULL ? 0 : + sizeof(muteset_t) + ponyint_muteset_mem_size(actor->muters) + ); } size_t ponyint_actor_total_alloc_size(pony_actor_t* actor) @@ -1254,6 +1396,10 @@ size_t ponyint_actor_total_alloc_size(pony_actor_t* actor) // actor gc total memory allocated + ponyint_gc_total_alloc_size(actor, &actor->gc) // allocation of stub message when message_q is initialized - + POOL_ALLOC_SIZE(pony_msg_t); + + POOL_ALLOC_SIZE(pony_msg_t) + // size of muteset of muters + + (actor->muters == NULL ? 0 : + POOL_ALLOC_SIZE(muteset_t) + ponyint_muteset_alloc_size(actor->muters) + ); } #endif diff --git a/src/libponyrt/actor/actor.h b/src/libponyrt/actor/actor.h index 91b2c662ee..36e4b8c615 100644 --- a/src/libponyrt/actor/actor.h +++ b/src/libponyrt/actor/actor.h @@ -4,6 +4,7 @@ #include "messageq.h" #include "../gc/gc.h" #include "../mem/heap.h" +#include "../sched/mutemap.h" #include "../pony.h" #include #include @@ -60,6 +61,7 @@ typedef struct pony_actor_t // keep things accessed by other actors on a separate cache line alignas(64) heap_t heap; // 52/104 bytes size_t muted; // 4/8 bytes + muteset_t* muters; // 4/8 bytes // internal flags are only ever accessed from a single scheduler thread uint8_t internal_flags; // 4/8 bytes (after alignment) #ifdef USE_RUNTIMESTATS @@ -73,6 +75,7 @@ typedef struct pony_actor_t * 56 bytes: initial header, not including the type descriptor * 52/104 bytes: heap * 4/8 bytes: muted counter + * 4/8 bytes: muters set pointer * 4/8 bytes: internal flags (after alignment) * 64/128 bytes: actorstats (if enabled) * 48/88 bytes: gc @@ -80,15 +83,15 @@ typedef struct pony_actor_t */ #if INTPTR_MAX == INT64_MAX #ifdef USE_RUNTIMESTATS -# define PONY_ACTOR_PAD_SIZE 392 +# define PONY_ACTOR_PAD_SIZE 400 #else -# define PONY_ACTOR_PAD_SIZE 264 +# define PONY_ACTOR_PAD_SIZE 272 #endif #elif INTPTR_MAX == INT32_MAX #ifdef USE_RUNTIMESTATS -# define PONY_ACTOR_PAD_SIZE 232 +# define PONY_ACTOR_PAD_SIZE 236 #else -# define PONY_ACTOR_PAD_SIZE 168 +# define PONY_ACTOR_PAD_SIZE 172 #endif #endif @@ -130,6 +133,8 @@ void ponyint_actor_setnoblock(bool state); bool ponyint_actor_getnoblock(); +void ponyint_actor_setmsgstilmute(uint32_t msgs_til_mute); + PONY_API void pony_apply_backpressure(); PONY_API void pony_release_backpressure(); diff --git a/src/libponyrt/gc/cycle.c b/src/libponyrt/gc/cycle.c index ef8332e012..86884dcbad 100644 --- a/src/libponyrt/gc/cycle.c +++ b/src/libponyrt/gc/cycle.c @@ -871,6 +871,8 @@ static void final(pony_ctx_t* ctx, pony_actor_t* self) if(!ponyint_actor_pendingdestroy(m->actor)) { + pony_assert(m->actor->muters != NULL ? ponyint_muteset_size(m->actor->muters) == 0 : true); + pony_assert(m->actor->muted == 0); ponyint_actor_setpendingdestroy(m->actor); ponyint_actor_final(ctx, m->actor); stack = ponyint_pendingdestroystack_push(stack, m->actor); @@ -889,6 +891,8 @@ static void final(pony_ctx_t* ctx, pony_actor_t* self) { if(!ponyint_actor_pendingdestroy(view->actor)) { + pony_assert(view->actor->muters != NULL ? ponyint_muteset_size(view->actor->muters) == 0 : true); + pony_assert(view->actor->muted == 0); ponyint_actor_setpendingdestroy(view->actor); ponyint_actor_final(ctx, view->actor); stack = ponyint_pendingdestroystack_push(stack, view->actor); @@ -900,6 +904,8 @@ static void final(pony_ctx_t* ctx, pony_actor_t* self) while(stack != NULL) { stack = ponyint_pendingdestroystack_pop(stack, &actor); + pony_assert(actor->muters != NULL ? ponyint_muteset_size(actor->muters) == 0 : true); + pony_assert(actor->muted == 0); ponyint_actor_destroy(actor); } diff --git a/src/libponyrt/options/options.h b/src/libponyrt/options/options.h index 6055a827c6..8cc5e61803 100644 --- a/src/libponyrt/options/options.h +++ b/src/libponyrt/options/options.h @@ -36,6 +36,12 @@ " --ponygcfactor After GC, an actor will next be GC'd at a heap memory\n" \ " usage N times its current value. This is a floating\n" \ " point value. Defaults to 2.0.\n" \ + " --ponymsgstilmute\n" \ + " Number of messages sent to muted/overloaded/under\n" \ + " pressure actors before a muted sending actor gets\n" \ + " unscheduled so it cannot continue to overwhelm\n" \ + " muted/overloaded/under pressure actors.\n" \ + " Defaults to 1.\n" \ " --ponynoyield Do not yield the CPU when no work is available.\n" \ " --ponynoblock Do not send block messages to the cycle detector.\n" \ " --ponypin Pin scheduler threads to CPU cores. The ASIO thread\n" \ diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index cd93eec0ce..f187e8d322 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -72,6 +72,9 @@ typedef struct pony_ctx_t uint64_t last_tsc; schedulerstats_t schedulerstats; #endif + // Used as part of actor runs to keep track of whether the actor should + // be muted or not + bool should_mute_actor; void* serialise_buffer; size_t serialise_size; diff --git a/src/libponyrt/sched/start.c b/src/libponyrt/sched/start.c index 3744b069b2..4b9c607bbb 100644 --- a/src/libponyrt/sched/start.c +++ b/src/libponyrt/sched/start.c @@ -29,6 +29,7 @@ typedef struct options_t uint32_t cd_detect_interval; size_t gc_initial; double gc_factor; + uint32_t msgs_til_mute; bool noyield; bool noblock; bool pin; @@ -64,6 +65,7 @@ enum OPT_CDINTERVAL, OPT_GCINITIAL, OPT_GCFACTOR, + OPT_MSGSTILMUTE, OPT_NOYIELD, OPT_NOBLOCK, OPT_PIN, @@ -85,6 +87,7 @@ static opt_arg_t args[] = {"ponycdinterval", 0, OPT_ARG_REQUIRED, OPT_CDINTERVAL}, {"ponygcinitial", 0, OPT_ARG_REQUIRED, OPT_GCINITIAL}, {"ponygcfactor", 0, OPT_ARG_REQUIRED, OPT_GCFACTOR}, + {"ponymsgstilmute", 0, OPT_ARG_REQUIRED, OPT_MSGSTILMUTE}, {"ponynoyield", 0, OPT_ARG_NONE, OPT_NOYIELD}, {"ponynoblock", 0, OPT_ARG_NONE, OPT_NOBLOCK}, {"ponypin", 0, OPT_ARG_NONE, OPT_PIN}, @@ -177,6 +180,7 @@ static int parse_opts(int argc, char** argv, options_t* opt) case OPT_CDINTERVAL: if(parse_uint(&opt->cd_detect_interval, 0, s.arg_val)) err_out(id, "can't be less than 0"); break; case OPT_GCINITIAL: if(parse_size(&opt->gc_initial, 0, s.arg_val)) err_out(id, "can't be less than 0"); break; case OPT_GCFACTOR: if(parse_udouble(&opt->gc_factor, 1.0, s.arg_val)) err_out(id, "can't be less than 1.0"); break; + case OPT_MSGSTILMUTE: if(parse_uint(&opt->msgs_til_mute, 1, s.arg_val)) err_out(id, "can't be less than 1"); break; case OPT_NOYIELD: opt->noyield = true; break; case OPT_NOBLOCK: opt->noblock = true; break; case OPT_PIN: opt->pin = true; break; @@ -239,6 +243,8 @@ PONY_API int pony_init(int argc, char** argv) opt.gc_factor = 2.0f; opt.pin = false; opt.stats_interval = UINT32_MAX; + opt.msgs_til_mute = 1; + #if defined(USE_SYSTEMATIC_TESTING) opt.systematic_testing_seed = 0; #endif @@ -289,6 +295,7 @@ PONY_API int pony_init(int argc, char** argv) ponyint_heap_setinitialgc(opt.gc_initial); ponyint_heap_setnextgcfactor(opt.gc_factor); ponyint_actor_setnoblock(opt.noblock); + ponyint_actor_setmsgstilmute(opt.msgs_til_mute); pony_exitcode(0); diff --git a/test/libponyc-run/backpressure/expected-exit-code.txt b/test/libponyc-run/backpressure/expected-exit-code.txt new file mode 100644 index 0000000000..48082f72f0 --- /dev/null +++ b/test/libponyc-run/backpressure/expected-exit-code.txt @@ -0,0 +1 @@ +12 diff --git a/test/libponyc-run/backpressure/main.pony b/test/libponyc-run/backpressure/main.pony new file mode 100644 index 0000000000..f2949c7ca4 --- /dev/null +++ b/test/libponyc-run/backpressure/main.pony @@ -0,0 +1,65 @@ +use @printf[I32](fmt: Pointer[U8] tag, ...) +use @pony_get_exitcode[I32]() +use @pony_exitcode[None](code: I32) +use "backpressure" +use "time" + +actor Incrementer + let _underpressure: UnderPressure + var _i: I32 = 0 + var _stop: Bool = false + + new create(env: Env) => + _underpressure = UnderPressure(env, this) + + be keep_counting() => + _i = _i + 1 + @pony_exitcode(_i) + if (_i % 10) == 0 then + _underpressure.do_stuff() + end + if not _stop then + keep_counting() + end + + be stop_counting() => + _stop = true + +actor UnderPressure + let _timers: Timers = Timers + let _timer: Timer tag + let _env: Env + let _inc: Incrementer + var _i: I32 = 0 + + new create(env: Env, inc: Incrementer) => + _env = env + _inc = inc + + Backpressure.apply(ApplyReleaseBackpressureAuth(_env.root)) + _inc.keep_counting() + + let timer = Timer(UnderPressureWaker(this), 500_000_000, 500_000_000) + _timer = timer + _timers(consume timer) + + be release_backpressure() => + Backpressure.release(ApplyReleaseBackpressureAuth(_env.root)) + _inc.stop_counting() + + be do_stuff() => + _i = _i + 1 + + +class UnderPressureWaker is TimerNotify + let _underpressure: UnderPressure + new iso create(underpressure: UnderPressure) => + _underpressure = underpressure + + fun ref apply(timer: Timer, count: U64): Bool => + _underpressure.release_backpressure() + false + +actor Main + new create(env: Env) => + Incrementer(env)