diff --git a/quixstreams/__init__.py b/quixstreams/__init__.py index 6149e2d70..2e1af11f7 100644 --- a/quixstreams/__init__.py +++ b/quixstreams/__init__.py @@ -5,4 +5,4 @@ __all__ = ["Application", "message_context", "MessageContext", "State"] -__version__ = "3.23.2" +__version__ = "3.23.3" diff --git a/quixstreams/app.py b/quixstreams/app.py index 30d71ab4e..d551bf9ce 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -152,6 +152,7 @@ def __init__( topic_create_timeout: float = 60, processing_guarantee: ProcessingGuarantee = "at-least-once", max_partition_buffer_size: int = 10000, + broker_availability_timeout: float = 120.0, ): """ :param broker_address: Connection settings for Kafka. @@ -220,6 +221,12 @@ def __init__( It is a soft limit, and the actual number of buffered messages can be up to x2 higher. Lower value decreases the memory use, but increases the latency. Default - `10000`. + :param broker_availability_timeout: timeout in seconds. If all Kafka brokers + are unavailable for longer than this, the Application will raise a + ``KafkaBrokerUnavailableError`` to allow the orchestrator to restart + the application with fresh connections. + Set to ``0`` to disable the check. + Default - ``120.0``s (2 minutes).

***Error Handlers***
To handle errors, `Application` accepts callbacks triggered when @@ -343,6 +350,12 @@ def __init__( self._on_message_processed = on_message_processed self._on_processing_error = on_processing_error or default_on_processing_error + if broker_availability_timeout < 0: + raise ValueError( + f"broker_availability_timeout must be >= 0, " + f"got {broker_availability_timeout}" + ) + self._broker_availability_timeout = broker_availability_timeout self._consumer = self._get_internal_consumer( on_error=on_consumer_error, @@ -361,6 +374,7 @@ def __init__( recovery_manager = RecoveryManager( consumer=self._consumer, topic_manager=self._topic_manager, + broker_availability_timeout=self._broker_availability_timeout, ) self._state_manager = StateStoreManager( @@ -761,6 +775,7 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic extra_config_overrides=consumer_extra_config_overrides ), self._get_topic_manager(), + broker_availability_timeout=self._broker_availability_timeout, ) return topic @@ -926,6 +941,13 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): processing_context.commit_checkpoint() consumer.resume_backpressured() source_manager.raise_for_error() + if self._broker_availability_timeout: + self._producer.raise_if_broker_unavailable( + self._broker_availability_timeout + ) + self._consumer.raise_if_broker_unavailable( + self._broker_availability_timeout + ) printer.print() run_tracker.update_status() @@ -940,6 +962,13 @@ def _run_sources(self): source_manager.start_sources() while run_tracker.running and source_manager.is_alive(): source_manager.raise_for_error() + if self._broker_availability_timeout: + self._producer.raise_if_broker_unavailable( + self._broker_availability_timeout + ) + self._consumer.raise_if_broker_unavailable( + self._broker_availability_timeout + ) run_tracker.update_status() time.sleep(1) self.stop() @@ -1002,6 +1031,8 @@ def _process_message(self, dataframe_composed): topic=topic_name, partition=partition, offset=offset ) self._run_tracker.set_message_consumed(True) + self._producer._broker_available() # noqa: SLF001 + self._consumer._broker_available() # noqa: SLF001 if self._on_message_processed is not None: self._on_message_processed(topic_name, partition, offset) diff --git a/quixstreams/internal_producer.py b/quixstreams/internal_producer.py index 42a0d461b..69834ab03 100644 --- a/quixstreams/internal_producer.py +++ b/quixstreams/internal_producer.py @@ -170,6 +170,14 @@ def poll(self, timeout: float = 0): return raise + def _broker_available(self): + """Reset the broker unavailability tracker on the underlying Producer.""" + self._producer._broker_available() # noqa: SLF001 + + def raise_if_broker_unavailable(self, timeout: float): + """Raise if all brokers have been unavailable for longer than ``timeout`` seconds.""" + self._producer.raise_if_broker_unavailable(timeout) + def produce( self, topic: str, @@ -203,6 +211,8 @@ def _on_delivery(self, err: Optional[KafkaError], msg: Message): topic, partition, offset = msg.topic(), msg.partition(), msg.offset() if err is None: self._tp_offsets[(topic, partition)] = offset + # Successful delivery confirms broker is reachable + self._producer._broker_available() # noqa: SLF001 else: self._error = err diff --git a/quixstreams/kafka/consumer.py b/quixstreams/kafka/consumer.py index 0f69b190f..9eb291b27 100644 --- a/quixstreams/kafka/consumer.py +++ b/quixstreams/kafka/consumer.py @@ -1,5 +1,6 @@ import functools import logging +import time import typing from typing import Any, Callable, List, Optional, Tuple, Union, cast @@ -20,7 +21,7 @@ ) from .configuration import ConnectionConfig -from .exceptions import KafkaConsumerException +from .exceptions import KafkaBrokerUnavailableError, KafkaConsumerException __all__ = ( "BaseConsumer", @@ -124,6 +125,13 @@ def __init__( if isinstance(broker_address, str): broker_address = ConnectionConfig(bootstrap_servers=broker_address) + self._broker_unavailable_since: Optional[float] = None + + # Wrap the user-provided (or default) error callback so that broker + # availability tracking always runs, regardless of custom callbacks. + self._user_error_cb = error_callback + error_callback = self._error_cb + self._consumer_config = { # previous Quix Streams defaults "enable.auto.offset.store": False, @@ -143,6 +151,53 @@ def __init__( } self._inner_consumer: Optional[ConfluentConsumer] = None + def _error_cb(self, error: KafkaError): + """Instance-level error callback that tracks broker availability + and delegates to the user-provided (or default) error callback.""" + error_code = error.code() + if error_code == KafkaError._ALL_BROKERS_DOWN: # noqa: SLF001 + if self._broker_unavailable_since is None: + self._broker_unavailable_since = time.monotonic() + self._user_error_cb(error) + + def _broker_available(self): + """Reset the broker unavailability tracker.""" + self._broker_unavailable_since = None + + def raise_if_broker_unavailable(self, timeout: float): + """Raise if all brokers have been unavailable for longer than ``timeout`` seconds. + + Before raising, performs an active metadata probe to confirm the brokers + are truly unreachable (avoids false positives for idle applications). + + :param timeout: seconds of continuous unavailability before raising. + :raises KafkaBrokerUnavailableError: if the timeout has been exceeded. + """ + if self._broker_unavailable_since is not None: + elapsed = time.monotonic() - self._broker_unavailable_since + if elapsed >= timeout: + try: + self._consumer.list_topics(timeout=5.0) + self._broker_unavailable_since = None + logger.info( + "Consumer broker availability probe succeeded after %.0fs; " + "resetting unavailability timer.", + elapsed, + ) + return + except Exception: + logger.debug( + "Consumer broker availability probe failed", exc_info=True + ) + raise KafkaBrokerUnavailableError( + f"All Kafka brokers have been unavailable for " + f"{elapsed:.0f}s (timeout={timeout:.0f}s). " + f"The application cannot recover automatically; " + f"restarting is required. " + f"Adjust via Application(broker_availability_timeout=...) " + f"or set to 0 to disable." + ) + def poll( self, timeout: Optional[float] = None ) -> Optional[RawConfluentKafkaMessageProto]: diff --git a/quixstreams/kafka/exceptions.py b/quixstreams/kafka/exceptions.py index 1bffd6bf4..6002c3a3e 100644 --- a/quixstreams/kafka/exceptions.py +++ b/quixstreams/kafka/exceptions.py @@ -33,3 +33,6 @@ class KafkaProducerDeliveryError(BaseKafkaException): ... class InvalidProducerConfigError(QuixException): ... + + +class KafkaBrokerUnavailableError(QuixException): ... diff --git a/quixstreams/kafka/producer.py b/quixstreams/kafka/producer.py index 4d9940a36..fb12c76c6 100644 --- a/quixstreams/kafka/producer.py +++ b/quixstreams/kafka/producer.py @@ -1,5 +1,6 @@ import functools import logging +import time import uuid from typing import Callable, List, Optional, Union @@ -23,16 +24,13 @@ "PRODUCER_POLL_TIMEOUT", ) -from .exceptions import InvalidProducerConfigError +from .exceptions import InvalidProducerConfigError, KafkaBrokerUnavailableError DeliveryCallback = Callable[[Optional[KafkaError], Message], None] logger = logging.getLogger(__name__) -IGNORED_KAFKA_ERRORS = ( - # This error seems to be thrown despite brokers being available. - # Seems linked to `connections.max.idle.ms`. - KafkaError._ALL_BROKERS_DOWN, # noqa: SLF001 +_SILENTLY_IGNORED_KAFKA_ERRORS = ( # Broker handle destroyed - common/typical behavior, often seen via AdminClient KafkaError._DESTROY, # noqa: SLF001 ) @@ -43,9 +41,12 @@ def _default_error_cb(error: KafkaError): error_code = error.code() - if error_code in IGNORED_KAFKA_ERRORS: + if error_code in _SILENTLY_IGNORED_KAFKA_ERRORS: logger.debug(error.str()) return + if error_code == KafkaError._ALL_BROKERS_DOWN: # noqa: SLF001 + logger.warning(error.str()) + return logger.error(f'Kafka producer error: {error.str()} code="{error_code}"') @@ -96,6 +97,13 @@ def __init__( if isinstance(broker_address, str): broker_address = ConnectionConfig(bootstrap_servers=broker_address) + self._broker_unavailable_since: Optional[float] = None + + # Wrap the user-provided (or default) error callback so that broker + # availability tracking always runs, regardless of custom callbacks. + self._user_error_cb = error_callback + error_callback = self._error_cb + self._producer_config = { # previous Quix Streams defaults "partitioner": "murmur2", @@ -235,6 +243,55 @@ def _producer(self) -> ConfluentProducer: self._inner_producer.init_transactions() return self._inner_producer + def _error_cb(self, error: KafkaError): + """Instance-level error callback that tracks broker availability + and delegates to the user-provided (or default) error callback.""" + error_code = error.code() + if error_code == KafkaError._ALL_BROKERS_DOWN: # noqa: SLF001 + if self._broker_unavailable_since is None: + self._broker_unavailable_since = time.monotonic() + self._user_error_cb(error) + + def _broker_available(self): + """Reset the broker unavailability tracker.""" + self._broker_unavailable_since = None + + def raise_if_broker_unavailable(self, timeout: float): + """Raise if all brokers have been unavailable for longer than ``timeout`` seconds. + + Before raising, performs an active metadata probe to confirm the brokers + are truly unreachable (avoids false positives for idle applications). + + :param timeout: seconds of continuous unavailability before raising. + :raises KafkaBrokerUnavailableError: if the timeout has been exceeded. + """ + if self._broker_unavailable_since is not None: + elapsed = time.monotonic() - self._broker_unavailable_since + if elapsed >= timeout: + # Active probe: try a lightweight metadata request before raising. + # This avoids false positives when brokers recovered but no + # messages flowed to reset the timer (idle apps). + try: + self._producer.list_topics(timeout=5.0) + # Probe succeeded — brokers are actually reachable. + self._broker_unavailable_since = None + logger.info( + "Broker availability probe succeeded after %.0fs; " + "resetting unavailability timer.", + elapsed, + ) + return + except Exception: + logger.debug("Broker availability probe failed", exc_info=True) + raise KafkaBrokerUnavailableError( + f"All Kafka brokers have been unavailable for " + f"{elapsed:.0f}s (timeout={timeout:.0f}s). " + f"The application cannot recover automatically; " + f"restarting is required. " + f"Adjust via Application(broker_availability_timeout=...) " + f"or set to 0 to disable." + ) + def __len__(self): return len(self._producer) diff --git a/quixstreams/sources/base/manager.py b/quixstreams/sources/base/manager.py index 517232077..4da3aca3b 100644 --- a/quixstreams/sources/base/manager.py +++ b/quixstreams/sources/base/manager.py @@ -45,6 +45,7 @@ def __init__( producer: InternalProducer, consumer: InternalConsumer, topic_manager: TopicManager, + broker_availability_timeout: float = 0, ) -> None: super().__init__() self.topic = topic @@ -58,6 +59,7 @@ def __init__( self._consumer = consumer self._producer = producer + self._broker_availability_timeout = broker_availability_timeout # copy parent process log level to the child process self._loglevel = logging.getLogger(LOGGER_NAME).level @@ -130,6 +132,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition: recovery_manager = RecoveryManager( consumer=self._consumer, topic_manager=self._topic_manager, + broker_availability_timeout=self._broker_availability_timeout, ) state_manager = StateStoreManager( @@ -264,6 +267,7 @@ def register( producer, consumer, topic_manager, + broker_availability_timeout: float = 0, ) -> SourceProcess: """ Register a new source in the manager. @@ -281,6 +285,7 @@ def register( producer=producer, consumer=consumer, topic_manager=topic_manager, + broker_availability_timeout=broker_availability_timeout, ) self.processes.append(process) return process diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py index bf26ddf7c..c07d616a0 100644 --- a/quixstreams/state/recovery.py +++ b/quixstreams/state/recovery.py @@ -350,10 +350,16 @@ class RecoveryManager: # At 10-second progress logging intervals, 60 attempts = ~10 minutes MAX_INVALID_OFFSET_ATTEMPTS = 60 - def __init__(self, consumer: BaseConsumer, topic_manager: TopicManager): + def __init__( + self, + consumer: BaseConsumer, + topic_manager: TopicManager, + broker_availability_timeout: float = 0, + ): self._running = False self._consumer = consumer self._topic_manager = topic_manager + self._broker_availability_timeout = broker_availability_timeout self._recovery_partitions: Dict[int, Dict[str, RecoveryPartition]] = {} self._last_progress_logged_time = time.monotonic() # Cache position results to avoid double calls in same iteration @@ -608,6 +614,11 @@ def _recovery_loop(self) -> None: msg = raise_for_msg_error(msg) rp = self._recovery_partitions[msg.partition()][msg.topic()] rp.recover_from_changelog_message(changelog_message=msg) + self._consumer._broker_available() # noqa: SLF001 + if self._broker_availability_timeout: + self._consumer.raise_if_broker_unavailable( + self._broker_availability_timeout + ) def _get_position_with_cache(self, rp: RecoveryPartition) -> ConfluentPartition: """ diff --git a/tests/test_quixstreams/test_kafka/test_broker_availability.py b/tests/test_quixstreams/test_kafka/test_broker_availability.py new file mode 100644 index 000000000..ff8eb5c70 --- /dev/null +++ b/tests/test_quixstreams/test_kafka/test_broker_availability.py @@ -0,0 +1,572 @@ +import time +import uuid +from unittest.mock import patch + +import pytest +from confluent_kafka import KafkaError + +from quixstreams.kafka.exceptions import KafkaBrokerUnavailableError +from quixstreams.kafka.producer import Producer + + +class TestProducerBrokerAvailability: + """Tests for Producer detecting prolonged broker unavailability.""" + + def _simulate_all_brokers_down(self, producer: Producer): + """Trigger the producer's error callback with _ALL_BROKERS_DOWN.""" + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + producer._error_cb(error) + + def test_all_brokers_down_sets_timestamp(self): + """When _ALL_BROKERS_DOWN fires, the producer should record the timestamp.""" + producer = Producer(broker_address="localhost:9092") + assert producer._broker_unavailable_since is None + + self._simulate_all_brokers_down(producer) + assert producer._broker_unavailable_since is not None + + def test_all_brokers_down_keeps_first_timestamp(self): + """Repeated _ALL_BROKERS_DOWN should keep the original timestamp, not update it.""" + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + first_ts = producer._broker_unavailable_since + + with patch("time.monotonic", return_value=200.0): + self._simulate_all_brokers_down(producer) + assert producer._broker_unavailable_since == first_ts + + def test_raise_if_broker_unavailable_raises_after_timeout(self): + """Should raise after _ALL_BROKERS_DOWN has persisted beyond the timeout + and the active metadata probe also fails.""" + from unittest.mock import MagicMock + + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + + # Install a fake inner producer so the metadata probe fails + fake = MagicMock() + fake.list_topics.side_effect = Exception("brokers down") + producer._inner_producer = fake + + with patch("time.monotonic", return_value=280.0): + with pytest.raises( + KafkaBrokerUnavailableError, + match="broker_availability_timeout", + ): + producer.raise_if_broker_unavailable(timeout=120.0) + + def test_raise_if_broker_unavailable_no_raise_before_timeout(self): + """Should NOT raise if timeout hasn't elapsed yet.""" + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + + with patch("time.monotonic", return_value=150.0): + # 50s elapsed, timeout is 120s — should not raise + producer.raise_if_broker_unavailable(timeout=120.0) + + def test_raise_if_broker_unavailable_no_raise_when_brokers_ok(self): + """Should NOT raise if no _ALL_BROKERS_DOWN was ever reported.""" + producer = Producer(broker_address="localhost:9092") + producer.raise_if_broker_unavailable(timeout=120.0) + + def test_broker_available_resets_timestamp(self): + """Calling broker_available() should reset the unavailable timestamp.""" + producer = Producer(broker_address="localhost:9092") + + self._simulate_all_brokers_down(producer) + assert producer._broker_unavailable_since is not None + + producer._broker_available() + assert producer._broker_unavailable_since is None + + def test_broker_available_prevents_raise_after_reset(self): + """After a reset via broker_available(), the timeout check should not raise.""" + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + + # Broker comes back + producer._broker_available() + + # Even with enough elapsed time, should not raise because reset happened + with patch("time.monotonic", return_value=300.0): + producer.raise_if_broker_unavailable(timeout=120.0) + + def test_error_cb_still_handles_other_errors(self): + """Non-ALL_BROKERS_DOWN errors should still be logged normally, + not treated as broker unavailability.""" + producer = Producer(broker_address="localhost:9092") + + # Simulate a different error (e.g., _TRANSPORT) + error = KafkaError(KafkaError._TRANSPORT) + producer._error_cb(error) + + # Should not set broker unavailable timestamp + assert producer._broker_unavailable_since is None + + def test_error_cb_destroy_does_not_set_unavailable(self): + """_DESTROY errors should be ignored like before, not setting unavailable.""" + producer = Producer(broker_address="localhost:9092") + + error = KafkaError(KafkaError._DESTROY) + producer._error_cb(error) + + assert producer._broker_unavailable_since is None + + def test_active_probe_resets_timer_when_brokers_recover(self): + """If the metadata probe succeeds, the timer should be reset instead of raising.""" + from unittest.mock import MagicMock + + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + + # Install a fake inner producer where the probe succeeds + fake = MagicMock() + fake.list_topics.return_value = MagicMock() # success + producer._inner_producer = fake + + with patch("time.monotonic", return_value=280.0): + # Should NOT raise because probe succeeds + producer.raise_if_broker_unavailable(timeout=120.0) + + # Timer should have been reset + assert producer._broker_unavailable_since is None + + def test_custom_error_callback_still_tracks_brokers(self): + """A custom error_callback should still get broker availability tracking.""" + calls = [] + + def my_error_cb(error): + calls.append(error.code()) + + producer = Producer(broker_address="localhost:9092", error_callback=my_error_cb) + + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + producer._error_cb(error) + + # Custom callback was called + assert KafkaError._ALL_BROKERS_DOWN in calls + # AND tracking was set + assert producer._broker_unavailable_since is not None + + def test_error_message_includes_parameter_name(self): + """The error message should mention broker_availability_timeout for discoverability.""" + from unittest.mock import MagicMock + + producer = Producer(broker_address="localhost:9092") + + with patch("time.monotonic", return_value=100.0): + self._simulate_all_brokers_down(producer) + + fake = MagicMock() + fake.list_topics.side_effect = Exception("down") + producer._inner_producer = fake + + with patch("time.monotonic", return_value=280.0): + with pytest.raises( + KafkaBrokerUnavailableError, + match="broker_availability_timeout.*set to 0 to disable", + ): + producer.raise_if_broker_unavailable(timeout=120.0) + + +class TestAppBrokerAvailability: + """Tests for Application raising on prolonged broker unavailability. + + These are unit tests that verify the wiring between Application and + Producer's broker availability tracking, without requiring Kafka. + """ + + def _make_app(self, **kwargs): + """Create an Application with mocked Kafka connections.""" + from quixstreams.app import Application + + defaults = dict( + broker_address="localhost:9092", + consumer_group="test-group", + auto_create_topics=False, + use_changelog_topics=False, + ) + defaults.update(kwargs) + return Application(**defaults) + + def test_app_raises_when_brokers_unavailable_beyond_timeout(self): + """The app should raise KafkaBrokerUnavailableError when brokers have been + down for longer than broker_availability_timeout.""" + from unittest.mock import MagicMock + from unittest.mock import patch as mock_patch + + from quixstreams.internal_consumer import InternalConsumer + from quixstreams.models.topics.admin import TopicAdmin + from quixstreams.models.topics.topic import TopicConfig + + app = self._make_app(broker_availability_timeout=0.1) + + # Simulate _ALL_BROKERS_DOWN in the past + app._producer._producer._broker_unavailable_since = time.monotonic() - 10.0 + + # Make the active metadata probe fail (brokers truly down) + fake_confluent = MagicMock() + fake_confluent.list_topics.side_effect = Exception("brokers down") + app._producer._producer._inner_producer = fake_confluent + + def fake_inspect(topic_names, timeout=None): + return { + n: TopicConfig(num_partitions=1, replication_factor=1) + for n in topic_names + } + + with mock_patch.object(TopicAdmin, "inspect_topics", side_effect=fake_inspect): + topic = app.topic(str(uuid.uuid4())) + app.dataframe(topic) + + def mock_poll_row(self_consumer, *args, **kwargs): + return None + + with mock_patch.object(InternalConsumer, "poll_row", mock_poll_row): + with mock_patch.object(InternalConsumer, "_subscribe"): + with pytest.raises(KafkaBrokerUnavailableError): + app.run() + + def test_app_does_not_raise_when_check_disabled(self): + """With broker_availability_timeout=0, the app should NOT check + broker availability.""" + from unittest.mock import patch as mock_patch + + from quixstreams.internal_consumer import InternalConsumer + from quixstreams.models.topics.admin import TopicAdmin + from quixstreams.models.topics.topic import TopicConfig + + app = self._make_app(broker_availability_timeout=0) + + # Simulate _ALL_BROKERS_DOWN in the past + app._producer._producer._broker_unavailable_since = time.monotonic() - 9999.0 + + def fake_inspect(topic_names, timeout=None): + return { + n: TopicConfig(num_partitions=1, replication_factor=1) + for n in topic_names + } + + with mock_patch.object(TopicAdmin, "inspect_topics", side_effect=fake_inspect): + topic = app.topic(str(uuid.uuid4())) + app.dataframe(topic) + + poll_count = 0 + + def mock_poll_row(self_consumer, *args, **kwargs): + nonlocal poll_count + poll_count += 1 + if poll_count >= 3: + app.stop() + return None + + with mock_patch.object(InternalConsumer, "poll_row", mock_poll_row): + with mock_patch.object(InternalConsumer, "_subscribe"): + # Should NOT raise — just run and stop normally + app.run() + + def test_internal_producer_broker_available_passthrough(self): + """InternalProducer._broker_available() should reset the tracker + on the underlying Producer.""" + app = self._make_app() + + # Set the tracker as if brokers were down + app._producer._producer._broker_unavailable_since = time.monotonic() - 10.0 + + # Call via InternalProducer passthrough + app._producer._broker_available() + + assert app._producer._producer._broker_unavailable_since is None + + def test_default_timeout_is_120_seconds(self): + """The default broker_availability_timeout should be 120s.""" + app = self._make_app() + assert app._broker_availability_timeout == 120.0 + + def test_negative_timeout_raises_value_error(self): + """A negative broker_availability_timeout should raise ValueError.""" + with pytest.raises( + ValueError, match="broker_availability_timeout must be >= 0" + ): + self._make_app(broker_availability_timeout=-1) + + +class TestConsumerBrokerAvailability: + """Tests for BaseConsumer detecting prolonged broker unavailability.""" + + def test_all_brokers_down_sets_timestamp(self): + """When _ALL_BROKERS_DOWN fires, the consumer should record the timestamp.""" + from quixstreams.kafka.consumer import BaseConsumer + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + assert consumer._broker_unavailable_since is None + + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + consumer._error_cb(error) + assert consumer._broker_unavailable_since is not None + + def test_broker_available_resets_timestamp(self): + """_broker_available() should reset the unavailable timestamp.""" + from quixstreams.kafka.consumer import BaseConsumer + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + consumer._error_cb(error) + assert consumer._broker_unavailable_since is not None + + consumer._broker_available() + assert consumer._broker_unavailable_since is None + + def test_raise_if_broker_unavailable_raises_after_timeout(self): + """Should raise after timeout with failed probe.""" + from unittest.mock import MagicMock + + from quixstreams.kafka.consumer import BaseConsumer + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + + with patch("time.monotonic", return_value=100.0): + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + consumer._error_cb(error) + + fake = MagicMock() + fake.list_topics.side_effect = Exception("down") + consumer._inner_consumer = fake + + with patch("time.monotonic", return_value=280.0): + with pytest.raises( + KafkaBrokerUnavailableError, + match="broker_availability_timeout", + ): + consumer.raise_if_broker_unavailable(timeout=120.0) + + def test_active_probe_resets_timer_when_brokers_recover(self): + """If the metadata probe succeeds, the timer should reset.""" + from unittest.mock import MagicMock + + from quixstreams.kafka.consumer import BaseConsumer + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + + with patch("time.monotonic", return_value=100.0): + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + consumer._error_cb(error) + + fake = MagicMock() + fake.list_topics.return_value = MagicMock() + consumer._inner_consumer = fake + + with patch("time.monotonic", return_value=280.0): + consumer.raise_if_broker_unavailable(timeout=120.0) + + assert consumer._broker_unavailable_since is None + + def test_custom_error_callback_still_tracks_brokers(self): + """Custom error callback should still get broker tracking.""" + from quixstreams.kafka.consumer import BaseConsumer + + calls = [] + + def my_error_cb(error): + calls.append(error.code()) + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + error_callback=my_error_cb, + ) + + error = KafkaError(KafkaError._ALL_BROKERS_DOWN) + consumer._error_cb(error) + + assert KafkaError._ALL_BROKERS_DOWN in calls + assert consumer._broker_unavailable_since is not None + + +class TestRecoveryBrokerAvailability: + """Tests for broker availability checks during state recovery.""" + + def test_recovery_loop_raises_when_broker_unavailable(self): + """The recovery loop should raise KafkaBrokerUnavailableError when + brokers have been down longer than the configured timeout.""" + from unittest.mock import MagicMock + from unittest.mock import patch as mock_patch + + from quixstreams.kafka.consumer import BaseConsumer + from quixstreams.models.topics import TopicManager + from quixstreams.state.recovery import RecoveryManager + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + + topic_manager = MagicMock(spec=TopicManager) + recovery_manager = RecoveryManager( + consumer=consumer, + topic_manager=topic_manager, + broker_availability_timeout=0.1, + ) + + # Simulate broker down for a long time + consumer._broker_unavailable_since = time.monotonic() - 10.0 + + # Make the active metadata probe fail + fake_confluent = MagicMock() + fake_confluent.list_topics.side_effect = Exception("brokers down") + fake_confluent.poll.return_value = None + consumer._inner_consumer = fake_confluent + + # Make recovery loop think it has work (so it enters the while loop) + fake_rp = MagicMock() + recovery_manager._recovery_partitions = {0: {"store": fake_rp}} + recovery_manager._running = True + + # Mock _update_recovery_status to isolate the broker check behavior + with mock_patch.object(recovery_manager, "_update_recovery_status"): + with pytest.raises(KafkaBrokerUnavailableError): + recovery_manager._recovery_loop() + + def test_recovery_loop_no_check_when_timeout_disabled(self): + """With broker_availability_timeout=0 (disabled), recovery loop should + NOT check broker availability.""" + from unittest.mock import MagicMock + from unittest.mock import patch as mock_patch + + from quixstreams.kafka.consumer import BaseConsumer + from quixstreams.models.topics import TopicManager + from quixstreams.state.recovery import RecoveryManager + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + + topic_manager = MagicMock(spec=TopicManager) + recovery_manager = RecoveryManager( + consumer=consumer, + topic_manager=topic_manager, + broker_availability_timeout=0, + ) + + # Simulate broker down for a long time + consumer._broker_unavailable_since = time.monotonic() - 9999.0 + + # Make the active metadata probe fail + fake_confluent = MagicMock() + fake_confluent.list_topics.side_effect = Exception("brokers down") + fake_confluent.poll.return_value = None + consumer._inner_consumer = fake_confluent + + # Make recovery loop think it has work + fake_rp = MagicMock() + recovery_manager._recovery_partitions = {0: {"store": fake_rp}} + recovery_manager._running = True + + poll_count = 0 + + def stop_after_3(*args, **kwargs): + nonlocal poll_count + poll_count += 1 + if poll_count >= 3: + recovery_manager._running = False + return None + + fake_confluent.poll.side_effect = stop_after_3 + + # Mock _update_recovery_status to isolate the broker check behavior + with mock_patch.object(recovery_manager, "_update_recovery_status"): + # Should NOT raise — just loop and stop + recovery_manager._recovery_loop() + assert poll_count >= 3 + + def test_recovery_loop_resets_timer_on_changelog_message(self): + """Successfully consuming a changelog message during recovery should + reset the broker unavailability timer, preventing false positives.""" + from unittest.mock import MagicMock + from unittest.mock import patch as mock_patch + + from quixstreams.kafka.consumer import BaseConsumer + from quixstreams.models.topics import TopicManager + from quixstreams.state.recovery import RecoveryManager + + consumer = BaseConsumer( + broker_address="localhost:9092", + consumer_group="test", + auto_offset_reset="latest", + ) + + topic_manager = MagicMock(spec=TopicManager) + recovery_manager = RecoveryManager( + consumer=consumer, + topic_manager=topic_manager, + broker_availability_timeout=0.1, + ) + + # Simulate broker was down (but has since recovered — messages are flowing) + consumer._broker_unavailable_since = time.monotonic() - 10.0 + + # Make the metadata probe fail — only changelog consumption should reset + fake_confluent = MagicMock() + fake_confluent.list_topics.side_effect = Exception("brokers down") + consumer._inner_consumer = fake_confluent + + # Set up recovery partition + fake_rp = MagicMock() + fake_rp.changelog_name = "test-changelog" + fake_rp.partition_num = 0 + recovery_manager._recovery_partitions = {0: {"test-changelog": fake_rp}} + recovery_manager._running = True + + poll_count = 0 + + def poll_then_stop(*args, **kwargs): + nonlocal poll_count + poll_count += 1 + if poll_count == 1: + # First poll: return a fake changelog message + msg = MagicMock() + msg.error.return_value = None + msg.partition.return_value = 0 + msg.topic.return_value = "test-changelog" + return msg + # Second poll: stop the loop + recovery_manager._running = False + return None + + fake_confluent.poll.side_effect = poll_then_stop + + with mock_patch.object(recovery_manager, "_update_recovery_status"): + recovery_manager._recovery_loop() + + # Timer should have been reset by the successful changelog consumption + assert consumer._broker_unavailable_since is None