@@ -611,7 +611,7 @@ static int ring_init_main(int argc, char **argv) {
611611 const char * dbg_env = get_string_value ("FORKRUN_DEBUG" );
612612 if (dbg_env && (strcmp (dbg_env , "1" ) == 0 || strcmp (dbg_env , "true" ) == 0 )) {
613613 g_debug = 1 ;
614- fprintf (stderr , "forkrun [DEBUG] Enabled\n" );
614+ fprintf (stderr , "forkrun[DEBUG] Enabled\n" );
615615 } else g_debug = 0 ;
616616
617617 global_num_nodes = 0 ;
@@ -744,8 +744,13 @@ static int ring_init_main(int argc, char **argv) {
744744 if (vals [4 ] > vals [5 ]) vals [4 ] = vals [5 ];
745745
746746 for (uint32_t n = 0 ; n < global_num_nodes ; n ++ ) {
747- state [n ].cfg_w_start = vals [0 ];
748- state [n ].cfg_w_max = vals [1 ];
747+ uint64_t w_start_balanced = vals [0 ] / global_num_nodes ;
748+ if (w_start_balanced < 1 ) w_start_balanced = 1 ;
749+ uint64_t w_max_balanced = vals [1 ] / global_num_nodes ;
750+ if (w_max_balanced < 1 ) w_max_balanced = 1 ;
751+
752+ state [n ].cfg_w_start = w_start_balanced ;
753+ state [n ].cfg_w_max = w_max_balanced ;
749754 state [n ].mode_byte = byte_mode ? 1 : 0 ;
750755 state [n ].numa_enabled = (global_num_nodes > 1 ) ? 1 : 0 ;
751756
@@ -940,6 +945,9 @@ static int ring_numa_ingest_main(int argc, char **argv) {
940945 int mask_words = (max_phys_id / BITS_PER_LONG ) + 1 ;
941946 unsigned long * nodemask = xmalloc (mask_words * sizeof (unsigned long ));
942947
948+ // Persistent pipe for the zero-copy splice fallback
949+ int fallback_pipe [2 ] = {-1 , -1 };
950+
943951 while (1 ) {
944952 uint32_t claimed_node ;
945953 while (read (claim_pipe , & claimed_node , sizeof (claimed_node )) == sizeof (claimed_node )) {
@@ -973,7 +981,46 @@ static int ring_numa_ingest_main(int argc, char **argv) {
973981 chunk_size &= ~4095ULL ;
974982 if (chunk_size == 0 ) chunk_size = 4096 ;
975983
976- ssize_t n = splice (infd , NULL , outfd , NULL , chunk_size , SPLICE_F_MOVE | SPLICE_F_MORE );
984+ ssize_t n = -1 ;
985+ loff_t in_off = lseek (infd , 0 , SEEK_CUR );
986+
987+ // 1. Try copy_file_range / sendfile
988+ if (in_off != (loff_t )- 1 ) {
989+ n = copy_file_range (infd , NULL , outfd , NULL , chunk_size , 0 );
990+ if (n < 0 && (errno == EXDEV || errno == EINVAL || errno == ENOSYS || errno == EOPNOTSUPP )) {
991+ n = sendfile (outfd , infd , NULL , chunk_size );
992+ }
993+ }
994+
995+ // 2. Try direct splice
996+ if (n < 0 ) {
997+ n = splice (infd , NULL , outfd , NULL , chunk_size , SPLICE_F_MOVE | SPLICE_F_MORE );
998+ }
999+
1000+ // 3. Zero-copy intermediate pipe fallback (identically matches flat mode)
1001+ if (n < 0 && errno == EINVAL ) {
1002+ if (fallback_pipe [0 ] == -1 ) {
1003+ if (pipe (fallback_pipe ) == 0 ) fcntl (fallback_pipe [1 ], F_SETPIPE_SZ , 1048576 );
1004+ }
1005+ if (fallback_pipe [0 ] != -1 ) {
1006+ ssize_t s1 = splice (infd , NULL , fallback_pipe [1 ], NULL , chunk_size , SPLICE_F_MOVE |SPLICE_F_MORE );
1007+ if (s1 > 0 ) {
1008+ size_t written = 0 ;
1009+ while (written < (size_t )s1 ) {
1010+ ssize_t s2 = splice (fallback_pipe [0 ], NULL , outfd , NULL , s1 - written , SPLICE_F_MOVE |SPLICE_F_MORE );
1011+ if (s2 <= 0 ) {
1012+ if (s2 < 0 && (errno == EINTR || errno == EAGAIN )) { usleep (10 ); continue ; }
1013+ break ;
1014+ }
1015+ written += s2 ;
1016+ }
1017+ n = written ;
1018+ } else {
1019+ n = s1 ;
1020+ }
1021+ }
1022+ }
1023+
9771024 if (n < 0 ) {
9781025 if (errno == EINTR || errno == EAGAIN ) continue ;
9791026 break ;
@@ -1007,6 +1054,11 @@ static int ring_numa_ingest_main(int argc, char **argv) {
10071054 }
10081055 }
10091056
1057+ if (fallback_pipe [0 ] != -1 ) {
1058+ close (fallback_pipe [0 ]);
1059+ close (fallback_pipe [1 ]);
1060+ }
1061+
10101062 if (numa_enabled ) syscall (__NR_set_mempolicy , MPOL_DEFAULT , NULL , 0 );
10111063 xfree (nodemask );
10121064 xfree (backlogs );
@@ -2026,13 +2078,29 @@ static int ring_claim_main(int argc, char **argv) {
20262078restart_loop :
20272079 while (1 ) {
20282080 struct EscrowPacket ep ;
2081+ // 1. Try local node escrow first
20292082 if (fd_escrow_r && fd_escrow_r [my_numa_node ] >= 0 &&
20302083 read (fd_escrow_r [my_numa_node ], & ep , sizeof (ep )) == sizeof (ep )) {
20312084 my_read_idx = ep .idx ;
20322085 claim_count = ep .cnt ;
20332086 break ;
20342087 }
20352088
2089+ // 2. Try cross-node escrow diffusion (steal from heavily loaded nodes)
2090+ if (local_state -> numa_enabled ) {
2091+ bool stole = false;
2092+ for (uint32_t i = 1 ; i < global_num_nodes ; i ++ ) {
2093+ uint32_t steal_node = (my_numa_node + i ) % global_num_nodes ;
2094+ if (fd_escrow_r [steal_node ] >= 0 && read (fd_escrow_r [steal_node ], & ep , sizeof (ep )) == sizeof (ep )) {
2095+ my_read_idx = ep .idx ;
2096+ claim_count = ep .cnt ;
2097+ stole = true;
2098+ break ;
2099+ }
2100+ }
2101+ if (stole ) break ;
2102+ }
2103+
20362104 uint64_t w_snap = atomic_load_acquire (& local_state -> write_idx );
20372105 uint64_t r_curr = atomic_load_relaxed (& local_state -> read_idx );
20382106
0 commit comments