192 changes: 97 additions & 95 deletions pkg/storage/ingest/fetcher.go
@@ -689,118 +689,120 @@ func (r *ConcurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
errBackoff := backoff.New(ctx, r.fetchBackoffConfig)
@seizethedave (Contributor Author), Dec 11, 2025:

should every want get a fresh backoff?

Contributor:

would different wants get different behaviour from the kafka brokers? i don't think so. if they start getting errors, they slow down faster if they use a single backoff
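To make the trade-off in this thread concrete, here is a minimal, hypothetical sketch (not part of the diff) contrasting the two options. It assumes the grafana/dskit backoff package already used by this file; fetchWant and handleWant are reduced stand-ins for the real types and methods in fetcher.go.

package main

import (
	"context"
	"time"

	"github.com/grafana/dskit/backoff"
)

// fetchWant and handleWant are stand-ins for the real declarations in
// fetcher.go; only what the sketch needs is included.
type fetchWant struct{ startOffset, endOffset int64 }

func handleWant(ctx context.Context, w fetchWant, b *backoff.Backoff) {
	// Placeholder for ConcurrentFetchers.handleWant: fetch, retry, send results.
}

// sharedBackoff mirrors the PR: one backoff object for the whole run loop, so
// errors seen while serving any want keep later wants slowed down until a
// successful fetch calls Reset().
func sharedBackoff(ctx context.Context, wants <-chan fetchWant, cfg backoff.Config) {
	errBackoff := backoff.New(ctx, cfg)
	for w := range wants {
		handleWant(ctx, w, errBackoff)
	}
}

// freshBackoffPerWant is the alternative raised in the question above: each
// want starts from MinBackoff again, even if the broker is still erroring.
func freshBackoffPerWant(ctx context.Context, wants <-chan fetchWant, cfg backoff.Config) {
	for w := range wants {
		handleWant(ctx, w, backoff.New(ctx, cfg))
	}
}

func main() {
	wants := make(chan fetchWant)
	close(wants) // empty stream, just to show the call shape
	sharedBackoff(context.Background(), wants, backoff.Config{
		MinBackoff: 250 * time.Millisecond,
		MaxBackoff: 2 * time.Second,
	})
}
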


for w := range wants {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

// This is the current buffered fetchResult that has not been sent to the result channel yet.
// This is empty at the beginning, then we merge records as soon as we receive them
// from the Fetch response(s).
var bufferedResult fetchResult

for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

// Run a single Fetch request.
if res := r.fetchSingle(ctx, w); res.Err != nil {
// We got an error. We handle it and then discard this fetch result content.
var err error
w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan)
if err != nil {
// An error returned from handleKafkaFetchErr is a fatal
// error. We produce an error result and abort further
// processing.

level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err)

select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case <-ctx.Done():
case w.result <- newErrorFetchResult(ctx, r.partitionID, err):
}

// Break out of the retry loop, but continue servicing the `wants` channel.
break
}
} else {
// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(res.Records)))
r.handleWant(ctx, logger, w, errBackoff, highWatermark)
}
}

// Update the high watermark.
if hwm := res.HighWatermark; hwm >= 0 {
casHWM(highWatermark, hwm)
}
// handleWant handles a single fetchWant. It performs Fetch requests until the fetchWant is complete or the context is cancelled.
func (r *ConcurrentFetchers) handleWant(ctx context.Context, logger log.Logger, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64) {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

// Merge the last fetch result with the previous buffered result (if any).
// Keep non-mergeable fields from res, because the last response is the most updated one.
bufferedResult = res.Merge(bufferedResult)
}
defer close(w.result)
defer wantSpan.Finish()

if len(bufferedResult.Records) == 0 {
// If we have no buffered records to try to send to the result channel then we retry with another
// Fetch attempt. However, before doing it, we check if we've been told to stop (if so, we should honor it).
attemptSpan.Finish()

select {
case <-r.done:
wantSpan.Finish()
close(w.result)
return
default:
}
// This is the current buffered fetchResult that has not been sent to the result channel yet.
// This is empty at the beginning, then we merge records as soon as we receive them
// from the Fetch response(s).
var bufferedResult fetchResult
continueAttempts := true

continue
for attempt := 0; errBackoff.Ongoing() && continueAttempts && !r.isDone() && w.endOffset > w.startOffset; attempt++ {
w, bufferedResult, continueAttempts = r.performAttempt(ctx, logger, attempt, w, errBackoff, highWatermark, bufferedResult)
}
Copilot AI, Dec 11, 2025:

The loop may exit without sending any buffered records to the result channel. If the loop condition becomes false (e.g., errBackoff.Ongoing() returns false, isDone() returns true, or w.endOffset <= w.startOffset), any records in bufferedResult will be discarded instead of being sent to w.result. In the original code, bufferedResult was always sent to the channel before checking these conditions again. Consider sending any remaining bufferedResult before the handleWant function exits.

Suggested change
}
}
// Send any remaining bufferedResult before exiting.
if len(bufferedResult.Records) > 0 {
w.result <- bufferedResult
}

Contributor Author:

Yeah, interesting, I'll think about that.
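One possible shape for this, sketched below as a hedged fragment rather than a concrete proposal for the PR: flush the leftover bufferedResult before handleWant returns, but wrap the send in the same select used elsewhere in this file so it cannot block forever while the fetcher is shutting down. The names (r.done, w.result, bufferedResult) come from the diff above; the exact placement is an assumption.

// Assumed sketch, not in the PR: before handleWant returns (and its deferred
// close(w.result) runs), hand any still-buffered records to the consumer,
// but give up if the fetcher is shutting down or the context is cancelled.
if len(bufferedResult.Records) > 0 {
	select {
	case <-r.done:
	case <-ctx.Done():
	case w.result <- bufferedResult:
	}
}
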

}

// performAttempt performs a single Fetch request and handles the result. It
// returns the potentially modified fetchWant, the potentially modified buffered
// result, and a boolean indicating whether to proceed with the next attempt.
func (r *ConcurrentFetchers) performAttempt(ctx context.Context, logger log.Logger, attempt int, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64, bufferedResult fetchResult) (fetchWant, fetchResult, bool) {
attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

defer attemptSpan.Finish()

// Run a single Fetch request.
if res := r.fetchSingle(ctx, w); res.Err != nil {
// We got an error. We handle it and then discard this fetch result content.
var err error
w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan)
if err != nil {
// An error returned from handleKafkaFetchErr is a fatal
// error. We produce an error result and abort further
// processing.

level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err)

select {
case <-r.done:
case <-ctx.Done():
case w.result <- newErrorFetchResult(ctx, r.partitionID, err):
}

// Next attempt will be from the last record onwards.
w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1
w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt.
return fetchWant{}, fetchResult{}, false
}
} else {
// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(res.Records)))

// Update the high watermark.
if hwm := res.HighWatermark; hwm >= 0 {
casHWM(highWatermark, hwm)
}

// Merge the last fetch result with the previous buffered result (if any).
// Keep non-mergeable fields from res, because the last response is the most updated one.
bufferedResult = res.Merge(bufferedResult)
}

// We reset the backoff if we received any records whatsoever. A received record means _some_ success.
// We don't want to slow down until we hit a larger error.
errBackoff.Reset()
if len(bufferedResult.Records) == 0 {
// If we have no buffered records to try to send to the result channel
// then we retry with another Fetch attempt.
return w, bufferedResult, true
}

// Otherwise, there was at least one record fetched. We update the fetchWant and the buffered result.

w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1
w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt.

// We reset the backoff if we received any records whatsoever.
errBackoff.Reset()

select {
case <-r.done:
case <-ctx.Done():
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
default:
if w.startOffset >= w.endOffset {
// We've fetched all we were asked for, the whole batch is ready, and we definitely have to wait to send on the channel now.
bufferedResult.startWaitingForConsumption()
select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case <-ctx.Done():
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
case <-ctx.Done():
default:
if w.startOffset >= w.endOffset {
// We've fetched all we were asked for, the whole batch is ready, and we definitely have to wait to send on the channel now.
bufferedResult.startWaitingForConsumption()
select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
case <-ctx.Done():
}
}
}
attemptSpan.Finish()
}
wantSpan.Finish()
close(w.result)
}
return w, bufferedResult, true
}

func (r *ConcurrentFetchers) isDone() bool {
select {
case <-r.done:
return true
default:
return false
}
}

func casHWM(highWwatermark *atomic.Int64, newHWM int64) {
for hwm := highWwatermark.Load(); hwm < newHWM; hwm = highWwatermark.Load() {
if highWwatermark.CompareAndSwap(hwm, newHWM) {
func casHWM(highWatermark *atomic.Int64, newHWM int64) {
for hwm := highWatermark.Load(); hwm < newHWM; hwm = highWatermark.Load() {
if highWatermark.CompareAndSwap(hwm, newHWM) {
break
}
}