Skip to content

Commit 5f8bd2f

Browse files
authored
fix(sync): resume adder pipeline from last DB block instead of tip (#114)
The adder pipeline used WithIntersectTip(true) on every restart, causing it to skip ahead to the current chain tip and permanently lose any blocks between the dead connection and the new tip. Over time these gaps accumulate — epoch 619 was missing 422 blocks, epoch 620 was missing 930 blocks. Missing blocks corrupt the nonce evolution, producing wrong candidate nonces which cascade through TICKN to produce wrong epoch nonces and wrong leaderlog schedules. On restart (after historical sync is done), the pipeline now builds an intersect point from the last 100 blocks in the DB, replaying from that point to fill the gap. Duplicate blocks are handled by the existing ON CONFLICT DO NOTHING in InsertBlock/ProcessBlock. First start still uses tip for live notifications during historical sync.
1 parent 5a24fc8 commit 5f8bd2f

File tree

1 file changed

+37
-1
lines changed

1 file changed

+37
-1
lines changed

main.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
output_embedded "github.com/blinklabs-io/adder/output/embedded"
3030
"github.com/blinklabs-io/adder/pipeline"
3131
"github.com/blinklabs-io/gouroboros/ledger"
32+
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
3233
"github.com/blinklabs-io/gouroboros/ledger/allegra"
3334
"github.com/blinklabs-io/gouroboros/ledger/alonzo"
3435
"github.com/blinklabs-io/gouroboros/ledger/babbage"
@@ -727,13 +728,33 @@ func (i *Indexer) runChainTail() error {
727728
select {}
728729
}
729730

731+
// adderIntersectFromDB builds an intersect point from the last block in the DB,
732+
// backed up by 100 blocks to provide overlap for duplicate handling.
733+
// This ensures the adder pipeline replays any blocks missed during a restart
734+
// instead of skipping ahead to the tip and leaving permanent gaps.
735+
func (i *Indexer) adderIntersectFromDB() (ocommon.Point, error) {
736+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
737+
defer cancel()
738+
blocks, err := i.store.GetLastNBlocks(ctx, 100)
739+
if err != nil || len(blocks) == 0 {
740+
return ocommon.Point{}, fmt.Errorf("no blocks in DB: %w", err)
741+
}
742+
oldest := blocks[len(blocks)-1]
743+
hashBytes, err := hex.DecodeString(oldest.BlockHash)
744+
if err != nil {
745+
return ocommon.Point{}, fmt.Errorf("decoding hash for slot %d: %w", oldest.Slot, err)
746+
}
747+
return ocommon.NewPoint(oldest.Slot, hashBytes), nil
748+
}
749+
730750
// startAdderPipeline starts the adder pipeline for live chain tail.
731751
// It runs an infinite restart loop: on pipeline error or stall, it stops, waits, and reconnects.
732752
// Auto-reconnect is disabled because adder orphans the event channel after reconnect.
733753
// A stall detector goroutine monitors lastBlockTime and forces a restart if no blocks
734754
// arrive for 2 minutes (catches zombie state where pipeline.Stop() previously hung).
735755
func (i *Indexer) startAdderPipeline() error {
736756
hosts := i.nodeAddresses
757+
firstStart := true
737758

738759
for {
739760
connected := false
@@ -746,11 +767,25 @@ func (i *Indexer) startAdderPipeline() error {
746767
inputOpts := []chainsync.ChainSyncOptionFunc{
747768
node,
748769
chainsync.WithNetworkMagic(uint32(i.networkMagic)),
749-
chainsync.WithIntersectTip(true),
750770
chainsync.WithAutoReconnect(false),
751771
chainsync.WithIncludeCbor(false),
752772
}
753773

774+
// First start: intersect at tip for live notifications during historical sync.
775+
// Restarts: intersect from last DB block to fill gaps from the dead connection.
776+
// Without this, every pipeline restart loses blocks between death and new tip.
777+
if !firstStart && atomic.LoadInt32(&i.historicalSyncDone) == 1 {
778+
if pt, err := i.adderIntersectFromDB(); err == nil {
779+
log.Printf("Pipeline resuming from slot %d (last DB block)", pt.Slot)
780+
inputOpts = append(inputOpts, chainsync.WithIntersectPoints([]ocommon.Point{pt}))
781+
} else {
782+
log.Printf("Could not build intersect from DB (%v), falling back to tip", err)
783+
inputOpts = append(inputOpts, chainsync.WithIntersectTip(true))
784+
}
785+
} else {
786+
inputOpts = append(inputOpts, chainsync.WithIntersectTip(true))
787+
}
788+
754789
i.pipeline = pipeline.New()
755790
input_chainsync := chainsync.New(inputOpts...)
756791
i.pipeline.AddInput(input_chainsync)
@@ -778,6 +813,7 @@ func (i *Indexer) startAdderPipeline() error {
778813

779814
log.Printf("Pipeline connected to node at %s", host)
780815
connected = true
816+
firstStart = false
781817
atomic.StoreInt64(&i.lastBlockTime, time.Now().Unix())
782818

783819
// Start stall detector — forces restart if no blocks for 2 minutes.

0 commit comments

Comments
 (0)