Skip to content

Commit 2f4d8f5

Browse files
jkool702jkool702
authored andcommitted
new fix - per node eventfd
1 parent 06f7dd7 commit 2f4d8f5

1 file changed

Lines changed: 33 additions & 28 deletions

File tree

forkrun_ring.c

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ static int g_debug = 0;
317317
do { \
318318
if ((long)(x) == -1) { \
319319
if (g_debug) \
320-
fprintf(stderr, "forkrun [DEBUG] %s:%d: %s failed: %s\n", __FILE__, \
320+
fprintf(stderr, "forkrun[DEBUG] %s:%d: %s failed: %s\n", __FILE__, \
321321
__LINE__, #x, strerror(errno)); \
322322
} \
323323
} while (0)
@@ -336,7 +336,7 @@ static int g_debug = 0;
336336
X(ring_numa_scanner, ring_numa_scanner_main, \
337337
"ring_numa_scanner <memfd> <node_id> <spawn_fd> <nodes>", \
338338
"Run unified NUMA scanner") \
339-
X(ring_claim, ring_claim_main, "ring_claim[VAR] [FD]", "Claim batch") \
339+
X(ring_claim, ring_claim_main, "ring_claim[VAR][FD]", "Claim batch") \
340340
X(ring_worker, ring_worker_main, "ring_worker [inc|dec][FD]", \
341341
"Worker control") \
342342
X(ring_cleanup_waiter, ring_cleanup_waiter_main, "ring_cleanup_waiter", \
@@ -363,7 +363,7 @@ static int g_debug = 0;
363363
"ring_splice <IN> <OUT> <OFF> <LEN>[close]", "Splice data") \
364364
X(ring_version, ring_version_main, "ring_version[-t|-o|-m|-g|-f|-a]", \
365365
"Show build metadata") \
366-
X(ring_list, ring_list_main, "ring_list [VAR]", "List loadables")
366+
X(ring_list, ring_list_main, "ring_list[VAR]", "List loadables")
367367

368368
#define X(name, func, usage, doc) static int func(int argc, char **argv);
369369
FORKRUN_LOADABLES(X)
@@ -614,6 +614,7 @@ static __thread uint32_t worker_last_major = 0;
614614
static __thread uint32_t worker_last_minor = 0;
615615

616616
static int *evfd_data_arr = NULL;
617+
static int *evfd_indexer_arr = NULL;
617618
static int *fd_escrow_r = NULL;
618619
static int *fd_escrow_w = NULL;
619620

@@ -691,6 +692,7 @@ struct SharedState {
691692
uint8_t ingest_complete;
692693

693694
uint32_t active_waiters ALIGNED(CACHE_LINE);
695+
uint32_t indexer_waiters ALIGNED(CACHE_LINE);
694696
uint64_t min_idx;
695697

696698
uint64_t write_idx ALIGNED(CACHE_LINE);
@@ -938,7 +940,7 @@ static int ring_init_main(int argc, char **argv) {
938940
const char *dbg_env = get_string_value("FORKRUN_DEBUG");
939941
if (dbg_env && (strcmp(dbg_env, "1") == 0 || strcmp(dbg_env, "true") == 0)) {
940942
g_debug = 1;
941-
fprintf(stderr, "forkrun[DEBUG] Enabled\n");
943+
fprintf(stderr, "forkrun [DEBUG] Enabled\n");
942944
} else g_debug = 0;
943945

944946
global_num_nodes = 0;
@@ -1116,11 +1118,13 @@ static int ring_init_main(int argc, char **argv) {
11161118
}
11171119

11181120
evfd_data_arr = xmalloc(global_num_nodes * sizeof(int));
1121+
evfd_indexer_arr = xmalloc(global_num_nodes * sizeof(int));
11191122
fd_escrow_r = xmalloc(global_num_nodes * sizeof(int));
11201123
fd_escrow_w = xmalloc(global_num_nodes * sizeof(int));
11211124

11221125
for (uint32_t n = 0; n < global_num_nodes; n++) {
11231126
evfd_data_arr[n] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
1127+
evfd_indexer_arr[n] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
11241128
int pfd[2];
11251129
if (pipe(pfd) == 0) {
11261130
fcntl(pfd[0], F_SETFL, O_NONBLOCK); fcntl(pfd[1], F_SETFL, O_NONBLOCK);
@@ -1132,7 +1136,7 @@ static int ring_init_main(int argc, char **argv) {
11321136
}
11331137
}
11341138

1135-
// FIX: evfd_ingest_data is a NON-semaphore broadcast. One signal wakes everyone; first to read() clears it.
1139+
// evfd_ingest_data is kept as a standard broadcast eventfd strictly for legacy UMA mode.
11361140
evfd_data = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
11371141
evfd_eof = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
11381142
evfd_ingest_data = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
@@ -1199,10 +1203,12 @@ static int ring_destroy_main(int argc, char **argv) {
11991203
if (evfd_data_arr) {
12001204
for (uint32_t n = 0; n < allocated_num_nodes; n++) {
12011205
if (evfd_data_arr[n] >= 0) close(evfd_data_arr[n]);
1206+
if (evfd_indexer_arr && evfd_indexer_arr[n] >= 0) close(evfd_indexer_arr[n]);
12021207
if (fd_escrow_r && fd_escrow_r[n] >= 0) close(fd_escrow_r[n]);
12031208
if (fd_escrow_w && fd_escrow_w[n] >= 0) close(fd_escrow_w[n]);
12041209
}
12051210
xfree(evfd_data_arr); evfd_data_arr = NULL;
1211+
if (evfd_indexer_arr) { xfree(evfd_indexer_arr); evfd_indexer_arr = NULL; }
12061212
xfree(fd_escrow_r); fd_escrow_r = NULL;
12071213
xfree(fd_escrow_w); fd_escrow_w = NULL;
12081214
}
@@ -1259,12 +1265,10 @@ static int ring_numa_ingest_main(int argc, char **argv) {
12591265
if (bl < min_backlog) { min_backlog = bl; target_node = check; }
12601266
}
12611267

1262-
// Localized Event-Driven Shallow Queue Backpressure (No mathematical deadlock)
1268+
// Localized Event-Driven Shallow Queue Backpressure
12631269
while (1) {
12641270
uint64_t h = atomic_load_relaxed(&state[target_node].chunk_queue_head);
12651271
uint64_t t = atomic_load_acquire(&state[target_node].chunk_queue_tail);
1266-
1267-
// FIX: Cast to int64_t to prevent wrap-around underflow when scanner eagerly increments tail
12681272
if ((int64_t)(h - t) < 4) break;
12691273

12701274
struct pollfd pfd = {.fd = evfd_chunk_done, .events = POLLIN};
@@ -1327,12 +1331,12 @@ static int ring_numa_ingest_main(int argc, char **argv) {
13271331

13281332
__atomic_store_n(&t_state->chunk_queue_head, q_idx + 1, __ATOMIC_RELEASE);
13291333
__atomic_store_n(&g_state->ingest_publish_idx, current_major + 1, __ATOMIC_RELEASE);
1330-
1331-
// Hard barrier to prevent Store-Load reordering (fixes Dekker lost-wakeup on x86/ARM)
13321334
__atomic_thread_fence(__ATOMIC_SEQ_CST);
13331335

1334-
if (atomic_load_relaxed(&g_state->ingest_waiters) > 0) {
1335-
uint64_t v = 1; SYS_CHK(write(evfd_ingest_data, &v, 8));
1336+
// Isolate wakeups to the specific node we just pushed data to
1337+
uint64_t w = atomic_load_relaxed(&t_state->indexer_waiters);
1338+
if (w > 0) {
1339+
SYS_CHK(write(evfd_indexer_arr[target_node], &w, 8));
13361340
}
13371341

13381342
last_target = target_node; current_offset += n; current_major++;
@@ -1372,15 +1376,15 @@ static int ring_indexer_numa_main(int argc, char **argv) {
13721376
}
13731377
if (spin < 100) { cpu_relax(); spin++; continue; }
13741378

1375-
__atomic_fetch_add(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST);
1379+
__atomic_fetch_add(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST);
13761380
if (atomic_load_acquire(&t_state->chunk_queue_head) > my_idx) {
1377-
__atomic_fetch_sub(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST); break;
1381+
__atomic_fetch_sub(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST); break;
13781382
}
1379-
struct pollfd pfds[2] = {{.fd = evfd_ingest_data, .events = POLLIN}, {.fd = evfd_ingest_eof, .events = POLLIN}};
1383+
struct pollfd pfds[2] = {{.fd = evfd_indexer_arr[my_node_id], .events = POLLIN}, {.fd = evfd_ingest_eof, .events = POLLIN}};
13801384
poll(pfds, 2, -1);
1381-
if (pfds[0].revents & POLLIN) { uint64_t v; if (read(evfd_ingest_data, &v, 8)) {}; }
1385+
if (pfds[0].revents & POLLIN) { uint64_t v; if (read(evfd_indexer_arr[my_node_id], &v, 8)) {}; }
13821386
if (pfds[1].revents & POLLIN) { uint64_t v; if (read(evfd_ingest_eof, &v, 8)) {}; }
1383-
__atomic_fetch_sub(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST); spin = 0;
1387+
__atomic_fetch_sub(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST); spin = 0;
13841388
}
13851389
spin = 0;
13861390

@@ -1646,8 +1650,11 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16461650
uint64_t my_head = atomic_load_acquire(&t_state->chunk_queue_head);
16471651

16481652
if (my_tail >= my_head) {
1649-
// FIX: Structurally forbid cross-socket stealing unless node has 2+ chunks.
1650-
int required_bl = 2;
1653+
bool is_eof = (atomic_load_acquire(&g_state->ingest_eof_idx) != ~(uint64_t)0);
1654+
bool workers_starved = (atomic_load_relaxed(&local_state->read_idx) == atomic_load_relaxed(&local_state->write_idx));
1655+
1656+
// Strict activation energy: Need backlog >= 2 to steal, UNLESS we are globally at EOF AND our local workers are completely starved.
1657+
int required_bl = (is_eof && workers_starved) ? 1 : 2;
16511658
int max_bl = 0, best_ready_bl = 0, ready_target = -1, fallback_target = -1;
16521659

16531660
for (int i = 0; i < num_nodes; i++) {
@@ -1671,9 +1678,7 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16711678
}
16721679

16731680
if (max_bl == 0) {
1674-
if (atomic_load_acquire(&g_state->ingest_eof_idx) != ~(uint64_t)0) {
1675-
goto unified_scanner_eof;
1676-
}
1681+
if (is_eof) goto unified_scanner_eof;
16771682
} else {
16781683
if (ready_target != -1 && best_ready_bl >= required_bl) steal_target = ready_target;
16791684
else if (fallback_target != -1 && max_bl >= required_bl) steal_target = fallback_target;
@@ -1695,15 +1700,15 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16951700
experienced_stall = true;
16961701
if (ingest_spin < 100) { cpu_relax(); ingest_spin++; continue; }
16971702

1698-
__atomic_fetch_add(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST);
1703+
__atomic_fetch_add(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST);
16991704
if (atomic_load_acquire(&t_state->chunk_queue_head) > claim_idx) {
1700-
__atomic_fetch_sub(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST); break;
1705+
__atomic_fetch_sub(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST); break;
17011706
}
1702-
struct pollfd pfds[2] = {{.fd = evfd_ingest_data, .events = POLLIN}, {.fd = evfd_ingest_eof, .events = POLLIN}};
1707+
struct pollfd pfds[2] = {{.fd = evfd_indexer_arr[steal_target], .events = POLLIN}, {.fd = evfd_ingest_eof, .events = POLLIN}};
17031708
poll(pfds, 2, -1);
1704-
if (pfds[0].revents & POLLIN) { uint64_t v; if (read(evfd_ingest_data, &v, 8)) {} }
1709+
if (pfds[0].revents & POLLIN) { uint64_t v; if (read(evfd_indexer_arr[steal_target], &v, 8)) {} }
17051710
if (pfds[1].revents & POLLIN) { uint64_t v; if (read(evfd_ingest_eof, &v, 8)) {} }
1706-
__atomic_fetch_sub(&g_state->ingest_waiters, 1, __ATOMIC_SEQ_CST); ingest_spin = 0;
1711+
__atomic_fetch_sub(&t_state->indexer_waiters, 1, __ATOMIC_SEQ_CST); ingest_spin = 0;
17071712
}
17081713

17091714
if (atomic_load_acquire(&t_state->chunk_queue_head) <= claim_idx) continue;
@@ -2171,7 +2176,7 @@ static int ring_claim_main(int argc, char **argv) {
21712176
}
21722177
if (!found && g_debug && global_num_nodes > 1) {
21732178
fprintf(stderr,
2174-
"forkrun[DEBUG] Worker on unmapped physical node %d, "
2179+
"forkrun [DEBUG] Worker on unmapped physical node %d, "
21752180
"defaulting to logical 0\n",
21762181
phys);
21772182
}

0 commit comments

Comments
 (0)