@@ -1132,11 +1132,11 @@ static int ring_init_main(int argc, char **argv) {
11321132 }
11331133 }
11341134
1135- // BUGFIX: eventfd made into a semaphore so read() strictly consumes 1 ticket instead of wiping the counter .
1135+ // FIX: evfd_ingest_data is a NON- semaphore broadcast. One signal wakes everyone; first to read() clears it .
11361136 evfd_data = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK );
1137- evfd_eof = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE );
1138- evfd_ingest_data = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE );
1139- evfd_ingest_eof = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE );
1137+ evfd_eof = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK );
1138+ evfd_ingest_data = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK );
1139+ evfd_ingest_eof = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK );
11401140 evfd_starve = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK );
11411141 evfd_chunk_done = eventfd (0 , EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE );
11421142
@@ -1263,7 +1263,9 @@ static int ring_numa_ingest_main(int argc, char **argv) {
12631263 while (1 ) {
12641264 uint64_t h = atomic_load_relaxed (& state [target_node ].chunk_queue_head );
12651265 uint64_t t = atomic_load_acquire (& state [target_node ].chunk_queue_tail );
1266- if (h - t < 4 ) break ; // Strict per-node depth limit ensures no runaways and no deadlocks
1266+
1267+ // FIX: Cast to int64_t to prevent wrap-around underflow when scanner eagerly increments tail
1268+ if ((int64_t )(h - t ) < 4 ) break ;
12671269
12681270 struct pollfd pfd = {.fd = evfd_chunk_done , .events = POLLIN };
12691271 if (poll (& pfd , 1 , 100 ) > 0 ) {
@@ -1329,10 +1331,8 @@ static int ring_numa_ingest_main(int argc, char **argv) {
13291331 // Hard barrier to prevent Store-Load reordering (fixes Dekker lost-wakeup on x86/ARM)
13301332 __atomic_thread_fence (__ATOMIC_SEQ_CST );
13311333
1332- // BUGFIX: Write `w` tickets to EFD_SEMAPHORE instead of `1` ticket to non-semaphore.
1333- uint64_t w = atomic_load_relaxed (& g_state -> ingest_waiters );
1334- if (w > 0 ) {
1335- SYS_CHK (write (evfd_ingest_data , & w , 8 ));
1334+ if (atomic_load_relaxed (& g_state -> ingest_waiters ) > 0 ) {
1335+ uint64_t v = 1 ; SYS_CHK (write (evfd_ingest_data , & v , 8 ));
13361336 }
13371337
13381338 last_target = target_node ; current_offset += n ; current_major ++ ;
@@ -1646,8 +1646,8 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16461646 uint64_t my_head = atomic_load_acquire (& t_state -> chunk_queue_head );
16471647
16481648 if (my_tail >= my_head ) {
1649- bool is_eof = ( atomic_load_acquire ( & g_state -> ingest_eof_idx ) != ~( uint64_t ) 0 );
1650- int required_bl = 2 ; // Activation energy for cross-socket steal
1649+ // FIX: Structurally forbid cross-socket stealing unless node has 2+ chunks.
1650+ int required_bl = 2 ;
16511651 int max_bl = 0 , best_ready_bl = 0 , ready_target = -1 , fallback_target = -1 ;
16521652
16531653 for (int i = 0 ; i < num_nodes ; i ++ ) {
@@ -1671,7 +1671,9 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
16711671 }
16721672
16731673 if (max_bl == 0 ) {
1674- if (is_eof ) goto unified_scanner_eof ;
1674+ if (atomic_load_acquire (& g_state -> ingest_eof_idx ) != ~(uint64_t )0 ) {
1675+ goto unified_scanner_eof ;
1676+ }
16751677 } else {
16761678 if (ready_target != -1 && best_ready_bl >= required_bl ) steal_target = ready_target ;
16771679 else if (fallback_target != -1 && max_bl >= required_bl ) steal_target = fallback_target ;
0 commit comments