Skip to content

Commit 8e76ad6

Browse files
Mark Holt and claude committed
commitment: add branch prefetcher at TouchKey time
A background prefetcher fires at TouchPlainKey time (during execution), populating the persistent BranchCache well before Process/HashSort needs the data. This gives the trie a warm cache without waiting for the HashSort phase.

The architecture is now a 2-level prefetch:
1. TouchKey → BranchPrefetcher → persistent BranchCache (async, during execution)
2. HashSort → warmup workers read the persistent cache → DB only on a miss

The prefetcher is stopped before Process() to avoid DB contention, and is cleaned up in Close().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e228adb commit 8e76ad6

6 files changed

Lines changed: 192 additions & 0 deletions

File tree

db/state/execctx/domain_shared.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,16 @@ func (sd *SharedDomains) EnableWarmupCache(enable bool) {
704704
sd.sdCtx.EnableWarmupCache(enable)
705705
}
706706

707+
// StartBranchPrefetcher starts background branch prefetching at TouchKey time.
708+
func (sd *SharedDomains) StartBranchPrefetcher(ctx context.Context) {
709+
sd.sdCtx.StartBranchPrefetcher(ctx)
710+
}
711+
712+
// StopBranchPrefetcher stops the background branch prefetcher.
713+
func (sd *SharedDomains) StopBranchPrefetcher() {
714+
sd.sdCtx.StopBranchPrefetcher()
715+
}
716+
707717
// SetDeferCommitmentUpdates enables or disables deferred commitment updates.
708718
// When enabled, commitment branch updates are stored in the commitment context
709719
// instead of being applied inline, and must be flushed later via FlushPendingUpdates.

execution/commitment/branch_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ func (c *BranchCache) Invalidate(key []byte) {
7575
c.branches.Delete(key)
7676
}
7777

78+
// Contains checks if a key exists in the cache without updating recency or counters.
79+
func (c *BranchCache) Contains(key []byte) bool {
80+
return c.branches.Contains(key)
81+
}
82+
7883
// Len returns the number of entries in the cache.
7984
func (c *BranchCache) Len() int {
8085
return c.branches.Len()
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package commitment
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
7+
"golang.org/x/sync/errgroup"
8+
)
9+
10+
// BranchPrefetcher pre-fetches branch nodes into the persistent BranchCache
11+
// in the background, triggered at TouchKey time (during execution) so that
12+
// branch data is already cached when Process/HashSort runs later.
13+
//
14+
// This gives the trie a warm persistent cache without waiting for HashSort.
15+
type BranchPrefetcher struct {
16+
cache *BranchCache
17+
ctxFactory TrieContextFactory
18+
maxDepth int
19+
numWorkers int
20+
21+
work chan []byte // hashed keys to prefetch branches for
22+
g *errgroup.Group
23+
ctx context.Context
24+
cancel context.CancelFunc
25+
26+
prefetched atomic.Uint64
27+
started atomic.Bool
28+
closed atomic.Bool
29+
}
30+
31+
// NewBranchPrefetcher creates a prefetcher that populates the given BranchCache.
32+
// Call Start() to begin processing, then Submit() hashed keys as they arrive.
33+
func NewBranchPrefetcher(cache *BranchCache, ctxFactory TrieContextFactory, numWorkers, maxDepth int) *BranchPrefetcher {
34+
ctx, cancel := context.WithCancel(context.Background())
35+
return &BranchPrefetcher{
36+
cache: cache,
37+
ctxFactory: ctxFactory,
38+
maxDepth: maxDepth,
39+
numWorkers: numWorkers,
40+
ctx: ctx,
41+
cancel: cancel,
42+
}
43+
}
44+
45+
// Start launches background prefetch workers.
46+
func (p *BranchPrefetcher) Start() {
47+
if p.started.Swap(true) {
48+
return
49+
}
50+
p.work = make(chan []byte, p.numWorkers*128)
51+
p.g, p.ctx = errgroup.WithContext(p.ctx)
52+
53+
for i := 0; i < p.numWorkers; i++ {
54+
p.g.Go(func() error {
55+
trieCtx, cleanup := p.ctxFactory()
56+
if cleanup != nil {
57+
defer cleanup()
58+
}
59+
60+
for hashedKey := range p.work {
61+
select {
62+
case <-p.ctx.Done():
63+
return p.ctx.Err()
64+
default:
65+
}
66+
p.prefetchBranches(trieCtx, hashedKey)
67+
}
68+
return nil
69+
})
70+
}
71+
}
72+
73+
// prefetchBranches walks nibble prefixes of hashedKey and loads branch nodes
74+
// into the persistent cache. Stops at first missing branch (leaf zone).
75+
func (p *BranchPrefetcher) prefetchBranches(trieCtx PatriciaContext, hashedKey []byte) {
76+
for depth := 1; depth <= len(hashedKey) && depth <= p.maxDepth; depth++ {
77+
prefix := HexNibblesToCompactBytes(hashedKey[:depth])
78+
79+
// Already in cache — skip DB read
80+
if p.cache.Contains(prefix) {
81+
continue
82+
}
83+
84+
branchData, _, err := trieCtx.Branch(prefix)
85+
if err != nil || len(branchData) < 4 {
86+
break // no branch at this depth, stop walking
87+
}
88+
89+
p.cache.Put(prefix, branchData)
90+
p.prefetched.Add(1)
91+
}
92+
}
93+
94+
// Submit enqueues a hashed key for branch prefetching. Non-blocking; drops if full.
95+
func (p *BranchPrefetcher) Submit(hashedKey []byte) {
96+
if !p.started.Load() || p.closed.Load() {
97+
return
98+
}
99+
// Make a copy since caller may reuse the buffer
100+
key := make([]byte, len(hashedKey))
101+
copy(key, hashedKey)
102+
select {
103+
case p.work <- key:
104+
default: // drop if channel full — prefetching is best-effort
105+
}
106+
}
107+
108+
// Prefetched returns the number of branch nodes prefetched into cache.
109+
func (p *BranchPrefetcher) Prefetched() uint64 {
110+
return p.prefetched.Load()
111+
}
112+
113+
// Stop drains pending work, waits for workers to finish, and releases resources.
114+
func (p *BranchPrefetcher) Stop() {
115+
if p.closed.Swap(true) {
116+
return
117+
}
118+
if p.work != nil {
119+
close(p.work)
120+
}
121+
if p.g != nil {
122+
p.g.Wait()
123+
}
124+
p.cancel()
125+
}

execution/commitment/commitment.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,10 @@ type Updates struct {
15351535

15361536
batchSlab []KeyUpdate // grow-only slab for HashSort batch (avoids per-key heap allocs)
15371537
byteArena []byte // grow-only byte arena for HashSort key copies
1538+
1539+
// onHashedKey is called with the hashed key after TouchPlainKey computes it.
1540+
// Used by the branch prefetcher to pre-warm the persistent cache.
1541+
onHashedKey func(hashedKey []byte)
15381542
}
15391543

15401544
// arenaAlloc appends b to the byte arena and returns the sub-slice.
@@ -1636,6 +1640,9 @@ func (t *Updates) initCollector() {
16361640

16371641
func (t *Updates) Mode() Mode { return t.mode }
16381642

1643+
// SetOnHashedKey sets a callback invoked with each hashed key during TouchPlainKey.
1644+
func (t *Updates) SetOnHashedKey(fn func(hashedKey []byte)) { t.onHashedKey = fn }
1645+
16391646
func (t *Updates) Size() (updates uint64) {
16401647
switch t.mode {
16411648
case ModeDirect:
@@ -1671,6 +1678,10 @@ func (t *Updates) TouchPlainKey(key string, val []byte, fn func(c *KeyUpdate, va
16711678
keyBytes := common.ToBytesZeroCopy(key)
16721679
hashedKey := t.hasher(keyBytes)
16731680

1681+
if t.onHashedKey != nil {
1682+
t.onHashedKey(hashedKey)
1683+
}
1684+
16741685
var err error
16751686
if !t.sortPerNibble {
16761687
err = t.etl.Collect(hashedKey, keyBytes)

execution/commitment/commitmentdb/commitment_context.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ type SharedDomainsCommitmentContext struct {
5757
deferCommitmentUpdates bool
5858
// pendingUpdate stores a single deferred branch update to be flushed at the next ComputeCommitment call.
5959
pendingUpdate *commitment.PendingCommitmentUpdate
60+
61+
// branchPrefetcher pre-fetches branch nodes into the persistent cache at TouchKey time
62+
branchPrefetcher *commitment.BranchPrefetcher
6063
}
6164

6265
// SetStateReader can be used to set a custom state reader (otherwise the default one is set in SharedDomainsCommitmentContext.trieContext).
@@ -75,6 +78,39 @@ func (sdc *SharedDomainsCommitmentContext) EnableParaTrieDB(db kv.TemporalRoDB)
7578
sdc.paraTrieDB = db
7679
}
7780

81+
// StartBranchPrefetcher starts a background prefetcher that populates the persistent
82+
// BranchCache at TouchKey time, well before Process/HashSort needs the data.
83+
// Requires EnableParaTrieDB to have been called first.
84+
func (sdc *SharedDomainsCommitmentContext) StartBranchPrefetcher(ctx context.Context) {
85+
if sdc.paraTrieDB == nil {
86+
return
87+
}
88+
branchCache := sdc.patriciaTrie.GetBranchCache()
89+
if branchCache == nil {
90+
return
91+
}
92+
if sdc.branchPrefetcher != nil {
93+
sdc.branchPrefetcher.Stop()
94+
}
95+
96+
txNum := sdc.sharedDomains.TxNum()
97+
factory := sdc.trieContextFactory(ctx, sdc.paraTrieDB, txNum)
98+
sdc.branchPrefetcher = commitment.NewBranchPrefetcher(branchCache, factory, 4, commitment.WarmupMaxDepth)
99+
sdc.branchPrefetcher.Start()
100+
101+
// Wire the callback so TouchPlainKey feeds hashed keys to the prefetcher
102+
sdc.updates.SetOnHashedKey(sdc.branchPrefetcher.Submit)
103+
}
104+
105+
// StopBranchPrefetcher stops the background prefetcher and disconnects the callback.
106+
func (sdc *SharedDomainsCommitmentContext) StopBranchPrefetcher() {
107+
if sdc.branchPrefetcher != nil {
108+
sdc.updates.SetOnHashedKey(nil)
109+
sdc.branchPrefetcher.Stop()
110+
sdc.branchPrefetcher = nil
111+
}
112+
}
113+
78114
func (sdc *SharedDomainsCommitmentContext) SetDeferBranchUpdates(deferBranchUpdates bool) {
79115
if sdc.patriciaTrie.Variant() == commitment.VariantHexPatriciaTrie {
80116
sdc.patriciaTrie.(*commitment.HexPatriciaHashed).SetDeferBranchUpdates(deferBranchUpdates)
@@ -180,6 +216,7 @@ func (sdc *SharedDomainsCommitmentContext) trieContext(tx kv.TemporalTx, txNum u
180216
}
181217

182218
// Close tears the commitment context down. The steps run in a fixed order:
// the branch prefetcher is stopped, then the updates collector is closed,
// and finally the trie is released.
func (sdc *SharedDomainsCommitmentContext) Close() {
	// The prefetcher goes first: stopping it also detaches its hook from sdc.updates.
	sdc.StopBranchPrefetcher()

	sdc.updates.Close()
	sdc.patriciaTrie.Release()
}
@@ -265,6 +302,9 @@ func (sdc *SharedDomainsCommitmentContext) ComputeCommitment(ctx context.Context
265302
}
266303
}
267304

305+
// Stop background prefetcher before Process to avoid DB contention
306+
sdc.StopBranchPrefetcher()
307+
268308
trieContext := sdc.trieContext(tx, txNum)
269309

270310
var warmupConfig commitment.WarmupConfig

execution/stagedsync/exec3.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func ExecV3(ctx context.Context,
201201
doms.EnableTrieWarmup(true)
202202
// Do it only for chain-tip blocks!
203203
doms.EnableWarmupCache(!isApplyingBlocks)
204+
doms.StartBranchPrefetcher(ctx)
204205
postValidator := newBlockPostExecutionValidator()
205206
doms.SetDeferCommitmentUpdates(false)
206207
if !isApplyingBlocks {

0 commit comments

Comments
 (0)