Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quixstreams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

__all__ = ["Application", "message_context", "MessageContext", "State"]

__version__ = "3.23.2"
__version__ = "3.23.3"
31 changes: 31 additions & 0 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(
topic_create_timeout: float = 60,
processing_guarantee: ProcessingGuarantee = "at-least-once",
max_partition_buffer_size: int = 10000,
broker_availability_timeout: float = 120.0,
):
"""
:param broker_address: Connection settings for Kafka.
Expand Down Expand Up @@ -220,6 +221,12 @@ def __init__(
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
Lower value decreases the memory use, but increases the latency.
Default - `10000`.
:param broker_availability_timeout: timeout in seconds. If all Kafka brokers
are unavailable for longer than this, the Application will raise a
``KafkaBrokerUnavailableError`` to allow the orchestrator to restart
the application with fresh connections.
Set to ``0`` to disable the check.
Default - ``120.0``s (2 minutes).

<br><br>***Error Handlers***<br>
To handle errors, `Application` accepts callbacks triggered when
Expand Down Expand Up @@ -343,6 +350,12 @@ def __init__(

self._on_message_processed = on_message_processed
self._on_processing_error = on_processing_error or default_on_processing_error
if broker_availability_timeout < 0:
raise ValueError(
f"broker_availability_timeout must be >= 0, "
f"got {broker_availability_timeout}"
)
self._broker_availability_timeout = broker_availability_timeout

self._consumer = self._get_internal_consumer(
on_error=on_consumer_error,
Expand All @@ -361,6 +374,7 @@ def __init__(
recovery_manager = RecoveryManager(
consumer=self._consumer,
topic_manager=self._topic_manager,
broker_availability_timeout=self._broker_availability_timeout,
)

self._state_manager = StateStoreManager(
Expand Down Expand Up @@ -761,6 +775,7 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
extra_config_overrides=consumer_extra_config_overrides
),
self._get_topic_manager(),
broker_availability_timeout=self._broker_availability_timeout,
)
return topic

Expand Down Expand Up @@ -926,6 +941,13 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
processing_context.commit_checkpoint()
consumer.resume_backpressured()
source_manager.raise_for_error()
if self._broker_availability_timeout:
self._producer.raise_if_broker_unavailable(
self._broker_availability_timeout
)
self._consumer.raise_if_broker_unavailable(
self._broker_availability_timeout
)
printer.print()
run_tracker.update_status()

Expand All @@ -940,6 +962,13 @@ def _run_sources(self):
source_manager.start_sources()
while run_tracker.running and source_manager.is_alive():
source_manager.raise_for_error()
if self._broker_availability_timeout:
self._producer.raise_if_broker_unavailable(
self._broker_availability_timeout
)
self._consumer.raise_if_broker_unavailable(
self._broker_availability_timeout
)
run_tracker.update_status()
time.sleep(1)
self.stop()
Expand Down Expand Up @@ -1002,6 +1031,8 @@ def _process_message(self, dataframe_composed):
topic=topic_name, partition=partition, offset=offset
)
self._run_tracker.set_message_consumed(True)
self._producer._broker_available() # noqa: SLF001
self._consumer._broker_available() # noqa: SLF001

if self._on_message_processed is not None:
self._on_message_processed(topic_name, partition, offset)
Expand Down
10 changes: 10 additions & 0 deletions quixstreams/internal_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ def poll(self, timeout: float = 0):
return
raise

def _broker_available(self):
"""Reset the broker unavailability tracker on the underlying Producer."""
self._producer._broker_available() # noqa: SLF001

def raise_if_broker_unavailable(self, timeout: float):
    """Delegate the broker-availability check to the wrapped low-level producer.

    :param timeout: seconds of continuous unavailability before raising.
    """
    inner = self._producer
    inner.raise_if_broker_unavailable(timeout)

def produce(
self,
topic: str,
Expand Down Expand Up @@ -203,6 +211,8 @@ def _on_delivery(self, err: Optional[KafkaError], msg: Message):
topic, partition, offset = msg.topic(), msg.partition(), msg.offset()
if err is None:
self._tp_offsets[(topic, partition)] = offset
# Successful delivery confirms broker is reachable
self._producer._broker_available() # noqa: SLF001
else:
self._error = err

Expand Down
57 changes: 56 additions & 1 deletion quixstreams/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import logging
import time
import typing
from typing import Any, Callable, List, Optional, Tuple, Union, cast

Expand All @@ -20,7 +21,7 @@
)

from .configuration import ConnectionConfig
from .exceptions import KafkaConsumerException
from .exceptions import KafkaBrokerUnavailableError, KafkaConsumerException

__all__ = (
"BaseConsumer",
Expand Down Expand Up @@ -124,6 +125,13 @@ def __init__(
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)

self._broker_unavailable_since: Optional[float] = None

# Wrap the user-provided (or default) error callback so that broker
# availability tracking always runs, regardless of custom callbacks.
self._user_error_cb = error_callback
error_callback = self._error_cb

self._consumer_config = {
# previous Quix Streams defaults
"enable.auto.offset.store": False,
Expand All @@ -143,6 +151,53 @@ def __init__(
}
self._inner_consumer: Optional[ConfluentConsumer] = None

def _error_cb(self, error: KafkaError):
    """Instance-level error callback.

    Tracks when all brokers first became unreachable, then forwards the
    error to the user-provided (or default) callback unconditionally.
    """
    if error.code() == KafkaError._ALL_BROKERS_DOWN:  # noqa: SLF001
        # Stamp only the START of the outage; later errors keep the original
        # timestamp so elapsed downtime is measured from the beginning.
        if self._broker_unavailable_since is None:
            self._broker_unavailable_since = time.monotonic()
    self._user_error_cb(error)

def _broker_available(self):
"""Reset the broker unavailability tracker."""
self._broker_unavailable_since = None

def raise_if_broker_unavailable(self, timeout: float):
    """Raise if all brokers have been unavailable for longer than ``timeout`` seconds.

    Before raising, performs an active metadata probe to confirm the brokers
    are truly unreachable (avoids false positives for idle applications).

    :param timeout: seconds of continuous unavailability before raising.
    :raises KafkaBrokerUnavailableError: if the timeout has been exceeded.
    """
    # Guard clauses: no outage recorded, or outage still within the budget.
    if self._broker_unavailable_since is None:
        return
    elapsed = time.monotonic() - self._broker_unavailable_since
    if elapsed < timeout:
        return
    try:
        self._consumer.list_topics(timeout=5.0)
    except Exception:
        logger.debug(
            "Consumer broker availability probe failed", exc_info=True
        )
    else:
        # Probe succeeded — brokers are actually reachable; reset the timer.
        self._broker_unavailable_since = None
        logger.info(
            "Consumer broker availability probe succeeded after %.0fs; "
            "resetting unavailability timer.",
            elapsed,
        )
        return
    raise KafkaBrokerUnavailableError(
        f"All Kafka brokers have been unavailable for "
        f"{elapsed:.0f}s (timeout={timeout:.0f}s). "
        f"The application cannot recover automatically; "
        f"restarting is required. "
        f"Adjust via Application(broker_availability_timeout=...) "
        f"or set to 0 to disable."
    )

def poll(
self, timeout: Optional[float] = None
) -> Optional[RawConfluentKafkaMessageProto]:
Expand Down
3 changes: 3 additions & 0 deletions quixstreams/kafka/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ class KafkaProducerDeliveryError(BaseKafkaException): ...


class InvalidProducerConfigError(QuixException): ...


# Raised when all Kafka brokers have been continuously unreachable for longer
# than the configured broker availability timeout; the application cannot
# recover on its own and must be restarted (e.g. by an orchestrator).
class KafkaBrokerUnavailableError(QuixException): ...
69 changes: 63 additions & 6 deletions quixstreams/kafka/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import logging
import time
import uuid
from typing import Callable, List, Optional, Union

Expand All @@ -23,16 +24,13 @@
"PRODUCER_POLL_TIMEOUT",
)

from .exceptions import InvalidProducerConfigError
from .exceptions import InvalidProducerConfigError, KafkaBrokerUnavailableError

DeliveryCallback = Callable[[Optional[KafkaError], Message], None]

logger = logging.getLogger(__name__)

IGNORED_KAFKA_ERRORS = (
# This error seems to be thrown despite brokers being available.
# Seems linked to `connections.max.idle.ms`.
KafkaError._ALL_BROKERS_DOWN, # noqa: SLF001
_SILENTLY_IGNORED_KAFKA_ERRORS = (
# Broker handle destroyed - common/typical behavior, often seen via AdminClient
KafkaError._DESTROY, # noqa: SLF001
)
Expand All @@ -43,9 +41,12 @@

def _default_error_cb(error: KafkaError):
    """Default producer error callback.

    Routes errors by severity: silently-ignored codes are logged at DEBUG,
    "all brokers down" (may be transient, e.g. idle-connection related —
    TODO confirm) at WARNING, and everything else at ERROR.
    """
    error_code = error.code()
    if error_code in _SILENTLY_IGNORED_KAFKA_ERRORS:
        logger.debug(error.str())
    elif error_code == KafkaError._ALL_BROKERS_DOWN:  # noqa: SLF001
        logger.warning(error.str())
    else:
        logger.error(f'Kafka producer error: {error.str()} code="{error_code}"')


Expand Down Expand Up @@ -96,6 +97,13 @@ def __init__(
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)

self._broker_unavailable_since: Optional[float] = None

# Wrap the user-provided (or default) error callback so that broker
# availability tracking always runs, regardless of custom callbacks.
self._user_error_cb = error_callback
error_callback = self._error_cb

self._producer_config = {
# previous Quix Streams defaults
"partitioner": "murmur2",
Expand Down Expand Up @@ -235,6 +243,55 @@ def _producer(self) -> ConfluentProducer:
self._inner_producer.init_transactions()
return self._inner_producer

def _error_cb(self, error: KafkaError):
    """Instance-level error callback.

    Records the first moment all brokers became unreachable and always
    delegates to the user-provided (or default) error callback afterwards.
    """
    if error.code() == KafkaError._ALL_BROKERS_DOWN:  # noqa: SLF001
        # Keep the earliest timestamp so repeated errors do not reset
        # the measured length of the outage.
        if self._broker_unavailable_since is None:
            self._broker_unavailable_since = time.monotonic()
    self._user_error_cb(error)

def _broker_available(self):
"""Reset the broker unavailability tracker."""
self._broker_unavailable_since = None

def raise_if_broker_unavailable(self, timeout: float):
    """Raise if all brokers have been unavailable for longer than ``timeout`` seconds.

    Before raising, performs an active metadata probe to confirm the brokers
    are truly unreachable (avoids false positives for idle applications).

    :param timeout: seconds of continuous unavailability before raising.
    :raises KafkaBrokerUnavailableError: if the timeout has been exceeded.
    """
    # Nothing recorded, or the outage is still within budget: fast exit.
    if self._broker_unavailable_since is None:
        return
    elapsed = time.monotonic() - self._broker_unavailable_since
    if elapsed < timeout:
        return
    # Active probe: a lightweight metadata request before raising avoids
    # false positives when brokers recovered but no messages flowed to
    # reset the timer (idle apps).
    try:
        self._producer.list_topics(timeout=5.0)
    except Exception:
        logger.debug("Broker availability probe failed", exc_info=True)
    else:
        # Probe succeeded — brokers are actually reachable.
        self._broker_unavailable_since = None
        logger.info(
            "Broker availability probe succeeded after %.0fs; "
            "resetting unavailability timer.",
            elapsed,
        )
        return
    raise KafkaBrokerUnavailableError(
        f"All Kafka brokers have been unavailable for "
        f"{elapsed:.0f}s (timeout={timeout:.0f}s). "
        f"The application cannot recover automatically; "
        f"restarting is required. "
        f"Adjust via Application(broker_availability_timeout=...) "
        f"or set to 0 to disable."
    )

def __len__(self):
return len(self._producer)

Expand Down
5 changes: 5 additions & 0 deletions quixstreams/sources/base/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
producer: InternalProducer,
consumer: InternalConsumer,
topic_manager: TopicManager,
broker_availability_timeout: float = 0,
) -> None:
super().__init__()
self.topic = topic
Expand All @@ -58,6 +59,7 @@ def __init__(

self._consumer = consumer
self._producer = producer
self._broker_availability_timeout = broker_availability_timeout

# copy parent process log level to the child process
self._loglevel = logging.getLogger(LOGGER_NAME).level
Expand Down Expand Up @@ -130,6 +132,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition:
recovery_manager = RecoveryManager(
consumer=self._consumer,
topic_manager=self._topic_manager,
broker_availability_timeout=self._broker_availability_timeout,
)

state_manager = StateStoreManager(
Expand Down Expand Up @@ -264,6 +267,7 @@ def register(
producer,
consumer,
topic_manager,
broker_availability_timeout: float = 0,
) -> SourceProcess:
"""
Register a new source in the manager.
Expand All @@ -281,6 +285,7 @@ def register(
producer=producer,
consumer=consumer,
topic_manager=topic_manager,
broker_availability_timeout=broker_availability_timeout,
)
self.processes.append(process)
return process
Expand Down
13 changes: 12 additions & 1 deletion quixstreams/state/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,16 @@ class RecoveryManager:
# At 10-second progress logging intervals, 60 attempts = ~10 minutes
MAX_INVALID_OFFSET_ATTEMPTS = 60

def __init__(self, consumer: BaseConsumer, topic_manager: TopicManager):
def __init__(
self,
consumer: BaseConsumer,
topic_manager: TopicManager,
broker_availability_timeout: float = 0,
):
self._running = False
self._consumer = consumer
self._topic_manager = topic_manager
self._broker_availability_timeout = broker_availability_timeout
self._recovery_partitions: Dict[int, Dict[str, RecoveryPartition]] = {}
self._last_progress_logged_time = time.monotonic()
# Cache position results to avoid double calls in same iteration
Expand Down Expand Up @@ -608,6 +614,11 @@ def _recovery_loop(self) -> None:
msg = raise_for_msg_error(msg)
rp = self._recovery_partitions[msg.partition()][msg.topic()]
rp.recover_from_changelog_message(changelog_message=msg)
self._consumer._broker_available() # noqa: SLF001
if self._broker_availability_timeout:
self._consumer.raise_if_broker_unavailable(
self._broker_availability_timeout
)

def _get_position_with_cache(self, rp: RecoveryPartition) -> ConfluentPartition:
"""
Expand Down
Loading