Skip to content

Commit ded4d10

Browse files
committed
Wire runtime scheduler stress tests
1 parent fb43ffa commit ded4d10

3 files changed

Lines changed: 200 additions & 3 deletions

File tree

build.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ pub fn build(b: *std.Build) void {
246246
"src/runtime/tests/test_runtime_api.c",
247247
"src/runtime/tests/test_debug_api.c",
248248
"src/runtime/tests/test_poller.c",
249+
"src/runtime/tests/test_stress.c",
249250
};
250251
inline for (runtime_test_sources) |src| {
251252
runtime_test_exe.root_module.addCSourceFile(.{

src/runtime/tests/test_main.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
#include "test_framework.h"
21
#include "../run_scheduler.h"
2+
#include "test_framework.h"
33

44
/* Test framework globals */
55
int _test_total = 0;
@@ -18,13 +18,16 @@ extern void run_test_numa(void);
1818
extern void run_test_runtime_api(void);
1919
extern void run_test_debug_api(void);
2020
extern void run_test_poller(void);
21+
extern void run_test_stress(void);
2122

2223
int main(void) {
2324
printf("Run Runtime Test Suite\n");
2425
printf("======================\n");
2526

26-
/* Force single-processor mode for deterministic testing. */
27-
setenv("RUN_MAXPROCS", "1", 1);
27+
/* Default to single-P determinism, but allow explicit multi-P stress runs. */
28+
if (getenv("RUN_MAXPROCS") == NULL) {
29+
setenv("RUN_MAXPROCS", "1", 1);
30+
}
2831

2932
/* Initialize the scheduler (required for scheduler and channel tests) */
3033
run_scheduler_init();
@@ -39,6 +42,7 @@ int main(void) {
3942
run_test_scheduler();
4043
run_test_chan();
4144
run_test_poller();
45+
run_test_stress();
4246

4347
TEST_SUMMARY();
4448
}

src/runtime/tests/test_stress.c

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#include "../run_chan.h"
2+
#include "../run_poller.h"
23
#include "../run_scheduler.h"
34
#include "test_framework.h"
45

56
#include <stdatomic.h>
67
#include <stdint.h>
8+
#include <string.h>
9+
#include <unistd.h>
710

811
/* --- Stress Test: Spawn 10,000 Gs --- */
912

@@ -23,6 +26,63 @@ static void test_stress_spawn_10000(void) {
2326
RUN_ASSERT_EQ(atomic_load(&stress_counter), 10000);
2427
}
2528

29+
/* --- Stress Test: Work stealing under asymmetric load --- */
30+
31+
#define STEAL_STRESS_GS 1024
32+
#define STEAL_STRESS_SPINS 256
33+
34+
static _Atomic int steal_total = 0;
35+
static _Atomic int steal_hits[RUN_MAX_P_COUNT];
36+
37+
static bool stress_strict_multip(void) {
38+
const char *env = getenv("RUN_STRESS_MULTIP");
39+
return env != NULL && env[0] == '1';
40+
}
41+
42+
static void reset_steal_hits(void) {
43+
for (uint32_t i = 0; i < RUN_MAX_P_COUNT; i++) {
44+
atomic_store(&steal_hits[i], 0);
45+
}
46+
}
47+
48+
static int count_active_ps(void) {
49+
int active = 0;
50+
for (uint32_t i = 0; i < RUN_MAX_P_COUNT; i++) {
51+
if (atomic_load(&steal_hits[i]) > 0) {
52+
active++;
53+
}
54+
}
55+
return active;
56+
}
57+
58+
static void steal_record_fn(void *arg) {
59+
int spins = (int)(intptr_t)arg;
60+
run_m_t *m = run_current_m();
61+
if (m != NULL && m->current_p != NULL && m->current_p->id < RUN_MAX_P_COUNT) {
62+
atomic_fetch_add(&steal_hits[m->current_p->id], 1);
63+
}
64+
for (int i = 0; i < spins; i++) {
65+
run_yield();
66+
}
67+
atomic_fetch_add(&steal_total, 1);
68+
}
69+
70+
static void test_stress_work_stealing_asymmetric(void) {
71+
atomic_store(&steal_total, 0);
72+
reset_steal_hits();
73+
74+
for (int i = 0; i < STEAL_STRESS_GS; i++) {
75+
run_spawn(steal_record_fn, (void *)(intptr_t)STEAL_STRESS_SPINS);
76+
}
77+
run_scheduler_run();
78+
79+
RUN_ASSERT_EQ(atomic_load(&steal_total), STEAL_STRESS_GS);
80+
81+
if (run_scheduler_get_maxprocs() > 1 && stress_strict_multip()) {
82+
RUN_ASSERT(count_active_ps() >= 2);
83+
}
84+
}
85+
2686
/* --- Stress Test: Producer-Consumer (N=4 producers, M=4 consumers) --- */
2787

2888
static _Atomic int pc_produced = 0;
@@ -69,6 +129,66 @@ static void test_stress_producer_consumer(void) {
69129
run_chan_free(ch);
70130
}
71131

132+
/* --- Stress Test: Tight-loop preemption/progress --- */
133+
134+
static _Atomic int preempt_started = 0;
135+
static _Atomic int preempt_tight_done = 0;
136+
static _Atomic int preempt_observer_done = 0;
137+
static _Atomic int preempt_observed_progress = 0;
138+
static _Atomic uint64_t preempt_sink = 0;
139+
140+
static bool stress_strict_preempt(void) {
141+
const char *env = getenv("RUN_STRESS_PREEMPT");
142+
return env != NULL && env[0] == '1';
143+
}
144+
145+
static void tight_loop_fn(void *arg) {
146+
int loops = (int)(intptr_t)arg;
147+
uint64_t acc = 0;
148+
atomic_fetch_add(&preempt_started, 1);
149+
for (int i = 0; i < loops; i++) {
150+
acc += ((uint64_t)i * 1103515245u) ^ (acc >> 7);
151+
}
152+
atomic_fetch_xor(&preempt_sink, acc);
153+
atomic_fetch_add(&preempt_tight_done, 1);
154+
}
155+
156+
static void preempt_observer_fn(void *arg) {
157+
int expected_tight = (int)(intptr_t)arg;
158+
while (atomic_load(&preempt_tight_done) < expected_tight) {
159+
atomic_fetch_add(&preempt_observed_progress, 1);
160+
run_yield();
161+
if (atomic_load(&preempt_observed_progress) > 1024) {
162+
break;
163+
}
164+
}
165+
atomic_store(&preempt_observer_done, 1);
166+
}
167+
168+
static void test_stress_tight_loop_preemption(void) {
169+
uint32_t maxprocs = run_scheduler_get_maxprocs();
170+
int tight_count = maxprocs > 1 ? (int)maxprocs : 1;
171+
172+
atomic_store(&preempt_started, 0);
173+
atomic_store(&preempt_tight_done, 0);
174+
atomic_store(&preempt_observer_done, 0);
175+
atomic_store(&preempt_observed_progress, 0);
176+
177+
run_spawn(preempt_observer_fn, (void *)(intptr_t)tight_count);
178+
for (int i = 0; i < tight_count; i++) {
179+
run_spawn(tight_loop_fn, (void *)(intptr_t)2000000);
180+
}
181+
run_scheduler_run();
182+
183+
RUN_ASSERT_EQ(atomic_load(&preempt_started), tight_count);
184+
RUN_ASSERT_EQ(atomic_load(&preempt_tight_done), tight_count);
185+
RUN_ASSERT_EQ(atomic_load(&preempt_observer_done), 1);
186+
187+
if (stress_strict_preempt()) {
188+
RUN_ASSERT(atomic_load(&preempt_observed_progress) > 0);
189+
}
190+
}
191+
72192
/* --- Stress Test: Yield Chain (100 Gs x 100 yields each) --- */
73193

74194
static _Atomic int yield_completions = 0;
@@ -162,11 +282,83 @@ static void test_stress_channel_pingpong(void) {
162282
run_chan_free(pong);
163283
}
164284

285+
/* --- Stress Test: Mixed poller I/O and CPU-bound Gs --- */
286+
287+
#define MIXED_CPU_GS 64
288+
#define MIXED_CPU_YIELDS 32
289+
290+
static _Atomic int mixed_cpu_done = 0;
291+
static _Atomic int mixed_io_done = 0;
292+
293+
typedef struct {
294+
run_poll_desc_t *pd;
295+
int read_fd;
296+
} mixed_reader_ctx_t;
297+
298+
static void mixed_cpu_fn(void *arg) {
299+
(void)arg;
300+
uint64_t acc = 0;
301+
for (int i = 0; i < MIXED_CPU_YIELDS; i++) {
302+
acc += (uint64_t)i;
303+
run_yield();
304+
}
305+
atomic_fetch_xor(&preempt_sink, acc);
306+
atomic_fetch_add(&mixed_cpu_done, 1);
307+
}
308+
309+
static void mixed_reader_fn(void *arg) {
310+
mixed_reader_ctx_t *ctx = (mixed_reader_ctx_t *)arg;
311+
run_poll_wait(ctx->pd, RUN_POLL_READ);
312+
313+
char c = 0;
314+
ssize_t n = read(ctx->read_fd, &c, 1);
315+
if (n == 1 && c == 'm') {
316+
atomic_store(&mixed_io_done, 1);
317+
}
318+
}
319+
320+
static void test_stress_mixed_io_cpu(void) {
321+
atomic_store(&mixed_cpu_done, 0);
322+
atomic_store(&mixed_io_done, 0);
323+
324+
int fds[2];
325+
int rc = pipe(fds);
326+
RUN_ASSERT(rc == 0);
327+
328+
run_poll_desc_t pd;
329+
memset(&pd, 0, sizeof(pd));
330+
pd.fd = fds[0];
331+
rc = run_poll_open(&pd);
332+
RUN_ASSERT(rc == 0);
333+
334+
char c = 'm';
335+
ssize_t n = write(fds[1], &c, 1);
336+
RUN_ASSERT_EQ(n, 1);
337+
338+
mixed_reader_ctx_t reader = {.pd = &pd, .read_fd = fds[0]};
339+
for (int i = 0; i < MIXED_CPU_GS; i++) {
340+
run_spawn(mixed_cpu_fn, NULL);
341+
}
342+
run_spawn(mixed_reader_fn, &reader);
343+
344+
run_scheduler_run();
345+
346+
RUN_ASSERT_EQ(atomic_load(&mixed_cpu_done), MIXED_CPU_GS);
347+
RUN_ASSERT_EQ(atomic_load(&mixed_io_done), 1);
348+
349+
run_poll_close(&pd);
350+
close(fds[0]);
351+
close(fds[1]);
352+
}
353+
165354
void run_test_stress(void) {
166355
TEST_SUITE("stress");
167356
RUN_TEST(test_stress_spawn_10000);
168357
RUN_TEST(test_stress_producer_consumer);
358+
RUN_TEST(test_stress_work_stealing_asymmetric);
359+
RUN_TEST(test_stress_tight_loop_preemption);
169360
RUN_TEST(test_stress_yield_chain);
170361
RUN_TEST(test_stress_concurrent_spawn);
171362
RUN_TEST(test_stress_channel_pingpong);
363+
RUN_TEST(test_stress_mixed_io_cpu);
172364
}

0 commit comments

Comments
 (0)