Skip to content

Commit aa8ea0a

Browse files
authored
Update forkrun_ring.c
bugfix
1 parent 6370de1 commit aa8ea0a

1 file changed

Lines changed: 22 additions & 13 deletions

File tree

forkrun_ring.c

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// forkrun_ring.c v13.0.2-UNIFIED (Bugfix: UMA Premature EOF)
1+
// forkrun_ring.c v13.0.3-UNIFIED (Bugfix: UMA EOF Loop & NUMA Steal Deadlock)
22
// ======================================================================================
33
// ARCHITECTURE OVERVIEW:
44
//
@@ -204,7 +204,7 @@ static inline char *try_simd_scan(char *p, char *safe_end, uint64_t target, char
204204
#define DAMPING_OFFSET 6
205205

206206
#ifndef FORKRUN_RING_VERSION
207-
#define FORKRUN_RING_VERSION "NUMA-v13.0.2-UNIFIED"
207+
#define FORKRUN_RING_VERSION "NUMA-v13.0.3-UNIFIED"
208208
#endif
209209

210210
#define atomic_load_acquire(ptr) __atomic_load_n(ptr, __ATOMIC_ACQUIRE)
@@ -277,7 +277,7 @@ static int g_debug = 0;
277277
"ring_numa_scanner <memfd> <node_id> <spawn_fd> <nodes>", \
278278
"Run unified NUMA scanner") \
279279
X(ring_claim, ring_claim_main, "ring_claim[VAR] [FD]", "Claim batch") \
280-
X(ring_worker, ring_worker_main, "ring_worker[inc|dec][FD]", \
280+
X(ring_worker, ring_worker_main, "ring_worker [inc|dec][FD]", \
281281
"Worker control") \
282282
X(ring_cleanup_waiter, ring_cleanup_waiter_main, "ring_cleanup_waiter", \
283283
"Cleanup waiter") \
@@ -1334,15 +1334,14 @@ static int ring_indexer_numa_main(int argc, char **argv) {
13341334
do { \
13351335
while (1) { \
13361336
uint64_t limit; \
1337-
if (!is_numa && atomic_load_relaxed(&local_state->fallow_active)) \
1337+
if (!is_numa && atomic_load_relaxed(&local_state->fallow_active)) { \
13381338
limit = atomic_load_acquire(&local_state->min_idx); \
1339-
else \
1339+
} else { \
13401340
limit = atomic_load_acquire(&local_state->read_idx); \
1341-
\
1341+
} \
13421342
bool limit_lines = ((local_scan_idx - limit) >= RING_SIZE); \
13431343
bool limit_chunks = is_numa && (cb_head >= 3) && (limit < chunk_bounds[(cb_head - 3) & 3]); \
13441344
if (!limit_lines && !limit_chunks) break; \
1345-
\
13461345
UNIFIED_ADAPTIVE_COMMIT(true); \
13471346
usleep(100); \
13481347
} \
@@ -1446,7 +1445,6 @@ static int ring_indexer_numa_main(int argc, char **argv) {
14461445
} \
14471446
} while (0)
14481447

1449-
14501448
// The Core Unified Scanner Function
14511449
static inline __attribute__((always_inline)) int
14521450
core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes, const bool is_numa)
@@ -1544,10 +1542,14 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
15441542
if (bl > max_bl) { max_bl = bl; fallback_target = i; }
15451543
uint32_t c_maj = state[i].chunk_queue[t & META_RING_MASK];
15461544
bool ready = true;
1547-
if (c_maj > 0) {
1545+
1546+
if (atomic_load_acquire(&state[i].write_idx) == 0) {
1547+
ready = false; // Never steal from a node that hasn't published work yet
1548+
} else if (c_maj > 0) {
15481549
struct ChunkMeta *pm = &g_state->meta_ring[(c_maj - 1) & META_RING_MASK];
15491550
if (!(atomic_load_acquire(&pm->actual_end) & FLAG_META_READY)) ready = false;
15501551
}
1552+
15511553
if (ready && bl > best_ready_bl) { best_ready_bl = bl; ready_target = i; }
15521554
}
15531555
}
@@ -1668,7 +1670,10 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16681670
}
16691671
}
16701672
if (atomic_load_acquire(&local_state->ingest_complete)) {
1671-
if (n == 0) status = 1; // Only set EOF if ingest is truly done
1673+
// If Ingest is completely finished, and we did not read strictly MORE
1674+
// bytes than we already had (which is why we are in this 'else' block),
1675+
// it means the file will never grow and we have reached physical EOF.
1676+
status = 1;
16721677
} else {
16731678
status = 0; // Prevent premature EOF
16741679
bool starving = (atomic_load_relaxed(&local_state->active_waiters) > 0);
@@ -2370,7 +2375,6 @@ static int ring_ack_main(int argc, char **argv) {
23702375
}
23712376

23722377
// --- MIN-HEAP ORDERING ---
2373-
#define HEAP_MAX 262144
23742378
struct HeapNode {
23752379
uint64_t key;
23762380
struct OrderPacket pkt;
@@ -2379,7 +2383,7 @@ struct HeapNode {
23792383
static void heap_push(struct HeapNode **heap_ptr, int *sz, int *cap, uint64_t key, struct OrderPacket pkt) {
23802384
if (*sz >= *cap) {
23812385
*cap = (*cap) * 2;
2382-
*heap_ptr = xrealloc(*heap_ptr, (size_t)(*cap) * sizeof(struct HeapNode));
2386+
*heap_ptr = xrealloc(*heap_ptr, (*cap) * sizeof(struct HeapNode));
23832387
}
23842388
struct HeapNode *heap = *heap_ptr;
23852389
int i = (*sz)++;
@@ -2442,7 +2446,11 @@ static int ring_order_main(int argc, char **argv) {
24422446

24432447
bool use_zerocopy = false; struct stat st_out;
24442448
if (fstat(1, &st_out) == 0 && S_ISREG(st_out.st_mode)) use_zerocopy = true;
2445-
int heap_cap = HEAP_MAX; struct HeapNode *heap = xmalloc((size_t)heap_cap * sizeof(struct HeapNode)); int heap_sz = 0;
2449+
2450+
int heap_cap = 262144;
2451+
struct HeapNode *heap = xmalloc(heap_cap * sizeof(struct HeapNode));
2452+
int heap_sz = 0;
2453+
24462454
uint32_t expected_major = 0; uint32_t expected_minor = 0; struct OrderPacket ops[64]; ssize_t n_read;
24472455

24482456
while ((n_read = read(fd_in, ops, sizeof(ops))) > 0) {
@@ -2695,6 +2703,7 @@ static int ring_fallow_phys_main(int argc, char **argv) {
26952703
}
26962704
}
26972705
}
2706+
while (head) { struct Interval *tmp = head; head = head->next; free(tmp); }
26982707
return EXECUTION_SUCCESS;
26992708
}
27002709

0 commit comments

Comments
 (0)