feat(bitcoinspv/client): non-blocking SendBlocks #357
Conversation
- Blocks are queued to a buffered channel (size 100)
- A background worker processes the queue with retries
- Flush() waits for queued blocks to be sent (used in bootstrap)
- Close() gracefully shuts down the worker (called on relayer shutdown)
- Errors are logged asynchronously rather than returned

Signed-off-by: Robert Zaremba <robert@zaremba.ch>
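The queue/worker/Flush/Close lifecycle described above can be sketched in miniature. This is a simplified, self-contained illustration of the pattern, not the PR's actual code: `asyncSender` and the `[]string` payload are stand-ins for the real client and `[]*types.IndexedBlock`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// asyncSender sketches the queued, non-blocking send pattern: SendBlocks
// enqueues and returns immediately, a background worker drains the queue,
// Flush waits for queued work, and Close shuts the worker down idempotently.
type asyncSender struct {
	queue chan []string               // stand-in for chan []*types.IndexedBlock
	done  chan struct{}               // closed when the worker exits
	once  sync.Once                   // makes Close safe to call repeatedly
	wg    sync.WaitGroup              // tracks batches enqueued but not yet sent
	send  func(blocks []string) error // injected sender (the real client calls the indexer API)
}

func newAsyncSender(queueSize int, send func([]string) error) *asyncSender {
	s := &asyncSender{
		queue: make(chan []string, queueSize),
		done:  make(chan struct{}),
		send:  send,
	}
	go s.worker()
	return s
}

func (s *asyncSender) worker() {
	defer close(s.done)
	for blocks := range s.queue { // exits when Close closes the queue
		if err := s.send(blocks); err != nil {
			fmt.Println("send failed (logged, not returned):", err)
		}
		s.wg.Done()
	}
}

// SendBlocks returns immediately; it only errors when the queue is full.
func (s *asyncSender) SendBlocks(blocks []string) error {
	if len(blocks) == 0 {
		return nil
	}
	s.wg.Add(1)
	select {
	case s.queue <- blocks:
		return nil
	default:
		s.wg.Done()
		return errors.New("indexer queue is full, dropping blocks")
	}
}

// Flush blocks until every enqueued batch has been processed.
func (s *asyncSender) Flush() { s.wg.Wait() }

// Close is idempotent: the queue is closed once, then we wait for the worker.
func (s *asyncSender) Close() {
	s.once.Do(func() { close(s.queue) })
	<-s.done
}

func main() {
	var mu sync.Mutex
	sent := 0
	s := newAsyncSender(4, func(blocks []string) error {
		mu.Lock()
		sent += len(blocks)
		mu.Unlock()
		return nil
	})
	_ = s.SendBlocks([]string{"block-1", "block-2"})
	s.Flush()
	s.Close()
	fmt.Println("sent:", sent)
}
```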
Reviewer's Guide

Implements asynchronous, buffered sending of blocks to the btcindexer with a background worker, adds lifecycle hooks (Flush/Close) to ensure graceful shutdown and backfill completion, and updates the indexer interface and mocks accordingly.

Sequence diagram for asynchronous SendBlocks flow:

```mermaid
sequenceDiagram
    actor Relayer
    participant RelayerBackfill as Relayer_backfillIndexer
    participant BTCIndexer as BTCIndexerClient
    participant Worker as Client_worker
    participant BTCIndexerAPI as BTCIndexerAPI
    Relayer ->> RelayerBackfill: backfillIndexer(ctx, startHeight, endHeight)
    loop per_batch_of_blocks
        RelayerBackfill ->> BTCIndexer: SendBlocks(ctx, blocks)
        alt client_not_initialized
            BTCIndexer -->> RelayerBackfill: error
        else empty_block_slice
            BTCIndexer -->> RelayerBackfill: nil
        else queue_has_capacity
            BTCIndexer ->> BTCIndexer: enqueue blocks to blocksChan
            BTCIndexer -->> RelayerBackfill: nil (return immediately)
            Worker ->> Worker: receive blocks from blocksChan
            Worker ->> Worker: sendBlocksWithRetry(blocks)
            loop retries_up_to_maxRetries
                Worker ->> BTCIndexerAPI: PutBlocks(payload)
                alt non_retryable_or_success
                    BTCIndexerAPI -->> Worker: success or non_retryable_error
                    Worker -->> Worker: return from sendBlocksWithRetry
                    Worker ->> Worker: break
                else retryable_error
                    BTCIndexerAPI -->> Worker: retryable_error
                    Worker ->> Worker: backoff(attempt)
                end
            end
        else queue_full
            BTCIndexer -->> RelayerBackfill: error "indexer queue is full, dropping blocks"
        end
    end
    RelayerBackfill ->> BTCIndexer: Flush()
    loop until_blocksChan_empty
        BTCIndexer ->> BTCIndexer: poll len(blocksChan)
    end
    BTCIndexer -->> RelayerBackfill: return from Flush
    RelayerBackfill -->> Relayer: backfill complete
```
Sequence diagram for graceful shutdown with Close:

```mermaid
sequenceDiagram
    actor Operator
    participant Relayer
    participant BTCIndexer as BTCIndexerClient
    participant Worker as Client_worker
    Operator ->> Relayer: Stop()
    note over Relayer: internal goroutines finish and call Done on WaitGroup
    Operator ->> Relayer: WaitForShutdown()
    Relayer ->> Relayer: wg.Wait()
    Relayer ->> BTCIndexer: Close()
    BTCIndexer ->> BTCIndexer: close(blocksChan)
    note over Worker: range blocksChan stops when channel is closed
    Worker ->> Worker: finish processing remaining queued blocks
    Worker ->> BTCIndexer: close(done)
    BTCIndexer ->> Relayer: wait on <-done
    Relayer -->> Operator: WaitForShutdown returns after worker exits
```
Updated class diagram for btcindexer client and relayer lifecycle:

```mermaid
classDiagram
    class Client {
        -zerolog.Logger logger
        -btcindexer.Client apiClient
        -string network
        -chan []*types.IndexedBlock blocksChan
        -chan struct done
        +SendBlocks(ctx context.Context, blocks []*types.IndexedBlock) error
        -sendBlocksWithRetry(blocks []*types.IndexedBlock) error
        -backoff(attempt int)
        +GetLatestHeight() (int64, error)
        +Flush()
        +Close()
    }
    class Indexer {
        <<interface>>
        +SendBlocks(ctx context.Context, blocks []*types.IndexedBlock) error
        +GetLatestHeight() (int64, error)
        +Flush()
        +Close()
    }
    class Relayer {
        -btcindexer.Indexer btcIndexer
        -sync.WaitGroup wg
        +WaitForShutdown()
        +backfillIndexer(ctx context.Context, startHeight int64, endHeight int64) error
    }
    class MockIndexer {
        +Close()
        +Flush()
        +SendBlocks(ctx context.Context, blocks []*types.IndexedBlock) error
        +GetLatestHeight() (int64, error)
    }
    Client ..|> Indexer
    MockIndexer ..|> Indexer
    Relayer --> Indexer : uses btcIndexer
```
Hey - I've found 2 issues, and left some high-level feedback:
- The `Flush` implementation spins on `len(c.blocksChan)` and doesn't account for in-flight work already dequeued by the worker; consider a more robust synchronization mechanism (e.g., a WaitGroup or a separate drained signal) to ensure all queued and currently processing batches have completed without busy-waiting.
- Because `Close` blindly closes `blocksChan`, repeated calls (or calling it after a failed constructor path) could panic; it might be safer to guard with a `sync.Once` or an internal closed flag.
- The new async `SendBlocks` semantics drop blocks when the queue is full and no longer propagate send/retry errors or respect a caller's context; consider whether callers need a way to observe send failures or honor cancellation, or at least logging when drops occur to aid debugging.
## Individual Comments
### Comment 1
<location> `bitcoinspv/clients/btcindexer/client.go:180-181` </location>
<code_context>
+}
+
+// Flush waits for all queued blocks to be sent.
+func (c *Client) Flush() {
+ for len(c.blocksChan) > 0 {
+ time.Sleep(100 * time.Millisecond)
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Flush may return before in-flight sends complete, since it only checks buffer length.
`len(c.blocksChan)` only reflects buffered items, not whether the worker is still processing a batch. After the worker pulls a batch, the length drops to 0 while `sendBlocksWithRetry` may still be running, so `Flush` can return before all sends/retries complete. If `Flush` is meant to wait for all in-flight work, you’ll need explicit coordination with the worker (e.g., a `sync.WaitGroup`, an idle/“no work” signal, or reusing the `done` channel with a dedicated drain-and-wait method).
</issue_to_address>
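One possible shape for such coordination, sketched here with hypothetical names (`flusher`, `enqueue`) rather than the PR's actual code: pair `wg.Add` at enqueue time with `wg.Done` only after the send completes, so `Flush` also covers batches the worker has already dequeued.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// flusher demonstrates waiting for in-flight work, not just buffered items:
// wg.Add happens at enqueue time and wg.Done only after the send completes,
// so wg.Wait() covers batches the worker has already pulled off the channel.
type flusher struct {
	queue chan int
	wg    sync.WaitGroup
}

func newFlusher() *flusher {
	f := &flusher{queue: make(chan int, 8)}
	go func() {
		for item := range f.queue {
			time.Sleep(10 * time.Millisecond) // simulate a slow send
			_ = item
			f.wg.Done() // done only after processing, not after dequeue
		}
	}()
	return f
}

func (f *flusher) enqueue(item int) {
	f.wg.Add(1)
	f.queue <- item
}

// Flush returns only after every enqueued item has been fully processed --
// unlike polling len(f.queue), which drops to zero while a send is in flight.
func (f *flusher) Flush() { f.wg.Wait() }

func main() {
	f := newFlusher()
	for i := 0; i < 3; i++ {
		f.enqueue(i)
	}
	f.Flush()
	fmt.Println("all items processed")
}
```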
### Comment 2
<location> `bitcoinspv/clients/btcindexer/client.go:174-176` </location>
<code_context>
}
+
+// Close stops the background worker and waits for it to finish.
+func (c *Client) Close() {
+ close(c.blocksChan)
+ <-c.done
+}
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Close is not safe to call more than once due to closing the channel, which can panic.
Since `Close` is part of an exported interface and is now invoked from `Relayer.WaitForShutdown`, it may be called multiple times or from multiple goroutines, leading to a panic on repeated `close(c.blocksChan)`. Please make `Close` idempotent (e.g., guard with `sync.Once` or an atomic flag) so subsequent calls are safe no-ops.
</issue_to_address>
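A minimal sketch of the idempotent-Close suggestion, assuming nothing beyond the standard library (`closer` is an illustrative name, not the PR's type):

```go
package main

import (
	"fmt"
	"sync"
)

// closer shows an idempotent Close: sync.Once guards close(ch), so repeated
// or concurrent calls cannot panic with "close of closed channel".
type closer struct {
	ch   chan struct{}
	once sync.Once
}

func (c *closer) Close() {
	c.once.Do(func() { close(c.ch) })
}

func main() {
	c := &closer{ch: make(chan struct{})}
	c.Close()
	c.Close() // safe no-op on the second call
	fmt.Println("closed twice without panic")
}
```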
Pull request overview
This PR refactors the btcindexer client to make SendBlocks non-blocking by introducing a queue-based async worker pattern. The main goal is to prevent blocking the main relayer process when sending blocks to the indexer.
Changes:
- Introduces async block sending with a buffered channel (size 100) and background worker
- Adds `Flush()` method to wait for queued blocks to be sent (used during bootstrap)
- Adds `Close()` method for graceful shutdown (called during relayer shutdown)
- Errors from send operations are now logged asynchronously rather than returned
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| bitcoinspv/clients/btcindexer/interface.go | Adds Flush() and Close() methods to Indexer interface |
| bitcoinspv/clients/btcindexer/client.go | Implements async sending with worker goroutine, queue management, and new lifecycle methods |
| bitcoinspv/relayer.go | Calls Close() on btcIndexer during shutdown |
| bitcoinspv/bootstrap.go | Calls Flush() after backfill to ensure all blocks are sent |
| bitcoinspv/clients/mocks/*.go | Regenerated mocks with mockery v2.53.5 to include new methods |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Robert Zaremba <robert@zaremba.ch>
Signed-off-by: Robert Zaremba <robert@zaremba.ch>
1. Close idempotent - Added sync.Once and atomicBool to make Close safe to call multiple times
2. Flush nil check - Added nil and closed checks, now returns error
3. Interruptible backoff - Backoff now checks context cancellation via select statement
4. backfillIndexer error handling - Now checks Flush return value and logs errors properly
5. Context cancellation in SendBlocks - Added check for ctx.Done() before enqueueing
6. SendBlocks after Close guard - Added closed atomic bool check to prevent panic
7. Flush race condition - Added sync.WaitGroup to properly track in-flight operations, and error tracking via atomicError

Signed-off-by: Robert Zaremba <robert@zaremba.ch>
Signed-off-by: Robert Zaremba <robert@zaremba.ch>
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
```go
for current := startHeight; current <= endHeight; current += batchSize {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	batchEnd := current + batchSize - 1
	if batchEnd > endHeight {
		batchEnd = endHeight
	}

	r.logger.Info().Int64("from", current).Int64("to", batchEnd).Msg("Fetching block batch for indexer backfill")

	blocksInBatch := make([]*relayertypes.IndexedBlock, 0, batchEnd-current+1)
	for i := current; i <= batchEnd; i++ {
		block, err := r.btcClient.GetBTCBlockByHeight(i)
		if err != nil {
			return fmt.Errorf("failed to fetch block at height %d for indexer backfill: %w", i, err)
		}
		blocksInBatch = append(blocksInBatch, block)
	}

	if len(blocksInBatch) > 0 {
		if err := r.btcIndexer.SendBlocks(ctx, blocksInBatch); err != nil {
			return fmt.Errorf("failed to send block batch to indexer: %w", err)
		}
	}
}
```
In the bootstrap backfill flow, blocks are sent asynchronously via SendBlocks, and then Flush is called to wait for them. However, if the context is cancelled during the backfill loop (lines 250-251), the function returns immediately without calling Flush. This means any blocks already enqueued will continue processing in the background worker even though the backfill operation has been cancelled. Consider calling Flush() or handling context cancellation more gracefully to ensure consistent behavior.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Robert Zaremba <robert@zaremba.ch>
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.
```go
	go c.worker()
	return c
}
```
The worker goroutine is started in the constructor without any mechanism for the caller to know if it has started successfully. If the goroutine panics immediately during startup (before reaching the defer recover), the client will be in an inconsistent state.
Consider using a sync.WaitGroup or channel to signal when the worker has successfully started, or document this limitation in the function comment.
```go
func (c *Client) worker() {
	defer func() {
		if r := recover(); r != nil {
			c.logger.Error().Any("panic", r).Msg("Worker panicked, closing done channel")
```
The panic recovery in the worker logs the panic but doesn't propagate it anywhere useful. The client continues to appear functional (Close() will still work), but no more blocks will be processed. Callers have no way to detect that the worker has died from a panic.
Consider adding a mechanism to track worker health, or at least document this behavior. The sendError field could potentially be repurposed to store panic information.
```diff
-c.logger.Error().Any("panic", r).Msg("Worker panicked, closing done channel")
+// Record the panic as an error so callers can detect worker failure.
+err := fmt.Errorf("worker panic: %v", r)
+c.logger.Error().Any("panic", r).Err(err).Msg("Worker panicked, closing done channel")
+c.sendError.set(err)
```
```go
select {
case <-done:
case <-time.After(10 * time.Second):
	c.logger.Warn().Msg("Close timed out waiting for worker to drain")
```
The Close() method has a 10-second timeout for draining the queue, but there's no indication of what happens to the blocks that remain in the queue when the timeout occurs. These blocks will be silently dropped without any error being returned or tracked.
Consider either returning an error from Close() when the timeout occurs, or at minimum logging how many blocks were dropped. The current warning message doesn't indicate data loss severity.
```diff
-c.logger.Warn().Msg("Close timed out waiting for worker to drain")
+remaining := len(c.blocksChan)
+c.logger.Warn().
+	Int("queued_blocks_not_processed", remaining).
+	Msg("Close timed out waiting for worker to drain; remaining queued blocks may be dropped")
```
```go
func (e *atomicError) get() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.v
}
```
The atomicError's get() method reads the error under a mutex lock, which is correct for thread safety. However, the error value itself (which is an interface) is not deep-copied, and error values in Go can contain mutable state. While this is unlikely to cause issues with standard error types, it could theoretically lead to race conditions if custom error types with mutable state are used.
This is a minor concern, but consider documenting that the stored errors should be treated as immutable, or ensure that only immutable error types are stored.
```go
	}

	c.wg.Wait()
	return c.sendError.get()
```
The Flush() method returns the error via sendError.get() without clearing it. This means that if Flush() is called multiple times, the second call will still return the error from the first failed batch, even if all subsequent operations succeeded. This could lead to confusing behavior where an error persists across multiple Flush() calls.
Consider either clearing the error after returning it from Flush(), or documenting that Flush() returns the cumulative error state and should only be called once before creating a new client.
```diff
-return c.sendError.get()
+err := c.sendError.get()
+c.sendError.set(nil)
+return err
```
```go
func (c *Client) Flush() error {
	if c == nil {
		return errors.New("btcindexer.Client is not initialized")
	}
	if c.closed.get() {
		return errors.New("btcindexer.Client is closed")
	}

	c.wg.Wait()
	return c.sendError.get()
}
```
Flush() only tracks the last error that occurred, not all errors. If multiple batches fail to send (e.g., blocks at heights 100-119 and 140-159), only the most recent error will be returned. This could result in incomplete information about what actually failed during a flush operation.
Consider either accumulating errors (e.g., using errors.Join) or maintaining a counter of failed batches along with the error.
```go
c.wg.Add(1)
select {
case c.blocksChan <- blocks:
	return nil
default:
	c.wg.Done()
	err := errors.New("indexer queue is full, dropping blocks")
	c.logger.Error().
		Err(err).
		Int("dropped_blocks_count", len(blocks)).
		Int("queue_size", queueSize).
		Msg("Indexer queue is full, dropping blocks")
	return err
}
```
There's a potential deadlock if Close() is called while SendBlocks is trying to enqueue blocks. The sequence is: SendBlocks calls wg.Add(1), then tries to send to blocksChan. Meanwhile, Close() closes blocksChan and waits on done. If the channel is full and gets closed while SendBlocks is in the select statement, SendBlocks will call wg.Done() but the worker will already have exited, causing the waitgroup count to become inconsistent.
The issue is that wg.Add(1) happens before checking if the client is closed and before attempting to enqueue. Consider moving the wg.Add(1) to after successful enqueueing, or add a more robust shutdown mechanism.
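One possible fix, sketched with illustrative names (`safeQueue`) rather than the client's actual code: guard enqueue with a read lock and Close with a write lock, so `wg.Add`/channel send can never interleave with `close`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// safeQueue avoids the Add/Close race: enqueuers hold a read lock while they
// check the closed flag, call wg.Add, and send; Close takes the write lock
// before setting closed and closing the channel, so no enqueue is mid-send.
type safeQueue struct {
	mu     sync.RWMutex
	closed bool
	ch     chan int
	wg     sync.WaitGroup
	done   chan struct{}
}

func newSafeQueue(size int) *safeQueue {
	q := &safeQueue{ch: make(chan int, size), done: make(chan struct{})}
	go func() {
		defer close(q.done)
		for v := range q.ch {
			_ = v // process the item here
			q.wg.Done()
		}
	}()
	return q
}

func (q *safeQueue) Enqueue(v int) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return errors.New("queue closed")
	}
	q.wg.Add(1)
	select {
	case q.ch <- v:
		return nil
	default:
		q.wg.Done()
		return errors.New("queue full")
	}
}

func (q *safeQueue) Close() {
	q.mu.Lock()
	if !q.closed {
		q.closed = true
		close(q.ch) // safe: no Enqueue is mid-send, they hold the read lock
	}
	q.mu.Unlock()
	<-q.done
}

func main() {
	q := newSafeQueue(4)
	_ = q.Enqueue(1)
	q.wg.Wait()
	q.Close()
	fmt.Println("enqueue after close:", q.Enqueue(2))
}
```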
```go
maxRetries     = 4
initialBackoff = 500 * time.Millisecond
maxBackoff     = 8 * time.Second
queueSize      = 400
```
The queueSize constant is set to 400, which seems arbitrarily large and could lead to significant memory usage if blocks are large. The PR description mentions a buffer size of 100, but the code uses 400. This discrepancy suggests either the PR description is outdated or the wrong value was committed.
Consider aligning the actual value with the documented value, and adding a comment explaining the rationale for the chosen queue size.
```go
case c.blocksChan <- blocks:
	return nil
default:
	c.wg.Done()
	err := errors.New("indexer queue is full, dropping blocks")
	c.logger.Error().
		Err(err).
		Int("dropped_blocks_count", len(blocks)).
		Int("queue_size", queueSize).
		Msg("Indexer queue is full, dropping blocks")
	return err
```
When the indexer queue is full, blocks are dropped and an error is logged with detailed information. However, this error is also returned to the caller, which will likely propagate it up the stack. In block_events.go, this error causes the bootstrap process to restart. This seems overly aggressive - if the queue is temporarily full, restarting the entire bootstrap seems like an inappropriate response.
Consider whether returning an error when the queue is full is the right design choice, or if it should be handled more gracefully (e.g., by blocking until space is available, or by logging only).
```diff
-case c.blocksChan <- blocks:
-	return nil
-default:
-	c.wg.Done()
-	err := errors.New("indexer queue is full, dropping blocks")
-	c.logger.Error().
-		Err(err).
-		Int("dropped_blocks_count", len(blocks)).
-		Int("queue_size", queueSize).
-		Msg("Indexer queue is full, dropping blocks")
-	return err
+case <-ctx.Done():
+	c.wg.Done()
+	return ctx.Err()
+case c.blocksChan <- blocks:
+	return nil
```
Summary by Sourcery
Make the btcindexer client send blocks asynchronously via a background worker and integrate graceful shutdown and flushing into the relayer lifecycle.