Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions src/runtime/run_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,24 @@ static run_metrics_t scheduler_metrics = {0};
/* Trace output enabled via RUN_TRACE=1 (#410) */
static bool trace_enabled = false;

/* Metrics helper: report the current global run-queue length.
 * Forwards to run_global_queue_len() and widens the result to int64_t so it
 * can be stored directly in the int64_t metric fields and printed with %lld. */
static int64_t run_metrics_global_queue_len(void) {
    const int64_t queued = (int64_t)run_global_queue_len();
    return queued;
}

/* Metrics helper: report the combined length of every P's local run queue.
 * Walks the first num_ps entries of all_ps and accumulates
 * run_local_queue_len() for each entry's local_queue. Returns 0 when
 * num_ps is 0. */
static int64_t run_metrics_local_queue_len(void) {
    int64_t total = 0;
    uint32_t idx = 0;
    while (idx < num_ps) {
        total += (int64_t)run_local_queue_len(&all_ps[idx].local_queue);
        idx++;
    }
    return total;
}

/* Metrics helper: report whether the poller currently has waiters.
 * Returns 0 when scheduler_initialized is false (the poller is not queried
 * in that case); otherwise returns 1 if run_poller_has_waiters() is true and
 * 0 if it is false.
 * NOTE: despite the "count" in the name, the result is only ever 0 or 1. */
static int64_t run_metrics_poll_waiter_count(void) {
    /* && short-circuits, so the poller is only consulted once initialized. */
    return (scheduler_initialized && run_poller_has_waiters()) ? 1 : 0;
}

/* ========================================================================
* G Queue Operations
* ======================================================================== */
Expand Down Expand Up @@ -660,7 +678,13 @@ static run_g_t *run_try_steal(run_p_t *self_p) {
static void run_park_m(run_m_t *m) {
atomic_fetch_add_explicit(&scheduler_metrics.park_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"park\",\"m_id\":%llu}\n", (unsigned long long)m->id);
fprintf(stderr,
"{\"event\":\"park\",\"m_id\":%llu,\"live_g\":%lld,"
"\"global_queue\":%lld,\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
(unsigned long long)m->id,
(long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
(long long)run_metrics_global_queue_len(), (long long)run_metrics_local_queue_len(),
(long long)run_metrics_poll_waiter_count());
}
pthread_mutex_lock(&m->park_mutex);
m->parked = true;
Expand All @@ -680,7 +704,12 @@ static void run_park_m(run_m_t *m) {
static void run_unpark_m(run_m_t *m) {
atomic_fetch_add_explicit(&scheduler_metrics.unpark_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"unpark\",\"m_id\":%llu}\n", (unsigned long long)m->id);
fprintf(stderr,
"{\"event\":\"unpark\",\"m_id\":%llu,\"global_queue\":%lld,"
"\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
(unsigned long long)m->id, (long long)run_metrics_global_queue_len(),
(long long)run_metrics_local_queue_len(),
(long long)run_metrics_poll_waiter_count());
}
pthread_mutex_lock(&m->park_mutex);
m->parked = false;
Expand Down Expand Up @@ -772,7 +801,12 @@ static run_g_t *run_find_runnable(run_p_t *p) {
* This may make Gs runnable by pushing them to run queues. */
atomic_fetch_add_explicit(&scheduler_metrics.poll_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"poll\"}\n");
fprintf(stderr,
"{\"event\":\"poll\",\"p_id\":%u,\"global_queue\":%lld,"
"\"local_queue\":%lld,\"poll_waiters\":%lld}\n",
p->id, (long long)run_metrics_global_queue_len(),
(long long)run_metrics_local_queue_len(),
(long long)run_metrics_poll_waiter_count());
}
if (run_poller_poll() > 0) {
g = run_local_queue_pop(&p->local_queue);
Expand Down Expand Up @@ -853,8 +887,11 @@ static void run_schedule_loop(run_m_t *m) {
/* Context switch from g0 to g */
atomic_fetch_add_explicit(&scheduler_metrics.context_switches, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"context_switch\",\"g_id\":%llu}\n",
(unsigned long long)g->id);
fprintf(stderr,
"{\"event\":\"context_switch\",\"g_id\":%llu,\"p_id\":%u,"
"\"global_queue\":%lld,\"local_queue\":%lld}\n",
(unsigned long long)g->id, p->id, (long long)run_metrics_global_queue_len(),
(long long)run_metrics_local_queue_len());
}
run_context_switch(&m->g0->context, &g->context);

Expand All @@ -866,8 +903,9 @@ static void run_schedule_loop(run_m_t *m) {
if (g->status == G_DEAD) {
atomic_fetch_add_explicit(&scheduler_metrics.complete_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"complete\",\"g_id\":%llu}\n",
(unsigned long long)g->id);
fprintf(stderr, "{\"event\":\"complete\",\"g_id\":%llu,\"live_g\":%lld}\n",
(unsigned long long)g->id,
(long long)atomic_load_explicit(&live_g_count, memory_order_relaxed) - 1);
}
int64_t remaining =
atomic_fetch_sub_explicit(&live_g_count, 1, memory_order_release) - 1;
Expand Down Expand Up @@ -1020,9 +1058,6 @@ void run_scheduler_run(void) {

void run_spawn(void (*fn)(void *), void *arg) {
atomic_fetch_add_explicit(&scheduler_metrics.spawn_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"spawn\"}\n");
}
run_g_t *g = run_g_alloc(fn, arg);
atomic_fetch_add_explicit(&live_g_count, 1, memory_order_release);

Expand All @@ -1046,6 +1081,16 @@ void run_spawn(void (*fn)(void *), void *arg) {
run_global_queue_push(g);
}

if (trace_enabled) {
fprintf(stderr,
"{\"event\":\"spawn\",\"g_id\":%llu,\"live_g\":%lld,"
"\"global_queue\":%lld,\"local_queue\":%lld}\n",
(unsigned long long)g->id,
(long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
(long long)run_metrics_global_queue_len(),
(long long)run_metrics_local_queue_len());
}

/* If there are idle Ps, wake an M to handle the new work.
* Only safe from the main thread or scheduler context. */
if (m && m->current_g == NULL && run_has_idle_p()) {
Expand Down Expand Up @@ -1155,9 +1200,6 @@ void run_g_ready(run_g_t *g) {

void run_spawn_on_node(void (*fn)(void *), void *arg, int32_t node_id) {
atomic_fetch_add_explicit(&scheduler_metrics.spawn_count, 1, memory_order_relaxed);
if (trace_enabled) {
fprintf(stderr, "{\"event\":\"spawn_on_node\",\"node_id\":%d}\n", node_id);
}
run_g_t *g = run_g_alloc(fn, arg);
g->preferred_node = node_id;
atomic_fetch_add_explicit(&live_g_count, 1, memory_order_release);
Expand Down Expand Up @@ -1193,6 +1235,16 @@ void run_spawn_on_node(void (*fn)(void *), void *arg, int32_t node_id) {
}

wake:
if (trace_enabled) {
fprintf(stderr,
"{\"event\":\"spawn_on_node\",\"g_id\":%llu,\"node_id\":%d,"
"\"live_g\":%lld,\"global_queue\":%lld,\"local_queue\":%lld}\n",
(unsigned long long)g->id, node_id,
(long long)atomic_load_explicit(&live_g_count, memory_order_relaxed),
(long long)run_metrics_global_queue_len(),
(long long)run_metrics_local_queue_len());
}

/* Wake an M if there are idle Ps. */
if (run_has_idle_p()) {
run_poller_wakeup(); /* Unblock M in poll_blocking */
Expand Down Expand Up @@ -1662,5 +1714,9 @@ run_metrics_t run_runtime_metrics(void) {
m.park_count = atomic_load_explicit(&scheduler_metrics.park_count, memory_order_relaxed);
m.unpark_count = atomic_load_explicit(&scheduler_metrics.unpark_count, memory_order_relaxed);
m.poll_count = atomic_load_explicit(&scheduler_metrics.poll_count, memory_order_relaxed);
m.global_queue_len = run_metrics_global_queue_len();
m.local_queue_len = run_metrics_local_queue_len();
m.live_g_count = atomic_load_explicit(&live_g_count, memory_order_relaxed);
m.poll_waiter_count = run_metrics_poll_waiter_count();
return m;
}
4 changes: 4 additions & 0 deletions src/runtime/run_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ typedef struct {
_Atomic int64_t park_count;
_Atomic int64_t unpark_count;
_Atomic int64_t poll_count;
_Atomic int64_t global_queue_len;
_Atomic int64_t local_queue_len;
_Atomic int64_t live_g_count;
_Atomic int64_t poll_waiter_count;
} run_metrics_t;
run_metrics_t run_runtime_metrics(void);

Expand Down
6 changes: 6 additions & 0 deletions src/runtime/tests/test_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ static void test_runtime_metrics(void) {
/* context_switches should have increased (at least one per G) */
int64_t switches = after.context_switches - before.context_switches;
RUN_ASSERT((int)switches >= spawn_n);

/* Snapshot gauges should expose scheduler, queue, and poller visibility. */
RUN_ASSERT(after.global_queue_len >= 0);
RUN_ASSERT(after.local_queue_len >= 0);
RUN_ASSERT(after.live_g_count == 0);
RUN_ASSERT(after.poll_waiter_count >= 0);
}

static void test_signal_preemption_tight_loop(void) {
Expand Down
Loading