-
Notifications
You must be signed in to change notification settings - Fork 1
feat(bitcoinspv/client): non-blocking SendBlocks #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
af857f1
f0ba65d
00d48b5
a0e5cdd
b35b322
066596a
868dc59
02511ad
f4cfda8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -274,6 +274,8 @@ func (r *Relayer) backfillIndexer(ctx context.Context, startHeight, endHeight in | |
| } | ||
| } | ||
| } | ||
|
Comment on lines
248
to
276
|
||
|
|
||
| r.btcIndexer.Flush() | ||
| r.logger.Info().Msg("Indexer backfill completed successfully.") | ||
| return nil | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,61 +19,82 @@ | |
| maxRetries = 4 | ||
| initialBackoff = 500 * time.Millisecond | ||
| maxBackoff = 8 * time.Second | ||
| queueSize = 100 | ||
|
robert-zaremba marked this conversation as resolved.
Outdated
|
||
| ) | ||
|
|
||
| // Client is a client for communicating with the nBTC indexer worker. | ||
| // It wraps the btcindexer API client to add retry logic. | ||
| // It wraps the btcindexer API client to add retry logic and async sending. | ||
| type Client struct { | ||
| logger zerolog.Logger | ||
| apiClient btcindexer.Client | ||
| network string | ||
|
|
||
| blocksChan chan []*types.IndexedBlock | ||
| done chan struct{} | ||
| } | ||
|
Comment on lines
+44
to
76
|
||
|
|
||
| // NewClient creates a new client for the indexer. | ||
| func NewClient(url string, network string, authToken string, parentLogger zerolog.Logger) *Client { | ||
| return &Client{ | ||
| logger: parentLogger.With().Str("module", "btcindexer_client").Logger(), | ||
| apiClient: btcindexer.NewClient(url, authToken), | ||
| network: network, | ||
| c := &Client{ | ||
| logger: parentLogger.With().Str("module", "btcindexer_client").Logger(), | ||
| apiClient: btcindexer.NewClient(url, authToken), | ||
| network: network, | ||
| blocksChan: make(chan []*types.IndexedBlock, queueSize), | ||
| done: make(chan struct{}), | ||
| } | ||
| go c.worker() | ||
| return c | ||
|
robert-zaremba marked this conversation as resolved.
|
||
| } | ||
|
Comment on lines
+90
to
+92
|
||
|
|
||
| // worker processes blocks from the queue in a background goroutine. | ||
| func (c *Client) worker() { | ||
| defer close(c.done) | ||
| for blocks := range c.blocksChan { | ||
| if err := c.sendBlocksWithRetry(blocks); err != nil { | ||
| c.logger.Error().Err(err).Msg("Failed to send blocks to indexer") | ||
| } | ||
| } | ||
| } | ||
|
robert-zaremba marked this conversation as resolved.
|
||
|
|
||
| // SendBlocks sends a batch of blocks to the indexer with a retry mechanism. | ||
| // TODO: this should not block the main process | ||
| // probably we should use CF queues | ||
| func (c *Client) SendBlocks(ctx context.Context, blocks []*types.IndexedBlock) error { | ||
| // SendBlocks enqueues blocks for async sending to the indexer. | ||
| // It returns an error only if the queue is full. | ||
| func (c *Client) SendBlocks(_ context.Context, blocks []*types.IndexedBlock) error { | ||
| if c == nil { | ||
| return errors.New("btcindexer.Client is not initialized") | ||
| } | ||
| if len(blocks) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| select { | ||
| case c.blocksChan <- blocks: | ||
| return nil | ||
| default: | ||
|
robert-zaremba marked this conversation as resolved.
|
||
| return errors.New("indexer queue is full, dropping blocks") | ||
|
robert-zaremba marked this conversation as resolved.
Outdated
|
||
| } | ||
|
robert-zaremba marked this conversation as resolved.
Outdated
Comment on lines
+132
to
+145
|
||
| } | ||
|
|
||
| // sendBlocksWithRetry sends a batch of blocks to the indexer with a retry mechanism. | ||
| func (c *Client) sendBlocksWithRetry(blocks []*types.IndexedBlock) error { | ||
| payload, err := c.preparePayload(blocks) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var lastErr error | ||
| for attempt := 0; attempt <= maxRetries; attempt++ { | ||
| if ctx.Err() != nil { | ||
| return ctx.Err() | ||
| } | ||
|
|
||
| shouldRetry, err := c.sendAndHandleResponse(payload) | ||
| if err != nil { | ||
| // Non-retryable | ||
| return err | ||
| } | ||
|
|
||
| if !shouldRetry { | ||
| // Success | ||
| return nil | ||
| } | ||
|
|
||
| lastErr = fmt.Errorf("attempt %d failed, retrying", attempt+1) | ||
| c.logger.Warn().Err(err).Msg("Retrying indexer call...") | ||
| c.backoff(ctx, attempt) | ||
| c.backoff(attempt) | ||
| } | ||
|
|
||
| return fmt.Errorf("failed to send blocks to indexer after %d attempts: %w", maxRetries+1, lastErr) | ||
|
|
@@ -123,24 +144,19 @@ | |
| return putBlocksReq, nil | ||
| } | ||
|
|
||
| func (c *Client) backoff(ctx context.Context, attempt int) { | ||
| func (c *Client) backoff(attempt int) { | ||
| if attempt >= maxRetries { | ||
| return | ||
| } | ||
| backoff := time.Duration(1<<attempt) * initialBackoff | ||
| if backoff > maxBackoff { | ||
| backoff = maxBackoff | ||
| } | ||
| // NOTE: we dont need secure random generation here, its just retry | ||
| jitter := time.Duration(rand.Intn(1000)) * time.Millisecond //nolint:gosec | ||
| totalBackoff := backoff + jitter | ||
|
|
||
| c.logger.Info().Dur("wait_duration", totalBackoff).Msgf("Waiting before next attempt..") | ||
|
|
||
| select { | ||
| case <-time.After(totalBackoff): | ||
| case <-ctx.Done(): | ||
| } | ||
| time.Sleep(totalBackoff) | ||
|
robert-zaremba marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| // GetLatestHeight returns the latest block height known to the indexer | ||
|
|
@@ -153,3 +169,16 @@ | |
|
|
||
| return height, nil | ||
| } | ||
|
|
||
| // Close stops the background worker and waits for it to finish. | ||
| func (c *Client) Close() { | ||
|
robert-zaremba marked this conversation as resolved.
|
||
| close(c.blocksChan) | ||
| <-c.done | ||
|
sourcery-ai[bot] marked this conversation as resolved.
Outdated
|
||
| } | ||
|
robert-zaremba marked this conversation as resolved.
|
||
|
|
||
| // Flush waits for all queued blocks to be sent. | ||
| func (c *Client) Flush() { | ||
| for len(c.blocksChan) > 0 { | ||
|
sourcery-ai[bot] marked this conversation as resolved.
Outdated
|
||
| time.Sleep(100 * time.Millisecond) | ||
| } | ||
| } | ||
|
robert-zaremba marked this conversation as resolved.
Outdated
robert-zaremba marked this conversation as resolved.
Outdated
Comment on lines
+285
to
+295
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.