Skip to content

Commit f4ca01d

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 18ae982 + a540611 commit f4ca01d

6 files changed

Lines changed: 299 additions & 70 deletions

File tree

configs/coins/zcash.json

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,10 @@
2222
"package_name": "backend-zcash",
2323
"package_revision": "satoshilabs-1",
2424
"system_user": "zcash",
25-
"version": "4.4.1",
26-
"docker_image": "zfnd/zebra:4.4.1",
25+
"version": "4.5.0",
26+
"docker_image": "zfnd/zebra:4.5.0",
2727
"verification_type": "docker",
28-
"verification_source": "96149af0257d1f52612544b68f160f8c1bd1d229a47aced203bfa35f4925137d",
28+
"verification_source": "9fd0125f01a04f3ebaec3d0c6426ee4fa48c7a587e62a12d1ac79c3d5fc357e7",
2929
"extract_command": "mkdir backend/bin && docker cp extract:/usr/local/bin/zebrad backend/bin/zebrad",
3030
"exclude_files": [],
3131
"exec_command_template": "{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/bin/zebrad --config {{.Env.BackendInstallPath}}/{{.Coin.Alias}}/zcash.conf start",

db/address_hotness.go

Lines changed: 26 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,14 @@ type addressHotness struct {
3232
minHits int
3333
lru *hotAddressLRU
3434
onEvict func(addressHotnessKey)
35-
// hits tracks per-block lookup counts so we can decide when an address is hot.
36-
// It is cleared at BeginBlock to avoid unbounded growth.
35+
// hits tracks lookup counts so we can decide when an address is hot. Counts
36+
// accumulate across blocks so an address that recurs over several blocks (not
37+
// only within one busy block) can become hot; the map is reset in BeginBlock
38+
// once it grows past maxPendingHits to keep memory bounded.
3739
hits map[addressHotnessKey]uint16
40+
// maxPendingHits bounds the hits map (set to the LRU size, the natural ceiling
41+
// for promotion candidates).
42+
maxPendingHits int
3843
// block stats (reset after reporting) to keep logging cheap.
3944
// blockEligibleLookups counts lookups with contractCount >= minContracts (i.e., eligible for hotness).
4045
blockEligibleLookups uint64
@@ -51,11 +56,11 @@ func newAddressHotness(minContracts, lruSize, minHits int) *addressHotness {
5156
return nil
5257
}
5358
return &addressHotness{
54-
minContracts: minContracts,
55-
minHits: minHits,
56-
lru: newHotAddressLRU(lruSize),
57-
// Pre-size the per-block hit map to avoid reallocs on busy blocks.
58-
hits: make(map[addressHotnessKey]uint16),
59+
minContracts: minContracts,
60+
minHits: minHits,
61+
lru: newHotAddressLRU(lruSize),
62+
maxPendingHits: lruSize,
63+
hits: make(map[addressHotnessKey]uint16),
5964
}
6065
}
6166

@@ -72,9 +77,20 @@ func (h *addressHotness) BeginBlock() {
7277
if h == nil {
7378
return
7479
}
75-
// Reset per-block hit counts; LRU survives across blocks.
76-
clear(h.hits)
77-
// Reset per-block stats counters.
80+
// Hit counts accumulate across blocks so addresses looked up repeatedly over
81+
// several blocks (not only within one busy block) can become hot — this lets
82+
// the index help lower-throughput chains, not just very busy ones. Reset only
83+
// when the candidate map grows past its bound; dropping pending counts merely
84+
// delays a promotion and never affects correctness (lookups fall back to a
85+
// linear scan when the index is not used). The LRU survives across blocks.
86+
if len(h.hits) > h.maxPendingHits {
87+
// Reinitialize rather than clear(): Go's clear() does not shrink a map's
88+
// underlying bucket allocation, so a single oversized block would leave the
89+
// allocation at its high-water mark. Pre-size to maxPendingHits, the
90+
// steady-state ceiling, so the oversized buckets can be released.
91+
h.hits = make(map[addressHotnessKey]uint16, h.maxPendingHits)
92+
}
93+
// Reset per-block stats counters (metrics report per-block deltas).
7894
h.blockEligibleLookups = 0
7995
h.blockLRUHits = 0
8096
h.blockPromotions = 0

db/address_hotness_test.go

Lines changed: 25 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,7 @@ func Test_addressHotness_LRUEvictionHook(t *testing.T) {
122122
}
123123

124124
func Test_addressHotness_Specs(t *testing.T) {
125-
t.Run("it should reset per-block hits", func(t *testing.T) {
125+
t.Run("it should accumulate hits across blocks", func(t *testing.T) {
126126
hot := newAddressHotness(1, 2, 2)
127127
if hot == nil {
128128
t.Fatal("expected hotness tracker to be initialized")
@@ -133,8 +133,30 @@ func Test_addressHotness_Specs(t *testing.T) {
133133
t.Fatal("expected first hit to stay cold")
134134
}
135135
hot.BeginBlock()
136-
if hot.ShouldUseIndex(key, 1) {
137-
t.Fatal("expected hit count to reset between blocks")
136+
if !hot.ShouldUseIndex(key, 1) {
137+
t.Fatal("expected hit counts to accumulate across blocks and promote")
138+
}
139+
})
140+
141+
t.Run("it should reset pending hits once the candidate map exceeds its bound", func(t *testing.T) {
142+
// lruSize (and thus maxPendingHits) is 1 here.
143+
hot := newAddressHotness(1, 1, 2)
144+
if hot == nil {
145+
t.Fatal("expected hotness tracker to be initialized")
146+
}
147+
a := makeHotKey(30)
148+
b := makeHotKey(31)
149+
hot.BeginBlock()
150+
if hot.ShouldUseIndex(a, 1) {
151+
t.Fatal("expected first hit on A to stay cold")
152+
}
153+
if hot.ShouldUseIndex(b, 1) {
154+
t.Fatal("expected first hit on B to stay cold")
155+
}
156+
// hits now holds 2 pending entries > maxPendingHits (1), so BeginBlock clears it.
157+
hot.BeginBlock()
158+
if hot.ShouldUseIndex(a, 1) {
159+
t.Fatal("expected A's pending hit to be cleared once the bound was exceeded")
138160
}
139161
})
140162

db/sync.go

Lines changed: 94 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -43,10 +43,11 @@ type MissingBlockRetryConfig struct {
4343
TipRecheckThreshold int
4444
// RetryDelay keeps retry pressure low while still reacting quickly to transient backend gaps.
4545
RetryDelay time.Duration
46-
// MaxStallDuration caps the wall-clock time a single block fetch may spend
47-
// in the retry loop before yielding control to the outer resync machinery.
48-
// Without this cap, a backend that keeps returning ErrBlockNotFound while
49-
// shouldRestartSyncOnMissingBlock keeps reporting "no reorg" loops forever.
46+
// MaxStallDuration caps the wall-clock time a single block fetch may spend in
47+
// the retry loop before yielding errResync. Liveness invariant: since lagging
48+
// probes report "no reorg" and known hashes get retried, a genuinely-behind
49+
// backend or chain-shortening reorg relies on this cap. Must stay > 0
50+
// (ApplyMissingBlockRetryOverride enforces it).
5051
MaxStallDuration time.Duration
5152
}
5253

@@ -118,6 +119,14 @@ func NewSyncWorkerWithConfig(db *RocksDB, chain bchain.BlockChain, syncWorkers,
118119
if cfg != nil {
119120
effectiveCfg = *cfg
120121
}
122+
// MaxStallDuration is the load-bearing liveness cap (see its doc): the retry
123+
// loops disable the cap when it's <= 0, which would let a chain-shortening
124+
// reorg spin forever. Enforce the invariant structurally here so every caller
125+
// (including tests passing a partial cfg) gets a safe value, not just the
126+
// ApplyMissingBlockRetryOverride path.
127+
if effectiveCfg.MissingBlockRetry.MaxStallDuration <= 0 {
128+
effectiveCfg.MissingBlockRetry.MaxStallDuration = DefaultMissingBlockRetryConfig().MaxStallDuration
129+
}
121130
return &SyncWorker{
122131
db: db,
123132
chain: chain,
@@ -255,44 +264,44 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
255264
return err
256265
}
257266
if remoteBestHeight < w.startHeight {
258-
glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight)
259-
return errors.New("resync: remote best height error")
260-
}
261-
if initialSync {
262-
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
263-
glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
264-
// Bulk sync can encounter a disappearing block hash during reorgs.
265-
// When that happens, it returns errResync to trigger a full restart.
266-
err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight)
267-
if err != nil {
268-
if stdErrors.Is(err, errResync) {
269-
// block hash changed during parallel sync, restart the full resync
270-
return w.resyncIndex(onNewBlock, initialSync)
267+
glog.Warning("resync: observed remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight, ", falling back to sequential sync")
268+
} else {
269+
if initialSync {
270+
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
271+
glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
272+
// Bulk sync can encounter a disappearing block hash during reorgs.
273+
// When that happens, it returns errResync to trigger a full restart.
274+
err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight)
275+
if err != nil {
276+
if stdErrors.Is(err, errResync) {
277+
// block hash changed during parallel sync, restart the full resync
278+
return w.resyncIndex(onNewBlock, initialSync)
279+
}
280+
return err
271281
}
272-
return err
282+
// after parallel load finish the sync using standard way,
283+
// new blocks may have been created in the meantime
284+
return w.resyncIndex(onNewBlock, initialSync)
273285
}
274-
// after parallel load finish the sync using standard way,
275-
// new blocks may have been created in the meantime
276-
return w.resyncIndex(onNewBlock, initialSync)
277286
}
278-
}
279-
if w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType {
280-
syncWorkers := uint32(4)
281-
if remoteBestHeight-w.startHeight >= syncWorkers {
282-
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers)
283-
// Parallel sync also returns errResync when a requested hash no longer
284-
// exists at its height; restart to realign with the canonical chain.
285-
err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers)
286-
if err != nil {
287-
if stdErrors.Is(err, errResync) {
288-
// block hash changed during parallel sync, restart the full resync
289-
return w.resyncIndex(onNewBlock, initialSync)
287+
if w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType {
288+
syncWorkers := uint32(4)
289+
if remoteBestHeight-w.startHeight >= syncWorkers {
290+
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers)
291+
// Parallel sync also returns errResync when a requested hash no longer
292+
// exists at its height; restart to realign with the canonical chain.
293+
err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers)
294+
if err != nil {
295+
if stdErrors.Is(err, errResync) {
296+
// block hash changed during parallel sync, restart the full resync
297+
return w.resyncIndex(onNewBlock, initialSync)
298+
}
299+
return err
290300
}
291-
return err
301+
// after parallel load finish the sync using standard way,
302+
// new blocks may have been created in the meantime
303+
return w.resyncIndex(onNewBlock, initialSync)
292304
}
293-
// after parallel load finish the sync using standard way,
294-
// new blocks may have been created in the meantime
295-
return w.resyncIndex(onNewBlock, initialSync)
296305
}
297306
}
298307
}
@@ -400,21 +409,37 @@ type hashHeight struct {
400409
height uint32
401410
}
402411

412+
// sendHashHeight queues hh but stays abort-aware: if a full hch made this a blocking
413+
// send, the coordinator could never read abortCh and sync would wedge. On abort hh is
414+
// intentionally dropped since the round is being torn down anyway.
415+
func (w *SyncWorker) sendHashHeight(hch chan<- hashHeight, abortCh <-chan error, hh hashHeight) error {
416+
select {
417+
case hch <- hh:
418+
return nil
419+
case abortErr := <-abortCh:
420+
return abortErr
421+
case <-w.chanOsSignal:
422+
return ErrOperationInterrupted
423+
}
424+
}
425+
403426
func (w *SyncWorker) shouldRestartSyncOnMissingBlock(height uint32, expectedHash string) (bool, error) {
404-
// When a block hash disappears at a given height, it usually indicates a
405-
// reorg/rollback. Confirm by checking the current tip and block hash.
427+
// When a block hash disappears at a given height, it can indicate a
428+
// reorg/rollback, but on load-balanced EVM RPCs a single lagging backend can
429+
// also report an older tip. Only restart immediately when another probe can
430+
// prove the height exists with a different hash; otherwise let the retry
431+
// loop or wall-clock cap yield control to the outer resync.
406432
bestHeight, err := w.chain.GetBestBlockHeight()
407433
if err != nil {
408434
return false, err
409435
}
410436
if bestHeight < height {
411-
// The tip moved below the requested height, so this block is no longer valid.
412-
return true, nil
437+
return false, nil
413438
}
414439
currentHash, err := w.chain.GetBlockHash(height)
415440
if err != nil {
416441
if stdErrors.Is(err, bchain.ErrBlockNotFound) {
417-
return true, nil
442+
return false, nil
418443
}
419444
return false, err
420445
}
@@ -574,7 +599,17 @@ ConnectLoop:
574599
time.Sleep(time.Millisecond * 500)
575600
continue
576601
}
577-
hch <- hashHeight{hash, h}
602+
if err = w.sendHashHeight(hch, abortCh, hashHeight{hash, h}); err != nil {
603+
if stdErrors.Is(err, errResync) {
604+
glog.Warning("sync: parallel connect aborted while queueing block hash, restarting sync")
605+
} else if stdErrors.Is(err, ErrOperationInterrupted) {
606+
glog.Info("connectBlocksParallel interrupted at height ", h)
607+
} else {
608+
glog.Error("sync: parallel connect aborted while queueing block hash, worker error ", err)
609+
}
610+
close(terminating)
611+
break ConnectLoop
612+
}
578613
h++
579614
}
580615
}
@@ -791,7 +826,7 @@ ConnectLoop:
791826
close(terminating)
792827
break ConnectLoop
793828
case <-w.chanOsSignal:
794-
glog.Info("connectBlocksParallel interrupted at height ", h)
829+
glog.Info("BulkConnectBlocks interrupted at height ", h)
795830
err = ErrOperationInterrupted
796831
// signal all workers to terminate their loops (error loops are interrupted below)
797832
close(terminating)
@@ -804,7 +839,17 @@ ConnectLoop:
804839
time.Sleep(time.Millisecond * 500)
805840
continue
806841
}
807-
hch <- hashHeight{hash, h}
842+
if err = w.sendHashHeight(hch, abortCh, hashHeight{hash, h}); err != nil {
843+
if stdErrors.Is(err, errResync) {
844+
glog.Warning("sync: bulk connect aborted while queueing block hash, restarting sync")
845+
} else if stdErrors.Is(err, ErrOperationInterrupted) {
846+
glog.Info("BulkConnectBlocks interrupted at height ", h)
847+
} else {
848+
glog.Error("sync: bulk connect aborted while queueing block hash, worker error ", err)
849+
}
850+
close(terminating)
851+
break ConnectLoop
852+
}
808853
if h > 0 && h%1000 == 0 {
809854
w.metrics.BlockbookBestHeight.Set(float64(h))
810855
glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start), " ", w.db.GetAndResetConnectBlockStats())
@@ -891,7 +936,10 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) {
891936
return
892937
}
893938
if height > bestHeight {
894-
return
939+
if hash == "" {
940+
return
941+
}
942+
glog.Warningf("getBlockChain: block %d %s is above observed backend height %d; retrying because the block hash was already observed", height, hash, bestHeight)
895943
}
896944
}
897945
if gotNotFound {

0 commit comments

Comments
 (0)