From 6029d044c14ee069a10da75c20696a1cb715dda1 Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Sat, 28 Feb 2026 10:42:34 -0500 Subject: [PATCH] fix(input): smaller chainsync block batch Signed-off-by: Chris Gianelloni --- input/chainsync/chainsync.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/input/chainsync/chainsync.go b/input/chainsync/chainsync.go index 37627a3..974d486 100644 --- a/input/chainsync/chainsync.go +++ b/input/chainsync/chainsync.go @@ -79,7 +79,12 @@ const ( // Size of cache for recent chainsync cursors cursorCacheSize = 20 - blockBatchSize = 500 + // blockBatchSize controls how many blocks are requested per GetBlockRange + // call during NtN catch-up sync. Must be small enough that the events + // generated (~20 per block) fit in the eventChan buffer to avoid + // backpressure that stalls the blockfetch recv queue and triggers + // "message queue limit exceeded" errors. + blockBatchSize = 50 maxAutoReconnectDelay = 60 * time.Second defaultKupoTimeout = 30 * time.Second @@ -155,7 +160,10 @@ func (c *ChainSync) Start() error { // remain valid. Reusing them preserves pipeline goroutine references // that would otherwise be orphaned by a channel swap. if c.eventChan == nil { - c.eventChan = make(chan event.Event, 10) + // Buffer must be large enough to absorb bursts during catch-up sync. + // With PipelineLimit=50 and ~20 events/block, we can see 1000+ + // events queued before the output callback drains them. + c.eventChan = make(chan event.Event, 2048) } if c.errorChan == nil { c.errorChan = make(chan error) @@ -274,6 +282,9 @@ func (c *ChainSync) setupConnection() error { ochainsync.WithRollBackwardFunc(c.handleRollBackward), // Enable pipelining of RequestNext messages to speed up chainsync ochainsync.WithPipelineLimit(50), + // Recv queue must exceed pipeline limit to avoid "message queue + // limit exceeded" errors during rapid catch-up sync + ochainsync.WithRecvQueueSize(100), ), ), ouroboros.WithBlockFetchConfig(