Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ const (
// Size of cache for recent chainsync cursors
cursorCacheSize = 20

blockBatchSize = 500
// blockBatchSize controls how many blocks are requested per GetBlockRange
// call during NtN catch-up sync. Must be small enough that the events
// generated (~20 per block) fit in the eventChan buffer to avoid
// backpressure that stalls the blockfetch recv queue and triggers
// "message queue limit exceeded" errors.
blockBatchSize = 50

maxAutoReconnectDelay = 60 * time.Second
defaultKupoTimeout = 30 * time.Second
Expand Down Expand Up @@ -155,7 +160,10 @@ func (c *ChainSync) Start() error {
// remain valid. Reusing them preserves pipeline goroutine references
// that would otherwise be orphaned by a channel swap.
if c.eventChan == nil {
c.eventChan = make(chan event.Event, 10)
// Buffer must be large enough to absorb bursts during catch-up sync.
// With PipelineLimit=50 and ~20 events/block, we can see 1000+
// events queued before the output callback drains them.
c.eventChan = make(chan event.Event, 2048)
}
if c.errorChan == nil {
c.errorChan = make(chan error)
Expand Down Expand Up @@ -274,6 +282,9 @@ func (c *ChainSync) setupConnection() error {
ochainsync.WithRollBackwardFunc(c.handleRollBackward),
// Enable pipelining of RequestNext messages to speed up chainsync
ochainsync.WithPipelineLimit(50),
// Recv queue must exceed pipeline limit to avoid "message queue
// limit exceeded" errors during rapid catch-up sync
ochainsync.WithRecvQueueSize(100),
),
),
ouroboros.WithBlockFetchConfig(
Expand Down