pkg/storage/ingest/fetcher.go: 194 changes (99 additions, 95 deletions)
@@ -689,118 +689,122 @@ func (r *ConcurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
errBackoff := backoff.New(ctx, r.fetchBackoffConfig)
@seizethedave (Contributor Author), Dec 11, 2025:
should every want get a fresh backoff?

Contributor:
would different wants get different behaviour from the kafka brokers? i don't think so. if they start getting errors, they slow down faster if they use a single backoff
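
A minimal, runnable sketch of that argument, assuming the dskit backoff package already used above (backoff.New, Ongoing, Wait, Reset); fetchOnce, the failure count, and the backoff bounds are invented purely for illustration:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// fetchOnce stands in for a single Kafka Fetch request; it fails a few times
// so the shared backoff has something to react to.
var failuresLeft = 3

func fetchOnce() error {
	if failuresLeft > 0 {
		failuresLeft--
		return errors.New("transient broker error")
	}
	return nil
}

func main() {
	// One backoff shared by every want: errors hit while serving any want keep
	// growing the delay, and any successful fetch resets it.
	shared := backoff.New(context.Background(), backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 2 * time.Second,
	})

	for _, want := range []string{"want-A", "want-B"} {
		for shared.Ongoing() {
			if err := fetchOnce(); err != nil {
				fmt.Printf("%s: %v, backing off (retry %d)\n", want, err, shared.NumRetries())
				shared.Wait()
				continue
			}
			shared.Reset()
			fmt.Printf("%s: fetched\n", want)
			break
		}
	}
}

With a per-want backoff, each new want would start again from MinBackoff even while the brokers are still erroring; the shared instance carries the accumulated delay across wants, which is the behaviour argued for here.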


for w := range wants {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

// This is the current buffered fetchResult that has not been sent to the result channel yet.
// This is empty at the beginning, then we merge records as soon as we receive them
// from the Fetch response(s).
var bufferedResult fetchResult

for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

// Run a single Fetch request.
if res := r.fetchSingle(ctx, w); res.Err != nil {
// We got an error. We handle it and then discard this fetch result content.
var err error
w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan)
if err != nil {
// An error returned from handleKafkaFetchErr is a fatal
// error. We produce an error result and abort further
// processing.

level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err)

select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case <-ctx.Done():
case w.result <- newErrorFetchResult(ctx, r.partitionID, err):
}

// Break out of the retry loop, but continue servicing the `wants` channel.
break
}
} else {
// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(res.Records)))
r.handleWant(ctx, logger, w, errBackoff, highWatermark)
}
}

// Update the high watermark.
if hwm := res.HighWatermark; hwm >= 0 {
casHWM(highWatermark, hwm)
}
// handleWant handles a single fetchWant. It performs Fetch requests until the fetchWant is complete or the context is cancelled.
func (r *ConcurrentFetchers) handleWant(ctx context.Context, logger log.Logger, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64) {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

defer close(w.result)
defer wantSpan.Finish()

// This is the current buffered fetchResult that has not been sent to the result channel yet.
// This is empty at the beginning, then we merge records as soon as we receive them
// from the Fetch response(s).
var bufferedResult fetchResult

for attempt := 0; errBackoff.Ongoing() && !r.isDone() && w.endOffset > w.startOffset; attempt++ {
var continueAttempts bool
w, continueAttempts = r.performAttempt(ctx, logger, attempt, w, errBackoff, highWatermark, bufferedResult)
if !continueAttempts {
break
}
}
Copilot AI, Dec 11, 2025:
The loop may exit without sending any buffered records to the result channel. If the loop condition becomes false (e.g., errBackoff.Ongoing() returns false, isDone() returns true, or w.endOffset <= w.startOffset), any records in bufferedResult will be discarded instead of being sent to w.result. In the original code, bufferedResult was always sent to the channel before checking these conditions again. Consider sending any remaining bufferedResult before the handleWant function exits.

Suggested change
}
}
// Send any remaining bufferedResult before exiting.
if len(bufferedResult.Records) > 0 {
w.result <- bufferedResult
}

@seizethedave (Contributor Author) replied:
Yeah, interesting, I'll think about that.
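
One possible shape for that, purely a sketch and not the PR's actual fix: flush whatever is left in the buffer from a deferred send that still honours shutdown and cancellation. The fetchResult type, the done channel, and the result channel below are simplified stand-ins for the real ConcurrentFetchers fields:

package main

import (
	"context"
	"fmt"
)

type fetchResult struct{ Records []int }

// handleWant-style sketch: leftover buffered records are flushed on every exit
// path, but only through a select, so a closed done channel or a cancelled
// context cannot leave the goroutine blocked on the send.
func handleWant(ctx context.Context, done <-chan struct{}, result chan<- fetchResult) {
	var buffered fetchResult

	defer close(result) // registered first, so it runs after the flush below
	defer func() {
		if len(buffered.Records) == 0 {
			return
		}
		select {
		case <-done:
		case <-ctx.Done():
		case result <- buffered:
		}
	}()

	// The real attempt loop would populate and normally drain buffered; here we
	// just leave something behind so the deferred flush has work to do.
	buffered.Records = append(buffered.Records, 1, 2, 3)
}

func main() {
	done := make(chan struct{})
	result := make(chan fetchResult, 1) // buffered so this single-goroutine demo does not block
	handleWant(context.Background(), done, result)
	fmt.Println(<-result) // prints {[1 2 3]}
}

Whether something like this is worth it depends on whether the caller still wants a partial batch after those exit conditions, which is exactly the open question in the thread above.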

}

// Merge the last fetch result with the previous buffered result (if any).
// Keep non-mergeable fields from res, because the last response is the most up-to-date one.
bufferedResult = res.Merge(bufferedResult)
}
// performAttempt performs a single Fetch request and handles the result.
// It returns a boolean indicating whether the attempt should be retried.
func (r *ConcurrentFetchers) performAttempt(ctx context.Context, logger log.Logger, attempt int, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64, bufferedResult fetchResult) (fetchWant, bool) {
attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

if len(bufferedResult.Records) == 0 {
// If we have no buffered records to try to send to the result channel then we retry with another
// Fetch attempt. However, before doing it, we check if we've been told to stop (if so, we should honor it).
attemptSpan.Finish()

select {
case <-r.done:
wantSpan.Finish()
close(w.result)
return
default:
}
defer attemptSpan.Finish()

continue
// Run a single Fetch request.
if res := r.fetchSingle(ctx, w); res.Err != nil {
// We got an error. We handle it and then discard this fetch result content.
var err error
w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan)
if err != nil {
// An error returned from handleKafkaFetchErr is a fatal
// error. We produce an error result and abort further
// processing.

level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err)

select {
case <-r.done:
case <-ctx.Done():
case w.result <- newErrorFetchResult(ctx, r.partitionID, err):
}

// Next attempt will be from the last record onwards.
w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1
w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt.
// Break out of the retry loop, but continue servicing the `wants` channel.
return fetchWant{}, false
}
} else {
// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(res.Records)))

// Update the high watermark.
if hwm := res.HighWatermark; hwm >= 0 {
casHWM(highWatermark, hwm)
}

// Merge the last fetch result with the previous buffered result (if any).
// Keep non-mergeable fields from res, because the last response is the most up-to-date one.
bufferedResult = res.Merge(bufferedResult)
}

// We reset the backoff if we received any records whatsoever. A received record means _some_ success.
// We don't want to slow down until we hit a larger error.
errBackoff.Reset()
if len(bufferedResult.Records) == 0 {
// If we have no buffered records to try to send to the result channel
// then we retry with another Fetch attempt.
return fetchWant{}, true
}

w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1
w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt.

// We reset the backoff if we received any records whatsoever. A received record means _some_ success.
// We don't want to slow down until we hit a larger error.
errBackoff.Reset()

select {
case <-r.done:
case <-ctx.Done():
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
default:
if w.startOffset >= w.endOffset {
// We've fetched all we were asked for, the whole batch is ready, and we definitely have to wait to send on the channel now.
bufferedResult.startWaitingForConsumption()
select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case <-ctx.Done():
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
case <-ctx.Done():
default:
if w.startOffset >= w.endOffset {
// We've fetched all we were asked for, the whole batch is ready, and we definitely have to wait to send on the channel now.
bufferedResult.startWaitingForConsumption()
select {
case <-r.done:
wantSpan.Finish()
attemptSpan.Finish()
close(w.result)
return
case w.result <- bufferedResult:
bufferedResult = fetchResult{}
case <-ctx.Done():
}
}
}
attemptSpan.Finish()
}
wantSpan.Finish()
close(w.result)
}
return w, false
}

func (r *ConcurrentFetchers) isDone() bool {
select {
case <-r.done:
return true
default:
return false
}
}

func casHWM(highWwatermark *atomic.Int64, newHWM int64) {
for hwm := highWwatermark.Load(); hwm < newHWM; hwm = highWwatermark.Load() {
if highWwatermark.CompareAndSwap(hwm, newHWM) {
func casHWM(highWatermark *atomic.Int64, newHWM int64) {
for hwm := highWatermark.Load(); hwm < newHWM; hwm = highWatermark.Load() {
if highWatermark.CompareAndSwap(hwm, newHWM) {
break
}
}