Fix: detect prolonged broker unavailability and crash for restart #1082
Merged
peter-quix merged 6 commits into main on Feb 13, 2026
When all Kafka brokers are unreachable for longer than broker_availability_timeout (default 300s), the Application raises KafkaBrokerUnavailableError so the orchestrator can restart it with fresh connections. This prevents the zombie state where an app silently logs "all brokers down" indefinitely without recovering.
- Track _ALL_BROKERS_DOWN errors on Producer via _error_cb
- Reset timer on successful message consumption and delivery callbacks
- Active metadata probe before raising to avoid false positives on idle apps
- Compose tracking with custom error callbacks (never silently disabled)
- Add check to both _run_dataframe and _run_sources loops
- Validate broker_availability_timeout is non-negative
- Actionable error message with parameter name and disable instructions
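The tracking described in this commit can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code from this PR: the class `BrokerAvailabilityTracker`, its method names, the injectable `probe` and `clock` arguments, and the stand-in exception `BrokerUnavailableError` are all hypothetical. In the real change the timer is driven by the client's error callback and the probe is a metadata request.

```python
import time
from typing import Callable, Optional


class BrokerUnavailableError(Exception):
    """Hypothetical stand-in for KafkaBrokerUnavailableError."""


class BrokerAvailabilityTracker:
    """Illustrative only; names and structure are assumptions."""

    def __init__(
        self,
        timeout: float,
        probe: Callable[[], bool],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        if timeout < 0:
            raise ValueError("broker_availability_timeout must be non-negative")
        self._timeout = timeout
        self._probe = probe  # returns True if a broker answers
        self._clock = clock
        self._unavailable_since: Optional[float] = None

    def on_all_brokers_down(self) -> None:
        # Keep the earliest timestamp so repeated errors do not restart the timer.
        if self._unavailable_since is None:
            self._unavailable_since = self._clock()

    def on_success(self) -> None:
        # A consumed message or a delivery callback proves a broker is reachable.
        self._unavailable_since = None

    def raise_if_unavailable(self) -> None:
        # A timeout of 0 disables the check entirely.
        if self._timeout == 0 or self._unavailable_since is None:
            return
        elapsed = self._clock() - self._unavailable_since
        if elapsed < self._timeout:
            return
        # Probe actively before raising, so an idle app is not killed by mistake.
        if self._probe():
            self._unavailable_since = None
            return
        raise BrokerUnavailableError(
            f"All brokers unavailable for {elapsed:.0f}s "
            f"(broker_availability_timeout={self._timeout}); set it to 0 to disable"
        )
```

A processing loop would call `raise_if_unavailable()` once per iteration and `on_success()` whenever a message is consumed or delivered.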
- Add broker unavailability detection to BaseConsumer (mirrors Producer)
- Wire consumer checks into both _run_dataframe and _run_sources loops
- Reset consumer timer on successful message consumption
- Remove topic arg from list_topics() probe to avoid ACL/auto-creation issues
- Make _broker_available() private on Producer, InternalProducer, and Consumer
The recovery loop in RecoveryManager._recovery_loop() could block indefinitely if brokers went down during changelog replay. Now checks consumer broker availability each iteration when enabled.
- Add broker_availability_timeout param to RecoveryManager
- Check consumer.raise_if_broker_unavailable() in _recovery_loop()
- Wire timeout through Application → RecoveryManager
- Wire timeout through Application → SourceManager → SourceProcess
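The per-iteration check in the recovery loop might look roughly like the sketch below. It is a simplified illustration, not the actual `RecoveryManager._recovery_loop()`: the function `recovery_loop`, its parameters, and the `expected_count` stop condition are hypothetical, and the availability check is passed in as a plain callable.

```python
from typing import Callable, List, Optional


def recovery_loop(
    poll: Callable[[], Optional[str]],
    expected_count: int,
    raise_if_broker_unavailable: Callable[[], None],
    check_enabled: bool = True,
) -> List[str]:
    """Replay messages until `expected_count` have been applied (hypothetical stop rule)."""
    applied: List[str] = []
    while len(applied) < expected_count:
        if check_enabled:
            # Without this check, an empty poll would spin forever
            # when the brokers never come back.
            raise_if_broker_unavailable()
        message = poll()
        if message is None:
            continue  # nothing received this iteration
        applied.append(message)
    return applied
```

With the check enabled, a loop that only ever sees empty polls ends with an exception instead of blocking indefinitely.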
During recovery, successfully consuming changelog messages now resets the broker unavailability timer, preventing false positives when _ALL_BROKERS_DOWN fired before recovery started but brokers have since recovered. Without this, the active metadata probe would fire on every iteration, adding unnecessary latency.
…sion to 3.23.3
120s (2 minutes) is sufficient given that the active metadata probe prevents false positives. 300s was overly conservative for most deployments.
SteveRosam approved these changes on Feb 13, 2026
Summary
Closes #1081
When all Kafka brokers become unavailable (e.g., during a cluster restart), the SDK could enter a zombie state, logging `_ALL_BROKERS_DOWN` indefinitely but never reconnecting or raising an error. In one observed case, the application was stuck for 20 days.

This PR adds a configurable `broker_availability_timeout` (default 120s) that detects prolonged broker unavailability and raises `KafkaBrokerUnavailableError`, allowing the container orchestrator to restart the application with fresh connections.

Key changes
- Track `_ALL_BROKERS_DOWN` errors on both `Producer` and `BaseConsumer` via a `_broker_unavailable_since` timestamp
- Run an active metadata probe (`list_topics(timeout=5.0)`) before raising to avoid false positives from Azure's `connections.max.idle.ms=180000` idle timeout
- `Application(broker_availability_timeout=120.0)`: set to `0` to disable

Breaking change