ingest storage: Refactor ConcurrentFetchers.run
#13809
Pull request overview
This PR refactors the ConcurrentFetchers.run() method in the Kafka fetch logic by extracting complex inline logic into separate helper methods for improved code organization and readability.
Key Changes:
- Extracted fetch want handling into `handleWant()` method
- Extracted individual fetch attempt logic into `performAttempt()` method
- Added `isDone()` helper method for cleaner shutdown checking
- Fixed typo in `casHWM` function parameter name (highWwatermark → highWatermark)
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
		if !continueAttempts {
			break
		}
	}
Copilot AI, Dec 11, 2025
The loop may exit without sending any buffered records to the result channel. If the loop condition becomes false (e.g., errBackoff.Ongoing() returns false, isDone() returns true, or w.endOffset <= w.startOffset), any records in bufferedResult will be discarded instead of being sent to w.result. In the original code, bufferedResult was always sent to the channel before checking these conditions again. Consider sending any remaining bufferedResult before the handleWant function exits.
		}
	}
	// Send any remaining bufferedResult before exiting.
	if len(bufferedResult.Records) > 0 {
		w.result <- bufferedResult
	}
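To make the concern concrete, here is a minimal, self-contained sketch of the failure mode, not the code under review. It assumes a loop that buffers each fetched batch and only sends it at the start of the next iteration; `fetchResult`, `fetchAll`, `collect`, and the `flush` switch are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// fetchResult is a hypothetical stand-in holding fetched records.
type fetchResult struct {
	Records []string
}

// fetchAll "fetches" each batch in turn. A batch is buffered and only sent
// at the start of the next iteration, so the last batch is still buffered
// when the loop ends. With flush set to false that batch is silently lost.
func fetchAll(batches [][]string, out chan<- fetchResult, flush bool) {
	defer close(out)
	var buffered fetchResult
	for _, b := range batches {
		if len(buffered.Records) > 0 {
			out <- buffered
		}
		buffered = fetchResult{Records: b}
	}
	// Send any remaining buffered records before exiting.
	if flush && len(buffered.Records) > 0 {
		out <- buffered
	}
}

// collect runs fetchAll and counts the records that reach the consumer.
func collect(batches [][]string, flush bool) int {
	out := make(chan fetchResult)
	go fetchAll(batches, out, flush)
	n := 0
	for r := range out {
		n += len(r.Records)
	}
	return n
}

func main() {
	batches := [][]string{{"a", "b"}, {"c"}, {"d", "e"}}
	fmt.Println("without flush:", collect(batches, false)) // 3
	fmt.Println("with flush:   ", collect(batches, true))  // 5
}
```

In this sketch the final batch of two records only reaches the consumer when the flush after the loop is present.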
Yeah, interesting, I'll think about that.
Co-authored-by: Copilot <[email protected]>
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.
func (r *ConcurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger, highWatermark *atomic.Int64) {
	defer r.wg.Done()

	errBackoff := backoff.New(ctx, r.fetchBackoffConfig)
should every want get a fresh backoff?
Would different wants get different behaviour from the Kafka brokers? I don't think so. If they start getting errors, they slow down faster if they use a single backoff.
dimitarvdimitrov left a comment
SGTM, this looks valid
What this PR does
This is a followup from a comment to #12222 from @dimitarvdimitrov:
So this PR makes `run()` somewhat less complex by reducing the tangled control flow that was previously going on, moving delicately coded resource freeing to separate methods with `defer` blocks, and so on.
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- `about-versioning.md` updated with experimental features.
Note
Refactors `ConcurrentFetchers.run` by extracting fetch handling into `handleWant`, `performAttempt`, and `isDone`, simplifying control flow and channel/defers; minor `casHWM` param rename.
- `pkg/storage/ingest/fetcher.go`:
  - `ConcurrentFetchers.run`:
    - Extracts per-want handling into `handleWant()`.
    - Adds `performAttempt()` to encapsulate a single fetch attempt (including HWM updates, merge, backoff reset, and result dispatch).
    - Adds `isDone()` helper for shutdown checks.
    - Uses `defer` in `handleWant` to `close(w.result)` and finish spans; consolidates select/send logic for result delivery and waiting.
  - Renames the `casHWM` parameter to `highWatermark` for clarity.
Written by Cursor Bugbot for commit 127f700. This will update automatically on new commits.