@@ -1034,9 +1034,11 @@ static int ring_indexer_numa_main(int argc, char **argv) {
10341034 char tail_buf [65536 ];
10351035 uint64_t actual_start = 0 ;
10361036 uint32_t last_major_seen = 0 ;
1037+ uint32_t last_node_id = 0 ;
10371038
10381039 while (read (index_pipe , & pkt , sizeof (pkt )) == sizeof (pkt )) {
10391040 if (pkt .major_id == UINT32_MAX ) break ;
1041+ last_node_id = pkt .node_id ;
10401042
10411043 uint64_t chunk_end = pkt .offset + pkt .length ;
10421044 uint64_t actual_end = chunk_end ;
@@ -1085,13 +1087,17 @@ static int ring_indexer_numa_main(int argc, char **argv) {
10851087 }
10861088
10871089 if (pkt .major_id == UINT32_MAX && actual_start < pkt .offset ) {
1090+ // Guard against UINT32_MAX wrap-around sentinel confusion
1091+ uint32_t final_major = (last_major_seen < UINT32_MAX - 1 ) ? last_major_seen + 1 : last_major_seen ;
10881092 struct ScannerTask final_task = {
1089- .major_id = last_major_seen + 1 ,
1093+ .major_id = final_major ,
10901094 .pad = 0 ,
10911095 .start_off = actual_start ,
10921096 .length = pkt .offset - actual_start
10931097 };
1094- write (node_pipes [0 ], & final_task , sizeof (final_task ));
1098+ // Route exactly to the last node instead of defaulting to 0
1099+ int target = node_pipes [last_node_id % num_node_pipes ];
1100+ write (target , & final_task , sizeof (final_task ));
10951101 }
10961102
10971103 xfree (node_pipes );
@@ -2173,7 +2179,9 @@ static int ring_ack_main(int argc, char **argv) {
21732179
21742180 struct SharedState * local_state = (my_numa_node != -1 && my_numa_node < (int )global_num_nodes ) ? & state [my_numa_node ] : & state [0 ];
21752181
2182+ uint64_t my_idx ;
21762183 if (worker_last_cnt > 0 ) {
2184+ my_idx = worker_last_idx ;
21772185 if (local_state && local_state -> numa_enabled ) {
21782186 op .major_idx = worker_last_major ;
21792187 op .minor_idx = worker_last_minor ;
@@ -2188,15 +2196,26 @@ static int ring_ack_main(int argc, char **argv) {
21882196 if (local_state && local_state -> numa_enabled ) {
21892197 op .major_idx = (uint32_t )atoi (get_string_value ("RING_MAJOR" ));
21902198 op .minor_idx = (uint32_t )atoi (get_string_value ("RING_MINOR" ));
2199+ my_idx = (uint64_t )atoll (get_string_value ("RING_BATCH_IDX" ));
21912200 } else {
21922201 op .major_idx = (uint32_t )atoi (get_string_value ("RING_BATCH_IDX" ));
21932202 op .minor_idx = 0 ;
2203+ my_idx = op .major_idx ;
21942204 }
21952205 }
21962206
21972207 if (fd_fallow > 0 ) {
2198- struct IndexPacket ip = { .idx = op .major_idx , .cnt = op .cnt };
2199- SYS_CHK (write (fd_fallow , & ip , sizeof (ip )));
2208+ if (local_state && local_state -> numa_enabled ) {
2209+ // NUMA MODE: Push exact physical byte coordinates to fallow_phys
2210+ uint64_t start = local_state -> offset_ring [my_idx & RING_MASK ] & ~FLAG_PARTIAL_BATCH ;
2211+ uint64_t end = local_state -> end_ring [(my_idx + op .cnt - 1 ) & RING_MASK ];
2212+ struct PhysPacket pp = { .off = start , .len = end - start };
2213+ SYS_CHK (write (fd_fallow , & pp , sizeof (pp )));
2214+ } else {
2215+ // FLAT MODE: Legacy behavior
2216+ struct IndexPacket ip = { .idx = op .major_idx , .cnt = op .cnt };
2217+ SYS_CHK (write (fd_fallow , & ip , sizeof (ip )));
2218+ }
22002219 }
22012220
22022221 if (fd_target > 0 ) {
0 commit comments