1- // forkrun_ring.c v9.8 .0-NUMA (Golden Master - NUMA Edition)
1+ // forkrun_ring.c v10.0 .0-NUMA (Golden Master - NUMA Edition)
22// ======================================================================================
33// ARCHITECTURE OVERVIEW:
44//
55// 1. Zero-Copy Ingest: Data moved from stdin to memfd via splice/copy_file_range.
6- // 2. The Ring: Shared memory ring storing offsets.
6+ // 2. The Ring: Shared memory ring storing offsets.
77// 3. NUMA "Born-Local" Topology:
88// - Data is forced to allocate on specific physical CPU sockets via set_mempolicy.
99// - The state is partitioned into an array of isolated SharedState rings (one per node).
3737#include <sched.h>
3838#include <sys/sendfile.h>
3939#include <sys/sysinfo.h>
40- #include <time.h>
40+ #include <time.h>
4141#include <sys/syscall.h>
4242#include <ctype.h>
4343#include <sys/socket.h>
7575#elif defined(__s390x__ )
7676 #define __NR_set_mempolicy 276
7777#else
78- #define __NR_set_mempolicy 0
78+ #define __NR_set_mempolicy 0
7979#endif
8080#endif
8181
@@ -185,29 +185,29 @@ static int g_debug = 0;
185185#define FORKRUN_LOADABLES (X ) \
186186 X(ring_init, ring_init_main, "ring_init [FLAGS]", "Initialize ring with config") \
187187 X(ring_destroy, ring_destroy_main, "ring_destroy", "Destroy ring") \
188- X(ring_scanner, ring_scanner_main, "ring_scanner <fd> [spawn_fd]", "Run legacy scanner") \
188+ X(ring_scanner, ring_scanner_main, "ring_scanner <fd>[spawn_fd]", "Run legacy scanner") \
189189 X(ring_numa_ingest, ring_numa_ingest_main, "ring_numa_ingest <infd> <outfd> <idx_pipe> <claim_pipe> <nodes> [ordered]", "Run NUMA topological ingest") \
190190 X(ring_indexer_numa, ring_indexer_numa_main, "ring_indexer_numa <memfd> <idx_pipe> <node_pipes...>", "Run NUMA chunk indexer") \
191191 X(ring_numa_scanner, ring_numa_scanner_main, "ring_numa_scanner <memfd> <node_id> <claim_pipe> <spawn_fd> <nodes> <node_pipes...>", "Run NUMA localized scanner") \
192- X(ring_claim, ring_claim_main, "ring_claim [VAR] [FD]", "Claim batch") \
192+ X(ring_claim, ring_claim_main, "ring_claim[VAR] [FD]", "Claim batch") \
193193 X(ring_worker, ring_worker_main, "ring_worker [inc|dec] [FD]", "Worker control") \
194194 X(ring_cleanup_waiter, ring_cleanup_waiter_main, "ring_cleanup_waiter", "Cleanup waiter") \
195195 X(ring_ingest, ring_ingest_main, "ring_ingest", "Signal ingest") \
196- X(ring_fallow, ring_fallow_main, "ring_fallow <PIPE> <FILE> [dry]","Logical fallow") \
196+ X(ring_fallow, ring_fallow_main, "ring_fallow <PIPE> <FILE>[dry]","Logical fallow") \
197197 X(ring_ack, ring_ack_main, "ring_ack <FD> <FD_OUT>", "Ack batch") \
198- X(ring_order, ring_order_main, "ring_order <FD> <PFX|memfd> [unordered]", "Reorder output") \
198+ X(ring_order, ring_order_main, "ring_order <FD> <PFX|memfd>[unordered]", "Reorder output") \
199199 X(ring_copy, ring_copy_main, "ring_copy <OUT> <IN>", "Zero-copy ingest") \
200200 X(ring_signal, ring_signal_main, "ring_signal <FD>", "Signal eventfd") \
201- X(lseek, lseek_main, "lseek <FD> <OFF> [WHENCE] [VAR]", "Seek fd") \
201+ X(lseek, lseek_main, "lseek <FD> <OFF>[WHENCE] [VAR]", "Seek fd") \
202202 X(ring_indexer, ring_indexer_main, "ring_indexer", "NUMA Indexer") \
203203 X(ring_fetcher, ring_fetcher_main, "ring_fetcher", "NUMA Fetcher") \
204204 X(ring_fallow_phys, ring_fallow_phys_main, "ring_fallow_phys", "Physical fallow") \
205205 X(ring_memfd_create, ring_memfd_create_main, "ring_memfd_create <VAR>", "Create memfd") \
206206 X(ring_seal, ring_seal_main, "ring_seal <FD>", "Seal memfd") \
207207 X(ring_fcntl, ring_fcntl_main, "ring_fcntl <FD> <cmd>", "File control") \
208208 X(ring_pipe, ring_pipe_main, "ring_pipe <ARR|RD> [WR]", "Create pipe") \
209- X(ring_splice, ring_splice_main, "ring_splice <IN> <OUT> <OFF> <LEN> [close]", "Splice data") \
210- X(ring_version, ring_version_main, "ring_version [-t|-o|-m|-g|-f|-a]", "Show build metadata") \
209+ X(ring_splice, ring_splice_main, "ring_splice <IN> <OUT> <OFF> <LEN>[close]", "Splice data") \
210+ X(ring_version, ring_version_main, "ring_version[-t|-o|-m|-g|-f|-a]", "Show build metadata") \
211211 X(ring_list, ring_list_main, "ring_list [VAR]", "List loadables")
212212
213213#define X (name , func , usage , doc ) static int func(int argc, char **argv);
@@ -224,8 +224,6 @@ static inline int auto_detect_numa_node() {
224224 return 0 ;
225225}
226226
227- // We return max_node + 1 (instead of counting online nodes) to safely handle
228- // sparse topologies without breaking the 1:1 physical sysfs paths.
229227static uint32_t get_highest_numa_node_id () {
230228 int fd = open ("/sys/devices/system/node/online" , O_RDONLY );
231229 if (fd < 0 ) return 1 ;
@@ -315,7 +313,7 @@ static SHELL_VAR *bind_var_or_array(const char *name, char *value, int flags) {
315313 char * endp = NULL ; errno = 0 ; long n = strtol (idx_s , & endp , 10 );
316314 if (endp == idx_s || * endp != '\0' || errno == ERANGE ) { }
317315 else ret = bind_array_variable (base_s , (arrayind_t )n , val_s , flags );
318- }
316+ }
319317 xfree (base_s ); xfree (idx_s ); xfree (val_s );
320318 return ret ;
321319}
@@ -553,7 +551,7 @@ static inline void cleanup_waiter_state() {
553551static uint64_t get_v_def (const char * type , bool stdin_mode ) {
554552 if (!strcmp (type , "workers" )) return sysconf (_SC_NPROCESSORS_ONLN );
555553 if (!strcmp (type , "lines" )) return 4096 ;
556- if (!strcmp (type , "bytes" )) {
554+ if (!strcmp (type , "bytes" )) {
557555 uint64_t l2 = get_cache_bytes ();
558556 if (stdin_mode ) return (l2 < (1ULL <<19 )) ? l2 : (1ULL <<19 );
559557 uint64_t arg = get_arg_max_bytes ();
@@ -584,7 +582,7 @@ static void apply_config(char type, char sub, const char *arg) {
584582 uint64_t u_val = 0 ;
585583
586584 if (strcmp (arg , "x" ) == 0 ) {
587- if (type == 1 ) { clear_mask |= M_L_ALL ; set_mask |= M_BMODE ; }
585+ if (type == 1 ) { clear_mask |= M_L_ALL ; set_mask |= M_BMODE ; }
588586 else if (type == 2 ) { clear_mask |= (M_B_ALL | M_BMODE ); }
589587 } else {
590588 if (arg [0 ] == '\0' ) val_code = S_DEF ;
@@ -866,7 +864,7 @@ static int ring_init_main(int argc, char **argv) {
866864
867865static int ring_destroy_main (int argc , char * * argv ) {
868866 (void )argc ; (void )argv ;
869- if (state ) {
867+ if (state ) {
870868 size_t total_size = sizeof (struct SharedState ) * allocated_num_nodes ;
871869 total_size = (total_size + 4095ULL ) & ~4095ULL ;
872870 munmap (state , total_size );
@@ -1111,9 +1109,7 @@ static int ring_indexer_numa_main(int argc, char **argv) {
11111109 if (phase == 0) { \
11121110 if (batch_counter >= _tc) { phase = 1; batch_counter = 0; } \
11131111 } else if (phase == 1) { \
1114- if (is_stalled) { \
1115- phase = 2; \
1116- } else if (batch_counter >= _tc) { \
1112+ if (batch_counter >= _tc) { \
11171113 L *= 2; \
11181114 if (L >= Lmax) { L = Lmax; phase = 2; } \
11191115 if (W < W_max_val && !fixed_workers) { \
@@ -1334,7 +1330,7 @@ static int ring_numa_scanner_main(int argc, char **argv) {
13341330
13351331 struct pollfd pfd = { .fd = current_pipe , .events = POLLIN };
13361332 if (active_pipe_idx != my_node_id ) {
1337- if (poll (& pfd , 1 , 0 ) <= 0 ) {
1333+ if (poll (& pfd , 1 , 0 ) <= 0 ) {
13381334 active_pipe_idx = (active_pipe_idx + 1 ) % num_nodes ;
13391335 continue ;
13401336 }
@@ -1433,7 +1429,7 @@ static int ring_numa_scanner_main(int argc, char **argv) {
14331429 }
14341430
14351431 char * nl = memchr (p , '\n' , end - p );
1436- if (nl ) {
1432+ if (nl ) {
14371433 if (BytesMax > 0 && lines_found > 0 ) {
14381434 uint64_t line_end_offset = buf_base_offset + (uint64_t )((nl + 1 ) - buf );
14391435 uint64_t payload = line_end_offset - batch_start ;
@@ -1445,11 +1441,11 @@ static int ring_numa_scanner_main(int argc, char **argv) {
14451441 }
14461442 lines_found ++ ;
14471443 p = nl + 1 ;
1448- } else {
1444+ } else {
14491445 uint64_t curr_pos = buf_base_offset + (end - buf );
14501446 if (curr_pos >= chunk_end ) { lines_found ++ ; p = end ; break ; }
14511447 else { p = end ; }
1452- }
1448+ }
14531449 }
14541450
14551451 if (limit_items > 0 && lines_found > 0 ) {
@@ -1548,7 +1544,7 @@ static inline void scanner_adaptive_commit(bool force) {
15481544 uint64_t intermediate = (linear < current_buffer ) ? linear : current_buffer ;
15491545 if (intermediate > W ) target_buffer = intermediate - W ;
15501546 else target_buffer = 0 ;
1551- }
1547+ }
15521548 uint64_t target = local_scan_idx - target_buffer ;
15531549 if (target > local_write_idx ) {
15541550 atomic_store_release (& state [0 ].write_idx , target );
@@ -1685,7 +1681,7 @@ static int ring_scanner_main(int argc, char **argv) {
16851681 uint64_t prev_avail = (p < end ) ? (uint64_t )(end - p ) : 0 ;
16861682 uint64_t current_p_offset = buf_base_offset + (uint64_t )(p - buf );
16871683
1688- if (lseek (fd , (off_t )current_p_offset , SEEK_SET ) < 0 ) {}
1684+ if (lseek (fd , (off_t )current_p_offset , SEEK_SET ) < 0 ) {}
16891685 ssize_t n = read (fd , buf , chunk_sz );
16901686
16911687 if (n > 0 && (uint64_t )n > prev_avail ) {
@@ -1988,7 +1984,7 @@ static int ring_claim_main(int argc, char **argv) {
19881984restart_loop :
19891985 while (1 ) {
19901986 struct EscrowPacket ep ;
1991- if (fd_escrow_r && fd_escrow_r [my_numa_node ] >= 0 &&
1987+ if (fd_escrow_r && fd_escrow_r [my_numa_node ] >= 0 &&
19921988 read (fd_escrow_r [my_numa_node ], & ep , sizeof (ep )) == sizeof (ep )) {
19931989 my_read_idx = ep .idx ;
19941990 claim_count = ep .cnt ;
@@ -2061,10 +2057,10 @@ static int ring_claim_main(int argc, char **argv) {
20612057 atomic_fetch_add (& local_state -> active_waiters , 1 );
20622058 is_waiting_on_ring = true;
20632059
2064- struct pollfd pfds [3 ] = {
2065- { .fd = evfd_data_arr [my_numa_node ], .events = POLLIN },
2066- { .fd = evfd_eof , .events = POLLIN },
2067- { .fd = fd_escrow_r [my_numa_node ], .events = POLLIN }
2060+ struct pollfd pfds [3 ] = {
2061+ { .fd = evfd_data_arr [my_numa_node ], .events = POLLIN },
2062+ { .fd = evfd_eof , .events = POLLIN },
2063+ { .fd = fd_escrow_r [my_numa_node ], .events = POLLIN }
20682064 };
20692065
20702066 while (1 ) {
@@ -2242,7 +2238,7 @@ static ssize_t robust_sendfile(int out_fd, int in_fd, off_t *offset, size_t coun
22422238 return total > 0 ? (ssize_t )total : -1 ;
22432239 }
22442240 if (s == 0 ) {
2245- if (retries ++ < 1000 ) { usleep (100 ); continue ; }
2241+ if (retries ++ < 100 ) { usleep (10 ); continue ; }
22462242 break ;
22472243 }
22482244 retries = 0 ;
@@ -2259,20 +2255,20 @@ static int ring_copy_chunk(int fd_in, int fd_out, off_t off, size_t len) {
22592255 while (total_read < len ) {
22602256 size_t to_read = (len - total_read > BUF_SIZE ) ? BUF_SIZE : (len - total_read );
22612257 ssize_t r = pread (fd_in , buf , to_read , off + total_read );
2262- if (r < 0 ) {
2258+ if (r < 0 ) {
22632259 if (errno == EINTR || errno == EAGAIN ) { usleep (10 ); continue ; }
22642260 free (buf ); return -1 ;
22652261 }
22662262 if (r == 0 ) {
2267- if (retries ++ < 1000 ) { usleep (100 ); continue ; }
2263+ if (retries ++ < 100 ) { usleep (10 ); continue ; }
22682264 break ;
22692265 }
22702266 retries = 0 ;
22712267 char * write_ptr = buf ;
22722268 size_t to_write = r ;
22732269 while (to_write > 0 ) {
22742270 ssize_t w = write (fd_out , write_ptr , to_write );
2275- if (w < 0 ) {
2271+ if (w < 0 ) {
22762272 if (errno == EINTR || errno == EAGAIN ) { usleep (10 ); continue ; }
22772273 free (buf ); return -1 ;
22782274 }
@@ -2443,7 +2439,7 @@ static int ring_worker_main(int argc, char **argv) {
24432439 if (state ) atomic_fetch_add (& state [node ].active_workers , 1 );
24442440 if (argc >= 3 && isdigit (argv [2 ][0 ])) worker_cached_fd = atoi (argv [2 ]);
24452441 }
2446- else if (!strcmp (argv [1 ],"dec" )) {
2442+ else if (!strcmp (argv [1 ],"dec" )) {
24472443 cleanup_waiter_state ();
24482444 if (state ) atomic_fetch_sub (& state [node ].active_workers , 1 );
24492445 worker_cached_fd = -1 ;
@@ -2711,7 +2707,7 @@ static int ring_copy_main(int argc, char **argv) {
27112707 size_t copied_in_chunk = 0 ;
27122708 while (copied_in_chunk < to_copy ) {
27132709 ssize_t n = copy_file_range (infd , & current_off , outfd , NULL , to_copy - copied_in_chunk , 0 );
2714- if (n < 0 ) {
2710+ if (n < 0 ) {
27152711 if (errno == EINTR ) continue ;
27162712 if (errno == EXDEV || errno == EINVAL || errno == ENOSYS || errno == EOPNOTSUPP ) break ;
27172713 goto err_out ;
@@ -2824,7 +2820,7 @@ static int ring_copy_main(int argc, char **argv) {
28242820 if (sysinfo (& si ) == 0 ) {
28252821 uint64_t mu = (uint64_t )si .mem_unit ? si .mem_unit : 1 ;
28262822 uint64_t free_b = (uint64_t )si .freeram * mu ;
2827- if (free_b < oom_threshold && state ) {
2823+ if (free_b < oom_threshold && state ) {
28282824 OOM_WAIT_FOR_MEMORY (free_b , oom_threshold , si , mu );
28292825 }
28302826 }
0 commit comments