From e763e3c770bae72570250ffee89af6e1fba93d24 Mon Sep 17 00:00:00 2001
From: Serhii Mariiekha
Date: Sat, 25 Apr 2026 23:14:00 +0200
Subject: [PATCH] Complete runtime metrics tracing

---
Review notes (text in this section is not part of the commit message):
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch. Context-line whitespace is a best guess; if hunks are
  rejected, retry with `git apply --ignore-whitespace`.
- run_spawn() and run_spawn_on_node() now snapshot g->id right after
  run_g_alloc() and print the snapshot, instead of dereferencing g after it
  has been pushed to a run queue.
- The "poll_waiters" trace field and run_metrics_t.poll_waiter_count carry a
  0/1 flag derived from run_poller_has_waiters(), not a number of waiters.
- Hunk headers and the diffstat were recomputed for the added lines; the
  post-image blob id on the run_scheduler.c index line predates these edits.

 src/runtime/run_scheduler.c        | 92 +++++++++++++++++++++++++-----
 src/runtime/run_scheduler.h        |  4 ++
 src/runtime/tests/test_scheduler.c |  6 ++
 3 files changed, 89 insertions(+), 13 deletions(-)

diff --git a/src/runtime/run_scheduler.c b/src/runtime/run_scheduler.c
index 1b7810f..7ef9edc 100644
--- a/src/runtime/run_scheduler.c
+++ b/src/runtime/run_scheduler.c
@@ -107,6 +107,28 @@ static run_metrics_t scheduler_metrics = {0};
 /* Trace output enabled via RUN_TRACE=1 (#410) */
 static bool trace_enabled = false;
 
+/* Gauge for traces/metrics: run_global_queue_len() widened to int64_t. */
+static int64_t run_metrics_global_queue_len(void) {
+    return (int64_t)run_global_queue_len();
+}
+
+/* Gauge for traces/metrics: sum of run_local_queue_len() over all num_ps Ps. */
+static int64_t run_metrics_local_queue_len(void) {
+    int64_t len = 0;
+    for (uint32_t i = 0; i < num_ps; i++) {
+        len += (int64_t)run_local_queue_len(&all_ps[i].local_queue);
+    }
+    return len;
+}
+
+/* Gauge for traces/metrics: 1 if run_poller_has_waiters() is true, else 0.
+ * This is a presence flag, not a count; it is 0 until the scheduler is initialized. */
+static int64_t run_metrics_poll_waiter_count(void) {
+    if (!scheduler_initialized)
+        return 0;
+    return run_poller_has_waiters() ? 1 : 0;
+}
+
 /* ========================================================================
  * G Queue Operations
  * ======================================================================== */
@@ -660,7 +682,13 @@ static run_g_t *run_try_steal(run_p_t *self_p) {
 static void run_park_m(run_m_t *m) {
     atomic_fetch_add_explicit(&scheduler_metrics.park_count, 1, memory_order_relaxed);
     if (trace_enabled) {
-        fprintf(stderr, "{\"event\":\"park\",\"m_id\":%llu}\n", (unsigned long long)m->id);
+        fprintf(stderr,
+                "{\"event\":\"park\",\"m_id\":%llu,\"live_g\":%lld,"
+                "\"global_queue\":%lld,\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
+                (unsigned long long)m->id,
+                (long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
+                (long long)run_metrics_global_queue_len(), (long long)run_metrics_local_queue_len(),
+                (long long)run_metrics_poll_waiter_count());
     }
     pthread_mutex_lock(&m->park_mutex);
     m->parked = true;
@@ -680,7 +708,12 @@ static void run_park_m(run_m_t *m) {
 static void run_unpark_m(run_m_t *m) {
     atomic_fetch_add_explicit(&scheduler_metrics.unpark_count, 1, memory_order_relaxed);
     if (trace_enabled) {
-        fprintf(stderr, "{\"event\":\"unpark\",\"m_id\":%llu}\n", (unsigned long long)m->id);
+        fprintf(stderr,
+                "{\"event\":\"unpark\",\"m_id\":%llu,\"global_queue\":%lld,"
+                "\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
+                (unsigned long long)m->id, (long long)run_metrics_global_queue_len(),
+                (long long)run_metrics_local_queue_len(),
+                (long long)run_metrics_poll_waiter_count());
     }
     pthread_mutex_lock(&m->park_mutex);
     m->parked = false;
@@ -772,7 +805,12 @@ static run_g_t *run_find_runnable(run_p_t *p) {
      * This may make Gs runnable by pushing them to run queues. */
     atomic_fetch_add_explicit(&scheduler_metrics.poll_count, 1, memory_order_relaxed);
     if (trace_enabled) {
-        fprintf(stderr, "{\"event\":\"poll\"}\n");
+        fprintf(stderr,
+                "{\"event\":\"poll\",\"p_id\":%u,\"global_queue\":%lld,"
+                "\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
+                p->id, (long long)run_metrics_global_queue_len(),
+                (long long)run_metrics_local_queue_len(),
+                (long long)run_metrics_poll_waiter_count());
     }
     if (run_poller_poll() > 0) {
         g = run_local_queue_pop(&p->local_queue);
@@ -853,8 +891,11 @@ static void run_schedule_loop(run_m_t *m) {
         /* Context switch from g0 to g */
         atomic_fetch_add_explicit(&scheduler_metrics.context_switches, 1, memory_order_relaxed);
         if (trace_enabled) {
-            fprintf(stderr, "{\"event\":\"context_switch\",\"g_id\":%llu}\n",
-                    (unsigned long long)g->id);
+            fprintf(stderr,
+                    "{\"event\":\"context_switch\",\"g_id\":%llu,\"p_id\":%u,"
+                    "\"global_queue\":%lld,\"local_queue\":%lld}\n",
+                    (unsigned long long)g->id, p->id, (long long)run_metrics_global_queue_len(),
+                    (long long)run_metrics_local_queue_len());
         }
         run_context_switch(&m->g0->context, &g->context);
 
@@ -866,8 +907,11 @@ static void run_schedule_loop(run_m_t *m) {
         if (g->status == G_DEAD) {
             atomic_fetch_add_explicit(&scheduler_metrics.complete_count, 1, memory_order_relaxed);
             if (trace_enabled) {
-                fprintf(stderr, "{\"event\":\"complete\",\"g_id\":%llu}\n",
-                        (unsigned long long)g->id);
+                /* live_g is a relaxed pre-decrement snapshot minus one, so it may differ
+                 * from `remaining` below if other Gs spawn or complete concurrently. */
+                fprintf(stderr, "{\"event\":\"complete\",\"g_id\":%llu,\"live_g\":%lld}\n",
+                        (unsigned long long)g->id,
+                        (long long)atomic_load_explicit(&live_g_count, memory_order_relaxed) - 1);
             }
             int64_t remaining =
                 atomic_fetch_sub_explicit(&live_g_count, 1, memory_order_release) - 1;
@@ -1020,9 +1064,8 @@ void run_scheduler_run(void) {
 
 void run_spawn(void (*fn)(void *), void *arg) {
     atomic_fetch_add_explicit(&scheduler_metrics.spawn_count, 1, memory_order_relaxed);
-    if (trace_enabled) {
-        fprintf(stderr, "{\"event\":\"spawn\"}\n");
-    }
     run_g_t *g = run_g_alloc(fn, arg);
+    /* Snapshot the id for tracing before g is published to a run queue. */
+    unsigned long long trace_g_id = (unsigned long long)g->id;
     atomic_fetch_add_explicit(&live_g_count, 1, memory_order_release);
 
@@ -1046,6 +1089,16 @@ void run_spawn(void (*fn)(void *), void *arg) {
         run_global_queue_push(g);
     }
 
+    if (trace_enabled) {
+        fprintf(stderr,
+                "{\"event\":\"spawn\",\"g_id\":%llu,\"live_g\":%lld,"
+                "\"global_queue\":%lld,\"local_queue\":%lld}\n",
+                trace_g_id,
+                (long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
+                (long long)run_metrics_global_queue_len(),
+                (long long)run_metrics_local_queue_len());
+    }
+
     /* If there are idle Ps, wake an M to handle the new work.
      * Only safe from the main thread or scheduler context. */
     if (m && m->current_g == NULL && run_has_idle_p()) {
@@ -1155,9 +1208,8 @@ void run_g_ready(run_g_t *g) {
 
 void run_spawn_on_node(void (*fn)(void *), void *arg, int32_t node_id) {
     atomic_fetch_add_explicit(&scheduler_metrics.spawn_count, 1, memory_order_relaxed);
-    if (trace_enabled) {
-        fprintf(stderr, "{\"event\":\"spawn_on_node\",\"node_id\":%d}\n", node_id);
-    }
     run_g_t *g = run_g_alloc(fn, arg);
+    /* Snapshot the id for tracing before g is published to a run queue. */
+    unsigned long long trace_g_id = (unsigned long long)g->id;
     g->preferred_node = node_id;
     atomic_fetch_add_explicit(&live_g_count, 1, memory_order_release);
@@ -1193,6 +1245,16 @@ void run_spawn_on_node(void (*fn)(void *), void *arg, int32_t node_id) {
     }
 
 wake:
+    if (trace_enabled) {
+        fprintf(stderr,
+                "{\"event\":\"spawn_on_node\",\"g_id\":%llu,\"node_id\":%d,"
+                "\"live_g\":%lld,\"global_queue\":%lld,\"local_queue\":%lld}\n",
+                trace_g_id, node_id,
+                (long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
+                (long long)run_metrics_global_queue_len(),
+                (long long)run_metrics_local_queue_len());
+    }
+
     /* Wake an M if there are idle Ps. */
     if (run_has_idle_p()) {
         run_poller_wakeup(); /* Unblock M in poll_blocking */
@@ -1662,5 +1724,9 @@ run_metrics_t run_runtime_metrics(void) {
     m.park_count = atomic_load_explicit(&scheduler_metrics.park_count, memory_order_relaxed);
     m.unpark_count = atomic_load_explicit(&scheduler_metrics.unpark_count, memory_order_relaxed);
     m.poll_count = atomic_load_explicit(&scheduler_metrics.poll_count, memory_order_relaxed);
+    m.global_queue_len = run_metrics_global_queue_len();
+    m.local_queue_len = run_metrics_local_queue_len();
+    m.live_g_count = atomic_load_explicit(&live_g_count, memory_order_relaxed);
+    m.poll_waiter_count = run_metrics_poll_waiter_count();
     return m;
 }
diff --git a/src/runtime/run_scheduler.h b/src/runtime/run_scheduler.h
index fbf99f8..7e8a557 100644
--- a/src/runtime/run_scheduler.h
+++ b/src/runtime/run_scheduler.h
@@ -160,6 +160,10 @@ typedef struct {
     _Atomic int64_t park_count;
     _Atomic int64_t unpark_count;
     _Atomic int64_t poll_count;
+    _Atomic int64_t global_queue_len;
+    _Atomic int64_t local_queue_len;
+    _Atomic int64_t live_g_count;
+    _Atomic int64_t poll_waiter_count;
 } run_metrics_t;
 
 run_metrics_t run_runtime_metrics(void);
diff --git a/src/runtime/tests/test_scheduler.c b/src/runtime/tests/test_scheduler.c
index 93f2767..d17f551 100644
--- a/src/runtime/tests/test_scheduler.c
+++ b/src/runtime/tests/test_scheduler.c
@@ -265,6 +265,12 @@ static void test_runtime_metrics(void) {
     /* context_switches should have increased (at least one per G) */
     int64_t switches = after.context_switches - before.context_switches;
     RUN_ASSERT((int)switches >= spawn_n);
+
+    /* Snapshot gauges should expose scheduler, queue, and poller visibility. */
+    RUN_ASSERT(after.global_queue_len >= 0);
+    RUN_ASSERT(after.local_queue_len >= 0);
+    RUN_ASSERT(after.live_g_count == 0);
+    RUN_ASSERT(after.poll_waiter_count >= 0);
 }
 
 static void test_signal_preemption_tight_loop(void) {