diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index dbc5252490b..62e70b7b4d0 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -689,118 +689,120 @@ func (r *ConcurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg errBackoff := backoff.New(ctx, r.fetchBackoffConfig) for w := range wants { - // Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested. - wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch") - wantSpan.SetTag("start_offset", w.startOffset) - wantSpan.SetTag("end_offset", w.endOffset) - - // This current buffered fetchResult that has not been sent to the result channel yet. - // This is empty at the beginning, then we merge records as soon as we receive them - // from the Fetch response(s). - var bufferedResult fetchResult - - for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ { - attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt") - attemptSpan.SetTag("attempt", attempt) - - // Run a single Fetch request. - if res := r.fetchSingle(ctx, w); res.Err != nil { - // We got an error. We handle it and then discard this fetch result content. - var err error - w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan) - if err != nil { - // An error returned from handleKafkaFetchErr is a fatal - // error. We produce an error result and abort further - // processing. - - level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err) - - select { - case <-r.done: - wantSpan.Finish() - attemptSpan.Finish() - close(w.result) - return - case <-ctx.Done(): - case w.result <- newErrorFetchResult(ctx, r.partitionID, err): - } - - // Break out of the retry loop, but continue servicing the `wants` channel. - break - } - } else { - // We increase the count of buffered records as soon as we fetch them. - r.bufferedFetchedRecords.Add(int64(len(res.Records))) + r.handleWant(ctx, logger, w, errBackoff, highWatermark) + } +} - // Update the high watermark. - if hwm := res.HighWatermark; hwm >= 0 { - casHWM(highWatermark, hwm) - } +// handleWant handles a single fetchWant. It performs Fetch requests until the fetchWant is complete or the context is cancelled. +func (r *ConcurrentFetchers) handleWant(ctx context.Context, logger log.Logger, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64) { + // Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested. + wantSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch") + wantSpan.SetTag("start_offset", w.startOffset) + wantSpan.SetTag("end_offset", w.endOffset) - // Merge the last fetch result if the previous buffered result (if any). - // Keep non-mergeable fields from res, because the last response is the most updated one. - bufferedResult = res.Merge(bufferedResult) - } + defer close(w.result) + defer wantSpan.Finish() - if len(bufferedResult.Records) == 0 { - // If we have no buffered records to try to send to the result channel then we retry with another - // Fetch attempt. However, before doing it, we check if we've been told to stop (if so, we should honor it). - attemptSpan.Finish() - - select { - case <-r.done: - wantSpan.Finish() - close(w.result) - return - default: - } + // This current buffered fetchResult that has not been sent to the result channel yet. + // This is empty at the beginning, then we merge records as soon as we receive them + // from the Fetch response(s). + var bufferedResult fetchResult + continueAttempts := true - continue + for attempt := 0; errBackoff.Ongoing() && continueAttempts && !r.isDone() && w.endOffset > w.startOffset; attempt++ { + w, bufferedResult, continueAttempts = r.performAttempt(ctx, logger, attempt, w, errBackoff, highWatermark, bufferedResult) + } +} + +// performAttempt performs a single Fetch request and handles the result. It +// returns the potentially modified fetchWant, the potentially modified buffered +// result, and a boolean indicating whether to proceed with the next attempt. +func (r *ConcurrentFetchers) performAttempt(ctx context.Context, logger log.Logger, attempt int, w fetchWant, errBackoff *backoff.Backoff, highWatermark *atomic.Int64, bufferedResult fetchResult) (fetchWant, fetchResult, bool) { + attemptSpan, ctx := spanlogger.New(ctx, logger, tracer, "concurrentFetcher.fetch.attempt") + attemptSpan.SetTag("attempt", attempt) + + defer attemptSpan.Finish() + + // Run a single Fetch request. + if res := r.fetchSingle(ctx, w); res.Err != nil { + // We got an error. We handle it and then discard this fetch result content. + var err error + w, err = handleKafkaFetchErr(res.Err, w, errBackoff, r.rangeErrorPolicy, r.startOffsetsReader, r.client, attemptSpan) + if err != nil { + // An error returned from handleKafkaFetchErr is a fatal + // error. We produce an error result and abort further + // processing. + + level.Warn(logger).Log("msg", "failed to handle Kafka fetch error", "error", err) + + select { + case <-r.done: + case <-ctx.Done(): + case w.result <- newErrorFetchResult(ctx, r.partitionID, err): } - // Next attempt will be from the last record onwards. - w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1 - w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt. + return fetchWant{}, fetchResult{}, false + } + } else { + // We increase the count of buffered records as soon as we fetch them. + r.bufferedFetchedRecords.Add(int64(len(res.Records))) + + // Update the high watermark. + if hwm := res.HighWatermark; hwm >= 0 { + casHWM(highWatermark, hwm) + } + + // Merge the last fetch result with the previous buffered result (if any). + // Keep non-mergeable fields from res, because the last response is the most updated one. + bufferedResult = res.Merge(bufferedResult) + } - // We reset the backoff if we received any records whatsoever. A received record means _some_ success. - // We don't want to slow down until we hit a larger error. - errBackoff.Reset() + if len(bufferedResult.Records) == 0 { + // If we have no buffered records to try to send to the result channel + // then we retry with another Fetch attempt. + return w, bufferedResult, true + } + + // Otherwise, there was at least one record fetched. We update the fetchWant and the buffered result. + + w.startOffset = bufferedResult.Records[len(bufferedResult.Records)-1].Offset + 1 + w = w.UpdateBytesPerRecord(bufferedResult.fetchedBytes, len(bufferedResult.Records)) // This takes into account the previous fetch too. This should give us a better average than using just the records from the last attempt. + // We reset the backoff if we received any records whatsoever. + errBackoff.Reset() + + select { + case <-r.done: + case <-ctx.Done(): + case w.result <- bufferedResult: + bufferedResult = fetchResult{} + default: + if w.startOffset >= w.endOffset { + // We've fetched all we were asked for the whole batch is ready, and we definitely have to wait to send on the channel now. + bufferedResult.startWaitingForConsumption() select { case <-r.done: - wantSpan.Finish() - attemptSpan.Finish() - close(w.result) - return + case <-ctx.Done(): case w.result <- bufferedResult: bufferedResult = fetchResult{} - case <-ctx.Done(): - default: - if w.startOffset >= w.endOffset { - // We've fetched all we were asked for the whole batch is ready, and we definitely have to wait to send on the channel now. - bufferedResult.startWaitingForConsumption() - select { - case <-r.done: - wantSpan.Finish() - attemptSpan.Finish() - close(w.result) - return - case w.result <- bufferedResult: - bufferedResult = fetchResult{} - case <-ctx.Done(): - } - } } - attemptSpan.Finish() } - wantSpan.Finish() - close(w.result) + } + return w, bufferedResult, true +} + +func (r *ConcurrentFetchers) isDone() bool { + select { + case <-r.done: + return true + default: + return false } } -func casHWM(highWwatermark *atomic.Int64, newHWM int64) { - for hwm := highWwatermark.Load(); hwm < newHWM; hwm = highWwatermark.Load() { - if highWwatermark.CompareAndSwap(hwm, newHWM) { +func casHWM(highWatermark *atomic.Int64, newHWM int64) { + for hwm := highWatermark.Load(); hwm < newHWM; hwm = highWatermark.Load() { + if highWatermark.CompareAndSwap(hwm, newHWM) { break } }