[receiver/kafkareceiver] Partitions paused by non-permanent errors (e.g. memory limiter) are never resumed #47118
Component(s)
receiver/kafka
Is your feature request related to a problem? Please describe.
When the kafkareceiver (franz-go) is configured with message_marking.after: true and message_marking.on_error: false, a non-permanent error from a downstream processor (e.g. memorylimiterprocessor returning ErrDataRefused) causes PauseFetchPartitions to be called on the affected partition. However, ResumeFetchPartitions is never called anywhere in the codebase — the partition stays paused indefinitely until a rebalance occurs.
The pause-without-resume behavior is reasonable for permanent errors (poison pills), but ErrDataRefused is transient — memory pressure resolves on its own. In production, this silently stalls partitions with no automatic recovery, causing growing consumer lag.
Code path: handleMessage keeps retrying until error_backoff.max_elapsed_time is exhausted → returns the error → consume sets fatalOffset → PauseFetchPartitions is called → nothing ever resumes the partition.
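The code path above can be reduced to a small, self-contained Go model. This is a simplified sketch, not the receiver's code: consumerModel, handleMessage, consume, and simulate are hypothetical stand-ins, the retry loop uses a fixed interval on a simulated clock rather than the real exponential error_backoff, and errDataRefused is a local stand-in for the memory limiter's ErrDataRefused.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDataRefused is a local stand-in for the memory limiter's transient ErrDataRefused.
var errDataRefused = errors.New("data refused (memory pressure)")

type topicPartition struct {
	topic     string
	partition int32
}

// consumerModel is a hypothetical, heavily simplified model of the code path
// above; it is not the kafkareceiver implementation.
type consumerModel struct {
	interval, maxElapsed time.Duration // stand-ins for the error_backoff settings
	paused               map[topicPartition]bool
}

// handleMessage retries on a simulated clock (no real sleeping) until next
// succeeds or another wait would exceed maxElapsed, then returns the last error.
func (c *consumerModel) handleMessage(next func() error) error {
	var elapsed time.Duration
	for {
		err := next()
		if err == nil || elapsed+c.interval > c.maxElapsed {
			return err
		}
		elapsed += c.interval
	}
}

// consume pauses the partition when handleMessage gives up. Nothing ever
// deletes from c.paused, so the pause outlives the transient error.
func (c *consumerModel) consume(tp topicPartition, next func() error) {
	if c.paused[tp] {
		return // a paused partition receives no further records
	}
	if err := c.handleMessage(next); err != nil {
		c.paused[tp] = true // models PauseFetchPartitions with no matching resume
	}
}

// simulate refuses data under "memory pressure", clears the pressure, and reports
// whether the partition is still paused and how many records were delivered.
func simulate() (stillPaused bool, delivered int) {
	c := &consumerModel{interval: 10 * time.Second, maxElapsed: time.Minute,
		paused: map[topicPartition]bool{}}
	tp := topicPartition{"traces", 0}
	underPressure := true
	next := func() error {
		if underPressure {
			return errDataRefused
		}
		delivered++
		return nil
	}
	c.consume(tp, next)   // retries exhausted: partition paused
	underPressure = false // memory pressure resolves on its own
	c.consume(tp, next)   // skipped: the partition is still paused
	return c.paused[tp], delivered
}

func main() { fmt.Println(simulate()) } // prints "true 0"
```

The model only shows the shape of the problem: the pause is tied to the failure, but no code path reacts to the recovery.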
Describe the solution you'd like
Differentiate between permanent and non-permanent errors when pausing partitions:
- Non-permanent errors: Track paused partitions and periodically attempt ResumeFetchPartitions with backoff, so consumption recovers automatically once the transient condition clears.
- Permanent errors: Keep current behavior (pause until manual intervention / rebalance).
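A minimal Go sketch of the proposed behavior, under stated assumptions: the fetchController interface assumes franz-go's kgo.Client exposes PauseFetchPartitions(map[string][]int32) map[string][]int32 and ResumeFetchPartitions(map[string][]int32); permanentError is a local stand-in for the collector's permanent-error wrapping (consumererror); resumeTracker, onError, onSuccess, tick, and the 1s/30s backoff values are hypothetical. Partitions paused by a non-permanent error are resumed once their backoff elapses, with the delay doubling on each consecutive failure; permanent errors keep today's pause-until-rebalance behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// permanentError is a local stand-in for the collector's permanent-error wrapper.
type permanentError struct{ error }

func (p permanentError) Unwrap() error { return p.error }

// fetchController: assumed to match the pause/resume methods of franz-go's kgo.Client.
type fetchController interface {
	PauseFetchPartitions(map[string][]int32) map[string][]int32
	ResumeFetchPartitions(map[string][]int32)
}

type topicPartition struct {
	topic     string
	partition int32
}

// resumeTracker (hypothetical) schedules backoff resumes for non-permanent pauses only.
type resumeTracker struct {
	client       fetchController
	initial, max time.Duration
	failures     map[topicPartition]int       // consecutive transient failures
	due          map[topicPartition]time.Time // earliest resume attempt
}

// delay doubles the initial interval per consecutive failure, capped at max.
func (r *resumeTracker) delay(n int) time.Duration {
	d := r.initial
	for ; n > 1 && d < r.max; n-- {
		d *= 2
	}
	return min(d, r.max) // builtin min needs Go 1.21+
}

// onError pauses the partition; only non-permanent errors schedule a resume.
func (r *resumeTracker) onError(now time.Time, tp topicPartition, err error) {
	r.client.PauseFetchPartitions(map[string][]int32{tp.topic: {tp.partition}})
	var perm permanentError
	if errors.As(err, &perm) {
		delete(r.due, tp) // current behavior: stay paused until rebalance
		return
	}
	r.failures[tp]++
	r.due[tp] = now.Add(r.delay(r.failures[tp]))
}

// onSuccess resets the backoff once the partition processes records again.
func (r *resumeTracker) onSuccess(tp topicPartition) { delete(r.failures, tp) }

// tick resumes every partition whose backoff has elapsed; call it periodically.
func (r *resumeTracker) tick(now time.Time) {
	for tp, at := range r.due {
		if !now.Before(at) {
			r.client.ResumeFetchPartitions(map[string][]int32{tp.topic: {tp.partition}})
			delete(r.due, tp) // deleting during range is safe in Go
		}
	}
}

// fakeClient tracks pause state in memory, for demonstration only.
type fakeClient map[topicPartition]bool

func (f fakeClient) set(tps map[string][]int32, v bool) {
	for t, ps := range tps {
		for _, p := range ps {
			f[topicPartition{t, p}] = v
		}
	}
}

func (f fakeClient) PauseFetchPartitions(tps map[string][]int32) map[string][]int32 {
	f.set(tps, true)
	return nil // a real kgo client returns all currently paused partitions; unused here
}

func (f fakeClient) ResumeFetchPartitions(tps map[string][]int32) { f.set(tps, false) }

// simulate: one transient and one permanent failure, then a tick past the first backoff.
func simulate() (transientPaused, poisonPaused bool) {
	fc := fakeClient{}
	r := &resumeTracker{client: fc, initial: time.Second, max: 30 * time.Second,
		failures: map[topicPartition]int{}, due: map[topicPartition]time.Time{}}
	t0 := time.Unix(0, 0)
	transient, poison := topicPartition{"traces", 0}, topicPartition{"traces", 1}
	r.onError(t0, transient, errors.New("data refused (memory pressure)"))
	r.onError(t0, poison, permanentError{errors.New("unmarshal failed")})
	r.tick(t0.Add(2 * time.Second))
	return fc[transient], fc[poison]
}

func main() { fmt.Println(simulate()) } // prints "false true"
```

In a real receiver, tick could run from a ticker goroutine and onSuccess after each successfully processed record, so a partition that keeps hitting memory pressure backs off progressively instead of being resumed in a tight loop. The sketch is not goroutine-safe (a real version would need a mutex) and only models when to resume, not which offset fetching restarts from.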
Describe alternatives you've considered
- Setting a very large error_backoff.max_elapsed_time keeps the retry loop alive longer, but it blocks the partition goroutine and doesn't fundamentally solve the problem.
- Setting message_marking.on_error: true avoids the pause, but silently drops data during memory pressure, which is unacceptable for production pipelines.
Additional context
- Version: v0.145.0
- Production impact: Custom OTel Collector distribution consuming traces/metrics from Kafka at scale. Paused partitions require rolling restarts to recover.
- Code refs:
  - PauseFetchPartitions call: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.145.0/receiver/kafkareceiver/consumer_franz.go#L341
  - ErrDataRefused (non-permanent): https://github.com/open-telemetry/opentelemetry-collector/blob/v0.145.0/internal/memorylimiter/memorylimiter.go#L32
- Related issues: Kafka receiver does not ack on error #18075, [kafkareceiver] commit unprocessed messages #6222