Skip to content

Commit 9057073

Browse files
jkool702jkool702
authored andcommitted
fix for numa
1 parent 91d822e commit 9057073

4 files changed

Lines changed: 62 additions & 3614 deletions

File tree

forkrun_ring.c

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,22 +1086,39 @@ static int ring_indexer_numa_main(int argc, char **argv) {
10861086
}
10871087

10881088
int target = node_pipes[pkt.node_id % num_node_pipes];
1089-
if (write(target, &task, sizeof(task)) != sizeof(task)) break;
1089+
bool write_ok = false;
1090+
while(1) {
1091+
ssize_t w = write(target, &task, sizeof(task));
1092+
if (w == sizeof(task)) { write_ok = true; break; }
1093+
if (w < 0 && (errno == EINTR || errno == EAGAIN)) {
1094+
usleep(10);
1095+
continue;
1096+
}
1097+
break;
1098+
}
1099+
if (!write_ok) break;
1100+
10901101
last_major_seen = pkt.major_id;
10911102
}
10921103

10931104
if (pkt.major_id == UINT32_MAX && actual_start < pkt.offset) {
1094-
// Guard against UINT32_MAX wrap-around sentinel confusion
10951105
uint32_t final_major = (last_major_seen < UINT32_MAX - 1) ? last_major_seen + 1 : last_major_seen;
10961106
struct ScannerTask final_task = {
10971107
.major_id = final_major,
10981108
.pad = 0,
10991109
.start_off = actual_start,
11001110
.length = pkt.offset - actual_start
11011111
};
1102-
// Route exactly to the last node instead of defaulting to 0
11031112
int target = node_pipes[last_node_id % num_node_pipes];
1104-
write(target, &final_task, sizeof(final_task));
1113+
while(1) {
1114+
ssize_t w = write(target, &final_task, sizeof(final_task));
1115+
if (w == sizeof(final_task)) break;
1116+
if (w < 0 && (errno == EINTR || errno == EAGAIN)) {
1117+
usleep(10);
1118+
continue;
1119+
}
1120+
break;
1121+
}
11051122
}
11061123

11071124
xfree(node_pipes);
@@ -1131,7 +1148,7 @@ static int ring_indexer_numa_main(int argc, char **argv) {
11311148
if (_n_spawn > (W_max_val - W)) _n_spawn = W_max_val - W; \
11321149
if (fd_spawn >= 0) { \
11331150
char _sbuf[64]; \
1134-
int _slen = snprintf(_sbuf, sizeof(_sbuf), "%lu\n", _n_spawn); \
1151+
int _slen = snprintf(_sbuf, sizeof(_sbuf), "%d:%lu\n", my_node_id, _n_spawn); \
11351152
if (_slen > 0) SYS_CHK(write(fd_spawn, _sbuf, _slen)); \
11361153
W += _n_spawn; \
11371154
atomic_store_relaxed(&(state_ptr)->active_workers, W); \
@@ -1171,7 +1188,7 @@ static int ring_indexer_numa_main(int argc, char **argv) {
11711188
if (W + _grow > W_max_val) _grow = W_max_val - W; \
11721189
if (_grow > 0 && fd_spawn >= 0) { \
11731190
char _sbuf[64]; \
1174-
int _slen = snprintf(_sbuf, sizeof(_sbuf), "%lu\n", _grow); \
1191+
int _slen = snprintf(_sbuf, sizeof(_sbuf), "%d:%lu\n", my_node_id, _grow); \
11751192
if (_slen > 0) SYS_CHK(write(fd_spawn, _sbuf, _slen)); \
11761193
W += _grow; \
11771194
atomic_store_relaxed(&(state_ptr)->active_workers, W); \
@@ -1265,7 +1282,7 @@ static int ring_numa_scanner_main(int argc, char **argv) {
12651282
atomic_store_relaxed(&local_state->active_workers, W);
12661283

12671284
if (fd_spawn >= 0 && W > 0) {
1268-
char sbuf[64]; int slen = snprintf(sbuf, sizeof(sbuf), "%lu\n", W);
1285+
char sbuf[64]; int slen = snprintf(sbuf, sizeof(sbuf), "%d:%lu\n", my_node_id, W);
12691286
if (slen > 0) SYS_CHK(write(fd_spawn, sbuf, slen));
12701287
}
12711288

@@ -1596,6 +1613,7 @@ static int ring_scanner_main(int argc, char **argv) {
15961613
if (argc < 2) return EXECUTION_FAILURE;
15971614
int fd = atoi(argv[1]);
15981615
int fd_spawn = (argc >= 3) ? atoi(argv[2]) : -1;
1616+
int my_node_id = 0; // Ensure macro compatibility
15991617

16001618
uint64_t L = state[0].cfg_batch_start;
16011619
uint64_t Lmax = state[0].cfg_batch_max;
@@ -1647,7 +1665,7 @@ static int ring_scanner_main(int argc, char **argv) {
16471665

16481666
if (fd_spawn >= 0 && W > 0) {
16491667
char sbuf[64];
1650-
int slen = snprintf(sbuf, sizeof(sbuf), "%lu\n", W);
1668+
int slen = snprintf(sbuf, sizeof(sbuf), "%d:%lu\n", my_node_id, W);
16511669
if (slen > 0) SYS_CHK(write(fd_spawn, sbuf, slen));
16521670
}
16531671

@@ -1863,7 +1881,7 @@ static int ring_scanner_main(int argc, char **argv) {
18631881
uint64_t needed = W_target - W_curr;
18641882
if (needed > 0) {
18651883
char sbuf[64];
1866-
int slen = snprintf(sbuf, sizeof(sbuf), "%lu\n", needed);
1884+
int slen = snprintf(sbuf, sizeof(sbuf), "%d:%lu\n", my_node_id, needed);
18671885
if (slen > 0) SYS_CHK(write(fd_spawn, sbuf, slen));
18681886
W += needed;
18691887
atomic_store_relaxed(&state[0].active_workers, W);
@@ -2092,7 +2110,7 @@ static int ring_claim_main(int argc, char **argv) {
20922110
poll(pfds, 3, -1);
20932111
if (pfds[2].revents & POLLIN) break;
20942112
if (pfds[0].revents) { uint64_t v; if(read(evfd_data_arr[my_numa_node], &v, 8)){}; break; }
2095-
if (pfds[1].revents) break;
2113+
if (pfds[1].revents) { uint64_t v; if(read(evfd_eof, &v, 8)){}; break; }
20962114
}
20972115
cleanup_waiter_state();
20982116
spin = 0;
@@ -2130,7 +2148,7 @@ static int ring_claim_main(int argc, char **argv) {
21302148
struct pollfd pfds[2] = { { .fd = evfd_data_arr[my_numa_node], .events = POLLIN }, { .fd = evfd_eof, .events = POLLIN } };
21312149
poll(pfds, 2, -1);
21322150
if (pfds[0].revents) { uint64_t v; if(read(evfd_data_arr[my_numa_node], &v, 8)){}; }
2133-
if (pfds[1].revents) break;
2151+
if (pfds[1].revents) { uint64_t v; if(read(evfd_eof, &v, 8)){}; break; }
21342152
}
21352153
cleanup_waiter_state();
21362154
if (claim_count == 0) { spin = 0; goto restart_loop; }

frun.bash

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,25 @@ toc() { :; }
316316
ordered_flag=0
317317
[[ "${order_mode}" == "ordered" ]] && ordered_flag=1
318318

319-
( ring_numa_ingest ${fd0} ${fd_write} $index_pipe_w $claim_pipe_r $FORKRUN_NUM_NODES $ordered_flag ) &
320-
( ring_indexer_numa ${fd_scan} $index_pipe_r "${node_pipes_w[@]}" ) &
319+
(
320+
exec {index_pipe_r}<&- {claim_pipe_w}>&-
321+
for nr in "${node_pipes_r[@]}"; do exec {nr}<&-; done
322+
for nw in "${node_pipes_w[@]}"; do exec {nw}>&-; done
323+
ring_numa_ingest ${fd0} ${fd_write} $index_pipe_w $claim_pipe_r $FORKRUN_NUM_NODES $ordered_flag
324+
) &
325+
326+
(
327+
exec {index_pipe_w}>&- {claim_pipe_r}<&- {claim_pipe_w}>&-
328+
for nr in "${node_pipes_r[@]}"; do exec {nr}<&-; done
329+
ring_indexer_numa ${fd_scan} $index_pipe_r "${node_pipes_w[@]}"
330+
) &
321331

322332
for (( i=0; i<FORKRUN_NUM_NODES; i++ )); do
323-
( ring_numa_scanner ${fd_scan} $i $claim_pipe_w $fd_spawn_w $FORKRUN_NUM_NODES "${node_pipes_r[@]}" ) &
333+
(
334+
exec {index_pipe_r}<&- {index_pipe_w}>&- {claim_pipe_r}<&-
335+
for nw in "${node_pipes_w[@]}"; do exec {nw}>&-; done
336+
ring_numa_scanner ${fd_scan} $i $claim_pipe_w $fd_spawn_w $FORKRUN_NUM_NODES "${node_pipes_r[@]}"
337+
) &
324338
done
325339

326340
# Close Bash's copies of the pipes to allow background EOFs to cascade
@@ -450,17 +464,28 @@ P+=($!)
450464

451465
# --- SPAWN LOOP ---
452466
nWorkers=0
453-
node_idx=0
454-
while true; do
455-
read -r -u $fd_spawn_r N
456-
[[ "$N" == 'x' ]] && break
467+
finished_scanners=0
468+
while (( finished_scanners < FORKRUN_NUM_NODES )); do
469+
read -r -u $fd_spawn_r msg
470+
if [[ "$msg" == *'x'* ]]; then
471+
((finished_scanners++))
472+
continue
473+
fi
474+
475+
# Parse directed spawn "node:count" or legacy "count"
476+
if [[ "$msg" == *:* ]]; then
477+
node_idx="${msg%%:*}"
478+
N="${msg#*:}"
479+
else
480+
node_idx=0
481+
N="$msg"
482+
fi
457483

458484
target=$(( nWorkers + N ))
459485
(( target > nWorkersMax )) && target=$nWorkersMax
460486

461487
for (( ; nWorkers < target; nWorkers++ )); do
462488
spawn_worker "$nWorkers" "$node_idx"
463-
node_idx=$(( (node_idx + 1) % FORKRUN_NUM_NODES ))
464489
done
465490
done
466491

ring_loadables/.time

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +0,0 @@
1-
real 0m1.730s
2-
user 0m16.100s
3-
sys 0m27.027s

0 commit comments

Comments
 (0)