Skip to content

Commit 221dd7c

Browse files
AlexeyAkhunov, Alexey Sharp, and claude
authored
Test optimisation: pool resources allocated in the HexPatriciaHashed (#19467)
During the work on testlab, the profiler identified two major sources of allocation in the tests, putting pressure on the GC. Pooling these resources avoids reallocation and reduces GC pressure during the tests. Co-authored-by: Alexey Sharp <alexeysharp@Mac-mini.local> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent afa9069 commit 221dd7c

File tree

4 files changed

+128
-8
lines changed

4 files changed

+128
-8
lines changed

execution/commitment/commitment.go

Lines changed: 43 additions & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -112,6 +112,10 @@ type Trie interface {
112112

113113
// Process updates. If warmup.Enabled is true, pre-warms MDBX page cache in parallel.
114114
Process(ctx context.Context, updates *Updates, logPrefix string, progress chan *CommitProgress, warmup WarmupConfig) (rootHash []byte, err error)
115+
116+
// Release returns the trie to a pool for reuse. After calling Release,
117+
// the caller must not use the trie.
118+
Release()
115119
}
116120

117121
type CommitProgress struct {
@@ -429,6 +433,12 @@ func (be *BranchEncoder) ApplyDeferredUpdates(
429433
return nil
430434
}
431435

436+
// Pools for worker encoders/mergers to avoid per-call allocations.
437+
var (
438+
workerEncoderPool = sync.Pool{New: func() any { return NewBranchEncoder(1024) }}
439+
workerMergerPool = sync.Pool{New: func() any { return NewHexBranchMerger(512) }}
440+
)
441+
432442
// ApplyDeferredBranchUpdates encodes deferred branch updates concurrently and writes them.
433443
// Returns the number of updates successfully written.
434444
func ApplyDeferredBranchUpdates(
@@ -443,22 +453,49 @@ func ApplyDeferredBranchUpdates(
443453
numWorkers = 1
444454
}
445455

446-
// Pipeline: workers encode in parallel, results sent to channel, main goroutine writes sequentially
456+
// Sequential fast path: avoids goroutine and channel overhead for small batches.
457+
if numWorkers == 1 || len(deferred) <= numWorkers {
458+
encoder := workerEncoderPool.Get().(*BranchEncoder)
459+
merger := workerMergerPool.Get().(*BranchMerger)
460+
defer workerEncoderPool.Put(encoder)
461+
defer workerMergerPool.Put(merger)
462+
463+
var written int
464+
for _, upd := range deferred {
465+
if err := encodeDeferredUpdate(upd, encoder, merger); err != nil {
466+
return written, err
467+
}
468+
if upd.encoded == nil {
469+
continue
470+
}
471+
if err := putBranch(upd.prefix, upd.encoded, upd.prev); err != nil {
472+
return written, err
473+
}
474+
mxTrieBranchesUpdated.Inc()
475+
written++
476+
}
477+
return written, nil
478+
}
479+
480+
// Pipeline: workers encode in parallel, results sent to channel, main goroutine writes sequentially.
447481
type result struct {
448482
upd *DeferredBranchUpdate
449483
err error
450484
}
451-
resultCh := make(chan result, maxDeferredUpdates)
452-
workCh := make(chan *DeferredBranchUpdate, maxDeferredUpdates)
485+
// Size channels to actual batch length, not the 50K max.
486+
resultCh := make(chan result, len(deferred))
487+
workCh := make(chan *DeferredBranchUpdate, len(deferred))
453488

454-
// Start workers - each with its own encoder and merger
489+
// Start workers with pooled encoders/mergers.
455490
var wg sync.WaitGroup
456491
for i := 0; i < numWorkers; i++ {
457492
wg.Add(1)
458493
go func() {
459494
defer wg.Done()
460-
encoder := NewBranchEncoder(1024)
461-
merger := NewHexBranchMerger(512)
495+
encoder := workerEncoderPool.Get().(*BranchEncoder)
496+
merger := workerMergerPool.Get().(*BranchMerger)
497+
defer workerEncoderPool.Put(encoder)
498+
defer workerMergerPool.Put(merger)
462499

463500
for upd := range workCh {
464501
err := encodeDeferredUpdate(upd, encoder, merger)

execution/commitment/commitmentdb/commitment_context.go

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -308,6 +308,7 @@ func (sdc *SharedDomainsCommitmentContext) trieContext(tx kv.TemporalTx, txNum u
308308

309309
func (sdc *SharedDomainsCommitmentContext) Close() {
310310
sdc.updates.Close()
311+
sdc.patriciaTrie.Release()
311312
}
312313

313314
func (sdc *SharedDomainsCommitmentContext) Reset() {

execution/commitment/hex_concurrent_patricia_hashed.go

Lines changed: 4 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -345,6 +345,10 @@ func (p *ConcurrentPatriciaHashed) Reset() {
345345
}
346346
}
347347

348+
func (p *ConcurrentPatriciaHashed) Release() {
349+
p.root.Release()
350+
}
351+
348352
// Set context for state IO
349353
func (p *ConcurrentPatriciaHashed) ResetContext(ctx PatriciaContext) {
350354
p.root.ctx = ctx

execution/commitment/hex_patricia_hashed.go

Lines changed: 80 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -28,6 +28,7 @@ import (
2828
"runtime"
2929
"sort"
3030
"strings"
31+
"sync"
3132
"sync/atomic"
3233
"time"
3334

@@ -120,12 +121,23 @@ func (hph *HexPatriciaHashed) SpawnSubTrie(ctx PatriciaContext, forNibble int) *
120121
return subTrie
121122
}
122123

124+
var hphPool sync.Pool
125+
123126
func NewHexPatriciaHashed(accountKeyLen int16, ctx PatriciaContext) *HexPatriciaHashed {
127+
hph, ok := hphPool.Get().(*HexPatriciaHashed)
128+
if !ok {
129+
hph = newHexPatriciaHashed()
130+
}
131+
hph.resetForReuse()
132+
hph.accountKeyLen = accountKeyLen
133+
hph.ctx = ctx
134+
return hph
135+
}
136+
137+
func newHexPatriciaHashed() *HexPatriciaHashed {
124138
hph := &HexPatriciaHashed{
125-
ctx: ctx,
126139
keccak: keccak.NewFastKeccak(),
127140
keccak2: keccak.NewFastKeccak(),
128-
accountKeyLen: accountKeyLen,
129141
auxBuffer: bytes.NewBuffer(make([]byte, 8192)),
130142
hadToLoadL: make(map[uint64]skipStat),
131143
accValBuf: make(rlp.RlpEncodedBytes, 128),
@@ -138,6 +150,72 @@ func NewHexPatriciaHashed(accountKeyLen int16, ctx PatriciaContext) *HexPatricia
138150
return hph
139151
}
140152

153+
// resetForReuse resets all mutable state so a pooled HexPatriciaHashed is safe to reuse.
154+
// The large grid array is NOT zeroed — activeRows=0 means no cells are live,
155+
// and cells are properly initialized via cell.reset() during unfold/fold.
156+
func (hph *HexPatriciaHashed) resetForReuse() {
157+
// SetState(nil) resets: root, rootTouched, rootChecked, rootPresent,
158+
// currentKeyLen, activeRows, depths, branchBefore, touchMap, afterMap.
159+
hph.root.reset()
160+
hph.rootTouched = false
161+
hph.rootChecked = false
162+
hph.rootPresent = false
163+
hph.currentKeyLen = 0
164+
hph.activeRows = 0
165+
for i := range hph.depths {
166+
hph.depths[i] = 0
167+
hph.branchBefore[i] = false
168+
hph.touchMap[i] = 0
169+
hph.afterMap[i] = 0
170+
}
171+
172+
// ctx — set by caller after pool get
173+
hph.ctx = nil
174+
175+
// reuse map, don't reallocate
176+
clear(hph.hadToLoadL)
177+
178+
// reuse slice backing
179+
hph.mountedTries = hph.mountedTries[:0]
180+
hph.mounted = false
181+
hph.mountedNib = 0
182+
183+
// warmup cache
184+
hph.cache = nil
185+
hph.enableWarmupCache = false
186+
187+
// tracing / capture
188+
hph.capture = nil
189+
hph.trace = false
190+
hph.traceDomain = false
191+
192+
// flags
193+
hph.memoizationOff = false
194+
hph.leaveDeferredForCaller = false
195+
196+
// auxiliary buffer
197+
hph.auxBuffer.Reset()
198+
199+
// branch encoder: clear deferred updates, reset buffer, nil cache, re-enable deferred
200+
hph.branchEncoder.ClearDeferred()
201+
hph.branchEncoder.buf.Reset()
202+
hph.branchEncoder.cache = nil
203+
hph.branchEncoder.SetDeferUpdates(true)
204+
205+
// depth-to-txnum mapping
206+
clear(hph.depthsToTxNum[:])
207+
}
208+
209+
// Release returns this HexPatriciaHashed to the pool for reuse.
210+
// After calling Release, the caller must not use the struct.
211+
func (hph *HexPatriciaHashed) Release() {
212+
hph.ctx = nil
213+
hph.cache = nil
214+
hph.mountedTries = nil
215+
hph.capture = nil
216+
hphPool.Put(hph)
217+
}
218+
141219
type cell struct {
142220
hashedExtension [128]byte
143221
extension [64]byte

0 commit comments

Comments (0)