Skip to content

Commit 0d615da

Browse files
jkool702 authored and committed
moved read_pow2 into TLS to avoid needing to update it with CAS
1 parent 1d5198a commit 0d615da

1 file changed

Lines changed: 73 additions & 61 deletions

File tree

forkrun_ring.c

Lines changed: 73 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -330,7 +330,7 @@ static inline char *try_simd_scan(char *p, char *safe_end, uint64_t target,
330330
__ATOMIC_RELAXED)
331331

332332
// PUBLISH_BATCH_SIZE removed in v3.2.2: replaced by ring_pow2 doubling records.
333-
// In steady state (read_pow2 == write_pow2), workers claim 1 slot per atomic_fetch_add
333+
// In steady state (tls_read_pow2 == write_pow2), workers claim 1 slot per atomic_fetch_add
334334
// with no per-claim signalling overhead. Batch size changes are invisible to workers.
335335

336336
#ifndef GIT_HASH
@@ -1068,6 +1068,11 @@ static __thread uint64_t tl_remainder_idx = 0;
10681068
static __thread uint64_t tl_remainder_cnt = 0;
10691069
static __thread uint32_t tl_remainder_kills = 0;
10701070
static __thread bool tl_drain_escrow = true;
1071+
// Per-worker state-machine cursor for the ring_pow2 geometric ramp-up.
1072+
// Replaces the global CAS-contended read_pow2 field with zero-contention TLS.
1073+
// Workers read write_pow2/ring_pow2[] (acquire, read-only → S-state in all L1
1074+
// caches) and write only to their own private tls_read_pow2 (never shared).
1075+
static __thread uint8_t tls_read_pow2 = 0;
10711076

10721077
// ------------------------------------------------------------------
10731078
// WorkerBatchState: Pure value struct returned by do_lockfree_claim.
@@ -1214,11 +1219,11 @@ struct SharedState {
12141219
uint8_t fallow_active;
12151220
uint8_t ingest_complete;
12161221
uint8_t emergency_abort;
1217-
// ring_pow2 state machine (v3.2.2): bimodal slow/fast claim path.
1222+
// ring_pow2 state machine (v3.2.2 → v3.2.3+): bimodal slow/fast claim path.
12181223
// write_pow2 is advanced by the scanner each time L doubles during ramp-up.
1219-
// read_pow2 is advanced by workers (via CAS) as they consume past each boundary.
1220-
// Steady state: read_pow2 == write_pow2 → workers hit the unconditional fast path.
1221-
uint8_t read_pow2;
1224+
// Workers track their own cursor in tls_read_pow2 (TLS), so write_pow2 and
1225+
// ring_pow2[] stay permanently in the Shared (S) cache state on every core.
1226+
// Steady state: tls_read_pow2 == write_pow2 → workers hit the fast path.
12221227
uint8_t write_pow2;
12231228

12241229
uint32_t indexer_waiters ALIGNED(CACHE_LINE);
@@ -1694,8 +1699,7 @@ static int ring_init_main(int argc, char **argv) {
16941699

16951700
// Reset PID Controller / Flow State
16961701
atomic_store_relaxed(&state[n].active_workers, state[n].cfg_w_start);
1697-
// v3.2.2: reset ring_pow2 bimodal claim state machine
1698-
atomic_store_relaxed(&state[n].read_pow2, 0);
1702+
// Reset ring_pow2 scanner state machine; each worker's tls_read_pow2 is private TLS and is clamped back to 0 at claim time if it exceeds write_pow2.
16991703
atomic_store_relaxed(&state[n].write_pow2, 0);
17001704

17011705
state[n].offset_ring[0] = 0;
@@ -1885,10 +1889,8 @@ static int ring_init_main(int argc, char **argv) {
18851889

18861890
state[n].fixed_workers = (state[n].cfg_w_start == state[n].cfg_w_max);
18871891
state[n].fixed_batch = (state[n].cfg_batch_start == state[n].cfg_batch_max);
1888-
// v3.2.2: read_pow2 and write_pow2 start at 0 (equal → fast path from the
1889-
// beginning; scanner advances write_pow2 as L doubles during ramp-up).
1890-
// memset already zeroed these fields; explicit stores here for documentation.
1891-
atomic_store_relaxed(&state[n].read_pow2, 0);
1892+
// write_pow2 starts at 0; scanner advances it as L doubles during ramp-up.
1893+
// memset already zeroed this field; explicit store here for documentation.
18921894
atomic_store_relaxed(&state[n].write_pow2, 0);
18931895

18941896
// Dynamic Topology-Aware Steal Thresholds from ACPI SRAT Table
@@ -2870,8 +2872,9 @@ static int ring_indexer_numa_main(int argc, char **argv) {
28702872
} while (0)
28712873

28722874
// v3.2.2: Record this batch-size doubling boundary in ring_pow2.
2873-
// Workers observing read_pow2 < write_pow2 will multi-claim up to
2874-
// ring_pow2[read_pow2]-read_idx slots and then CAS-promote read_pow2.
2875+
// Workers observing tls_read_pow2 < write_pow2 will multi-claim up to
2876+
// 2^(write_pow2 - tls_read_pow2) slots; tls_read_pow2 is advanced locally
2877+
// (no CAS) via catch-up + eager advance in do_lockfree_claim.
28752878
//
28762879
// v3.2.2: In steady state workers claim 1 slot/op regardless of L.
28772880
// L is updated here to tune scanner batch size only; no publish needed.
@@ -2925,7 +2928,7 @@ static int ring_indexer_numa_main(int argc, char **argv) {
29252928
uint8_t _wp = atomic_load_relaxed(&(state_ptr)->write_pow2); \
29262929
if (_wp < 64) { \
29272930
(state_ptr)->ring_pow2[_wp] = local_scan_idx; \
2928-
atomic_store_release(&(state_ptr)->write_pow2, _wp); \
2931+
atomic_store_release(&(state_ptr)->write_pow2, _wp + 1); \
29292932
} \
29302933
} \
29312934
batch_counter = 0; \
@@ -3788,9 +3791,9 @@ core_scanner_loop(int fd_or_memfd, int my_node_id, int fd_spawn, int num_nodes,
37883791
}
37893792

37903793
if (is_numa) {
3791-
// v3.2.2: No PUBLISH_BATCH_SIZE needed at EOF. Once write_pow2 stops advancing,
3792-
// workers drain read_pow2 up to write_pow2 via the CAS loop and then permanently
3793-
// enter the fast path (claim_count == 1).
3794+
// v3.2.2+: No PUBLISH_BATCH_SIZE needed at EOF. Once write_pow2 stops advancing,
3795+
// each worker's tls_read_pow2 converges to it once read_idx has passed the last recorded boundary, and
3796+
// they permanently enter the fast path (claim_count == 1).
37943797
atomic_store_release(&local_state->write_idx, local_scan_idx);
37953798
atomic_store_release(&local_state->scanner_finished, 1);
37963799

@@ -4042,55 +4045,64 @@ static int do_lockfree_claim(struct WorkerBatchState *out, bool blocking) {
40424045
uint64_t r_curr = atomic_load_relaxed(&local_state->read_idx);
40434046

40444047
if (r_curr < w_snap) {
4045-
// v3.2.2 bimodal fast/slow claim path.
4046-
// Fast path (steady state): read_pow2 == write_pow2 → claim exactly 1.
4047-
// Slow path (ramp-up phase): read_pow2 < write_pow2 → multi-slot speculative claim.
4048-
uint8_t r_pow = atomic_load_relaxed(&local_state->read_pow2);
4048+
// v3.2.3+ TLS-local bimodal fast/slow claim path.
4049+
//
4050+
// write_pow2 and ring_pow2[] are written only by the scanner (release
4051+
// store) and read here under acquire. Because no worker ever writes
4052+
// these fields, they sit permanently in the Shared (S) cache state on
4053+
// every core — zero cross-core invalidations.
4054+
//
4055+
// tls_read_pow2 is private to this worker: no CAS, no coherence traffic.
4056+
//
4057+
// Memory-ordering guarantee: the acquire load of write_pow2 creates a
4058+
// happens-before edge that covers all preceding plain writes to
4059+
// ring_pow2[] (paired with the scanner's release store), so the plain
4060+
// reads of ring_pow2[tls_read_pow2] below are safe.
4061+
40494062
uint8_t w_pow = atomic_load_acquire(&local_state->write_pow2);
4050-
claim_count = 1;
40514063

4052-
// UPDATED TWEAK: Bypasses slow path if r_pow is lagging by only 1 level
4053-
if (__builtin_expect(r_pow != w_pow, 0)) {
4064+
// SAFETY CLAMP: guard against stale tls_read_pow2 from a prior
4065+
// invocation (in the common case of fresh worker forks this never
4066+
// fires, but keeps us correct if workers are ever recycled).
4067+
if (__builtin_expect(tls_read_pow2 > w_pow, 0))
4068+
tls_read_pow2 = 0;
4069+
4070+
// CATCH-UP: fast-forward our local cursor to global progress.
4071+
// If this worker was preempted or is a late starter, this instantly
4072+
// advances tls_read_pow2 past any doubling boundaries that r_curr has
4073+
// already consumed — zero CAS, purely read-only shared state.
4074+
while (tls_read_pow2 < w_pow &&
4075+
local_state->ring_pow2[tls_read_pow2] <= r_curr) {
4076+
tls_read_pow2++;
4077+
}
40544078

4079+
claim_count = 1;
4080+
if (__builtin_expect(tls_read_pow2 < w_pow, 0)) {
40554081
// Slow path: geometric ramp-up.
4056-
// Claim up to 2^(w_pow-r_pow) slots, capped at min(8, 2 * max_workers)
4057-
uint8_t diff = w_pow - r_pow;
4082+
// Claim up to 2^(w_pow - tls_read_pow2) slots, capped by
4083+
// speculative_max_claim and the number of actually available slots.
4084+
uint8_t diff = w_pow - tls_read_pow2;
40584085
uint64_t spec = (diff < 63) ? (1ULL << diff) : (uint64_t)UINT32_MAX;
4059-
4060-
uint64_t max_spec = local_state->speculative_max_claim;
4061-
if (spec > max_spec) {
4062-
spec = max_spec;
4063-
}
4064-
if (spec > 1) {
4065-
claim_count = spec;
4066-
}
4086+
if (spec > local_state->speculative_max_claim)
4087+
spec = local_state->speculative_max_claim;
4088+
uint64_t avail = w_snap - r_curr;
4089+
if (spec > avail)
4090+
spec = avail;
4091+
if (spec > 1)
4092+
claim_count = spec;
40674093
}
40684094

4069-
if (r_curr + claim_count > w_snap)
4070-
claim_count = w_snap - r_curr;
4071-
40724095
my_read_idx = __atomic_fetch_add(&local_state->read_idx, claim_count,
40734096
__ATOMIC_SEQ_CST);
40744097

4075-
// CAS promotion loop: advance read_pow2 past every doubling boundary
4076-
// that our claim crossed. Self-correcting: if a concurrent worker already
4077-
// advanced read_pow2 past our boundary, expected is refreshed and we re-check.
4078-
// UPDATED TWEAK: Bypasses slow path if r_pow is lagging by only 1 level
4079-
if (__builtin_expect(r_pow != w_pow, 0)) {
4080-
uint64_t claim_end = my_read_idx + claim_count;
4081-
uint8_t curr_pow = __atomic_load_n(&local_state->read_pow2, __ATOMIC_ACQUIRE);
4082-
uint8_t wp_snap = __atomic_load_n(&local_state->write_pow2, __ATOMIC_ACQUIRE);
4083-
while (curr_pow < wp_snap && local_state->ring_pow2[curr_pow] <= claim_end) {
4084-
uint8_t expected = curr_pow;
4085-
uint8_t desired = curr_pow + 1;
4086-
if (__atomic_compare_exchange_n(&local_state->read_pow2,
4087-
&expected, desired, false,
4088-
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
4089-
curr_pow = desired;
4090-
} else {
4091-
curr_pow = expected; // CAS failed: expected now holds current memory value
4092-
}
4093-
}
4098+
// EAGER ADVANCE: advance our cursor past every boundary our claim crossed.
4099+
// This primes the next iteration: if we consumed the last slots before a
4100+
// doubling boundary, the next claim sees tls_read_pow2 already at w_pow
4101+
// and takes the fast path immediately.
4102+
uint64_t claim_end = my_read_idx + claim_count;
4103+
while (tls_read_pow2 < w_pow &&
4104+
local_state->ring_pow2[tls_read_pow2] <= claim_end) {
4105+
tls_read_pow2++;
40944106
}
40954107

40964108
break;
@@ -4259,21 +4271,21 @@ static int do_lockfree_claim(struct WorkerBatchState *out, bool blocking) {
42594271
if (claim_count > 1 && local_state->numa_enabled) {
42604272
uint64_t safe_count = claim_count;
42614273
uint64_t flags;
4262-
4274+
42634275
// O(1) SWAR Read: Grab the next 8 slots in a single instruction
42644276
memcpy(&flags, &local_state->boundary_ring[my_read_idx & RING_MASK], 8);
4265-
4277+
42664278
// Mask out the bits that are beyond our claim_count
42674279
uint64_t valid_mask = (claim_count == 8) ? ~0ULL : (1ULL << (claim_count * 8)) - 1;
4268-
4280+
42694281
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
42704282
// --- BIG-ENDIAN PATH (s390x, legacy ppc64) ---
42714283
// On Big-Endian, Byte 0 (lowest address) is at the MSB (bits 56-63).
42724284
// We mask out the trailing bytes that exceed our claim count by shifting.
42734285
if (claim_count < 8) {
42744286
flags &= (valid_mask << (64 - (claim_count * 8)));
42754287
}
4276-
4288+
42774289
if (flags != 0) {
42784290
// Search from MSB downwards using Count Leading Zeroes
42794291
uint32_t first_boundary_idx = __builtin_clzll(flags) / 8;
@@ -4283,7 +4295,7 @@ static int do_lockfree_claim(struct WorkerBatchState *out, bool blocking) {
42834295
// --- LITTLE-ENDIAN PATH (x86_64, aarch64, ppc64le, riscv64) ---
42844296
// On Little-Endian, Byte 0 (lowest address) is at the LSB (bits 0-7).
42854297
flags &= valid_mask;
4286-
4298+
42874299
if (flags != 0) {
42884300
// Search from LSB upwards using Count Trailing Zeroes
42894301
uint32_t first_boundary_idx = __builtin_ctzll(flags) / 8;

0 commit comments

Comments
 (0)