Skip to content

Commit c15f433

Browse files
awskiiAskAlexSharovCopilot
authored
commitment: fix warmuper arena data race in HashSort (erigontech#21432)
## Problem `HashSort` streams each batch's hashed/plain keys into a reused `byteArena` bump buffer and hands every key to the async warmuper as a sub-slice for MDBX prefetch. At each 10k batch boundary it reset that single arena while warmup workers were still reading earlier keys — the next batch's `arenaAlloc` overwrote bytes a worker was mid-read on. Data race. Latent on main: `HexToCompact` tolerates the garbage (at worst a wasted prefetch). On nibblesv2 (erigontech#21146) `EncodeKeyV2` validates nibbles and panics on the corrupted byte: `panic: nibbles v2: nibble at index 68 is 0xff`, mainnet ~blk 24.83M, mid commitment. ## Fix Replace the single arena with a 2-slot ring (`arenaRingSize`) keyed by a generation counter. Each warmed key is tagged with the current `gen`; the warmuper keeps a per-slot in-flight count (`outstanding[gen % ringSize]`). Before a batch boundary reuses a slot, the producer calls `WaitBufferFree(slot)`, which blocks until that slot's previous-generation warm items have drained — so no worker still references the bytes about to be overwritten. Workers decrement on completion and broadcast on drain-to-zero; a waker goroutine releases any waiter on ctx cancellation. Zero-copy, no per-key allocation: the arena is pre-sized once per batch (`arenaEnsureCap`), and an over-capacity key falls back to an independent allocation rather than reallocating the buffer (which would invalidate live sub-slices). Wired at both `HashSort` batch boundaries for `ModeDirect` and `ModeUpdate`; the `nil`-warmuper path is unchanged. ## Tests - `TestHashSort_WarmupArenaNoRace` — `-race` repro; DATA RACE in `HexToCompact` on the old single-arena wiring, green after. Covers `ModeDirect` and `ModeUpdate`. - `WaitBufferFree` behaviour: blocks until a straggler drains, fast-path when the slot is already empty, unblocks on ctx cancel. - Slot-reuse invariant `curArena == gen % arenaRingSize` survives a cancel landing inside a boundary wait; `arenaAlloc` returns non-overlapping sub-slices and falls back cleanly on over-capacity. `make lint` clean, `make erigon integration` builds, commitment package green under `-race`. --------- Co-authored-by: Alex Sharov <AskAlexSharov@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 19115fe commit c15f433

3 files changed

Lines changed: 534 additions & 29 deletions

File tree

execution/commitment/commitment.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,36 +1457,46 @@ type Updates struct {
14571457
nibbles [16]*etl.Collector
14581458

14591459
batchSlab []KeyUpdate // grow-only slab for HashSort batch (avoids per-key heap allocs)
1460-
byteArena []byte // grow-only byte arena for HashSort key copies
1460+
1461+
// Ring of byte arenas for HashSort key copies; a slot is reused only after its prior generation's warm items drain.
1462+
arenas [arenaRingSize][]byte
1463+
curArena int
1464+
gen uint64
14611465
}
14621466

1467+
// arenaRingSize is how many byte arenas HashSort cycles; raising it only adds memory headroom, never affects correctness.
1468+
const arenaRingSize = 2
1469+
14631470
// arenaAlloc appends b to the byte arena and returns the sub-slice.
14641471
// The returned slice is valid until the arena is reset.
14651472
// The arena must have sufficient capacity (via arenaEnsureCap) before
14661473
// accumulating a batch; if capacity is exceeded, arenaAlloc falls back
14671474
// to an independent heap allocation to keep previously returned
14681475
// sub-slices valid.
14691476
func (t *Updates) arenaAlloc(b []byte) []byte {
1470-
off := len(t.byteArena)
1477+
arena := t.arenas[t.curArena]
1478+
off := len(arena)
14711479
needed := off + len(b)
1472-
if needed > cap(t.byteArena) {
1480+
if needed > cap(arena) {
14731481
// Arena capacity exceeded — fall back to an independent allocation.
14741482
// This keeps previously returned sub-slices valid while avoiding a
14751483
// panic that would crash a production node.
14761484
result := make([]byte, len(b))
14771485
copy(result, b)
14781486
return result
14791487
}
1480-
t.byteArena = t.byteArena[:needed]
1481-
copy(t.byteArena[off:], b)
1482-
return t.byteArena[off:needed]
1488+
arena = arena[:needed]
1489+
copy(arena[off:], b)
1490+
t.arenas[t.curArena] = arena
1491+
return arena[off:needed]
14831492
}
14841493

1485-
// arenaEnsureCap ensures the byte arena has at least cap bytes of capacity.
1486-
// Must be called before each batch to prevent mid-batch reallocation.
1494+
// arenaEnsureCap reserves at least c bytes in every ring buffer; call before a batch so a mid-batch grow can't reallocate and invalidate returned sub-slices.
14871495
func (t *Updates) arenaEnsureCap(c int) {
1488-
if cap(t.byteArena) < c {
1489-
t.byteArena = make([]byte, 0, c)
1496+
for i := range t.arenas {
1497+
if cap(t.arenas[i]) < c {
1498+
t.arenas[i] = make([]byte, 0, c)
1499+
}
14901500
}
14911501
}
14921502

@@ -1811,12 +1821,14 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
18111821
clear(t.keys)
18121822

18131823
t.batchSlab = t.batchSlab[:0]
1814-
// Pre-allocate arena to avoid mid-batch reallocation that would
1815-
// invalidate previously returned sub-slices (hk/pk in batchSlab).
1816-
// Worst case: storage keys produce 128-byte nibblized hashed keys +
1817-
// 52-byte plain keys = 180 bytes/key. Use 192 with headroom.
1824+
if warmuper != nil {
1825+
if err := warmuper.WaitBufferFree(t.curArena); err != nil {
1826+
return err
1827+
}
1828+
}
1829+
// Pre-size the arena so a mid-batch grow can't reallocate and invalidate live sub-slices (≤180 B/key, 192 with headroom).
18181830
t.arenaEnsureCap(hashSortBatchSize * 192)
1819-
t.byteArena = t.byteArena[:0]
1831+
t.arenas[t.curArena] = t.arenas[t.curArena][:0]
18201832
var prevKey []byte
18211833

18221834
err := t.etl.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
@@ -1838,7 +1850,7 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
18381850
startDepth++
18391851
}
18401852
}
1841-
warmuper.WarmKey(hk, startDepth)
1853+
warmuper.WarmKey(hk, startDepth, t.gen)
18421854
prevKey = append(prevKey[:0], hk...)
18431855
}
18441856

@@ -1854,11 +1866,17 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
18541866
return err
18551867
}
18561868
}
1869+
t.batchSlab = t.batchSlab[:0]
1870+
nextGen := t.gen + 1
1871+
slot := int(nextGen % arenaRingSize)
18571872
if warmuper != nil {
1858-
warmuper.DrainPending()
1873+
if err := warmuper.WaitBufferFree(slot); err != nil {
1874+
return err
1875+
}
18591876
}
1860-
t.batchSlab = t.batchSlab[:0]
1861-
t.byteArena = t.byteArena[:0]
1877+
t.gen = nextGen
1878+
t.arenas[slot] = t.arenas[slot][:0]
1879+
t.curArena = slot
18621880
}
18631881
return nil
18641882
}, etl.TransformArgs{Quit: ctx.Done()})
@@ -1882,8 +1900,13 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
18821900

18831901
case ModeUpdate:
18841902
t.batchSlab = t.batchSlab[:0]
1903+
if warmuper != nil {
1904+
if err := warmuper.WaitBufferFree(t.curArena); err != nil {
1905+
return err
1906+
}
1907+
}
18851908
t.arenaEnsureCap(hashSortBatchSize * 144)
1886-
t.byteArena = t.byteArena[:0]
1909+
t.arenas[t.curArena] = t.arenas[t.curArena][:0]
18871910
var prevKey []byte
18881911
var processErr error
18891912

@@ -1906,7 +1929,7 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
19061929
startDepth++
19071930
}
19081931
}
1909-
warmuper.WarmKey(hk, startDepth)
1932+
warmuper.WarmKey(hk, startDepth, t.gen)
19101933
prevKey = append(prevKey[:0], hk...)
19111934
}
19121935

@@ -1923,11 +1946,18 @@ func (t *Updates) HashSort(ctx context.Context, warmuper *Warmuper, fn func(hk,
19231946
return false
19241947
}
19251948
}
1949+
t.batchSlab = t.batchSlab[:0]
1950+
nextGen := t.gen + 1
1951+
slot := int(nextGen % arenaRingSize)
19261952
if warmuper != nil {
1927-
warmuper.DrainPending()
1953+
if err := warmuper.WaitBufferFree(slot); err != nil {
1954+
processErr = err
1955+
return false
1956+
}
19281957
}
1929-
t.batchSlab = t.batchSlab[:0]
1930-
t.byteArena = t.byteArena[:0]
1958+
t.gen = nextGen
1959+
t.arenas[slot] = t.arenas[slot][:0]
1960+
t.curArena = slot
19311961
}
19321962
return true
19331963
})
@@ -1971,7 +2001,11 @@ func (t *Updates) Reset() {
19712001
default:
19722002
}
19732003
t.batchSlab = t.batchSlab[:0]
1974-
t.byteArena = t.byteArena[:0]
2004+
for i := range t.arenas {
2005+
t.arenas[i] = t.arenas[i][:0]
2006+
}
2007+
t.curArena = 0
2008+
t.gen = 0
19752009
}
19762010

19772011
type KeyUpdate struct {

0 commit comments

Comments
 (0)