Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions src/retriable_kafka_client/retry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import datetime
import logging
import re
from collections import defaultdict
from typing import TypeVar

from confluent_kafka import Message, KafkaException, TopicPartition

from .error import SendError
from .config import ProducerConfig, ConsumerConfig, ConsumeTopicConfig
from .headers import (
TIMESTAMP_HEADER,
Expand All @@ -36,6 +39,7 @@


LOGGER = logging.getLogger(__name__)
T = TypeVar("T")


def _get_retry_timestamp(message: Message) -> float | None:
Expand Down Expand Up @@ -76,6 +80,39 @@ def _get_current_timestamp() -> float:
return datetime.datetime.now(tz=datetime.timezone.utc).timestamp()


def _regex_lookup_and_update_in_dict(
input_key: str, lookup_dict: dict[str, T]
) -> T | None:
"""
Lookup a key in a dictionary. If not present, try matching
the input key with a dictionary key. If matched, update the
lookup dict and return the corresponding value.

This is useful for searching and updating lookup tables
for retry configs and producers.

Args:
input_key: Usually topic name as present in the received message,
the key we search for.
lookup_dict: Lookup dict, it's keys can be either
direct matches or regex patterns
Returns:
The item from the lookup dict, or None if no item was found.
"""
result = lookup_dict.get(input_key, None)
if result is not None:
return result
for dict_key, dict_value in lookup_dict.items():
try:
if re.match(dict_key, input_key):
# Store the lookup for next time
lookup_dict[input_key] = dict_value
return dict_value
except re.PatternError:
pass
return None


class RetryScheduleCache:
"""
Class for storing information about messages that are blocked
Expand Down Expand Up @@ -254,7 +291,9 @@ def _get_retry_headers(
message: Kafka message that will be retried
Returns: dictionary of retry headers used for next sending
"""
relevant_config = self.__topic_lookup.get(message.topic)
relevant_config = _regex_lookup_and_update_in_dict(
message.topic, self.__topic_lookup
)
if relevant_config is None:
return None
previous_attempt = _get_retry_attempt(message)
Expand All @@ -277,7 +316,9 @@ def resend_message(self, message: MessageGroup) -> None:
message: the Kafka message that failed to be processed
"""
message_topic = message.topic
relevant_producer = self.__retry_producers.get(message_topic)
relevant_producer = _regex_lookup_and_update_in_dict(
message_topic, self.__retry_producers
)
if relevant_producer is None:
LOGGER.debug(
"Message %s from topic %s does not have configured retry topic.",
Expand All @@ -287,7 +328,9 @@ def resend_message(self, message: MessageGroup) -> None:
return

# Check if we've exhausted retry attempts
relevant_config = self.__topic_lookup.get(message_topic)
relevant_config = _regex_lookup_and_update_in_dict(
message_topic, self.__topic_lookup
)
if relevant_config is not None:
current_attempt = _get_retry_attempt(message)
if current_attempt >= relevant_config.retries:
Expand All @@ -309,7 +352,7 @@ def resend_message(self, message: MessageGroup) -> None:
message_topic,
extra={"message_raw": str(message.all_chunks)},
)
except (TypeError, BufferError, KafkaException):
except (TypeError, BufferError, KafkaException, SendError):
LOGGER.exception(
"Cannot resend message from topic: %s to its retry topic %s",
message_topic,
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/integration_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ def start_consumer(
max_concurrency: int = 4,
max_workers: int = 2,
filter_function: Callable[[Any], bool] | None = None,
override_topics: list[ConsumeTopicConfig] | None = None,
) -> ConsumerHandle:
"""
Create and start a consumer with the specified configuration.
Expand All @@ -348,6 +349,7 @@ def start_consumer(
max_concurrency: Consumer concurrency limit
max_workers: Thread pool size
filter_function: Filters messages based on the user-provided function
override_topics: If provided, override the topic config

Returns:
A ConsumerHandle that can be used to stop the consumer.
Expand All @@ -367,7 +369,7 @@ def start_consumer(
# Create consumer
consumer_config = ConsumerConfig(
kafka_hosts=[self.kafka_config[KafkaOptions.KAFKA_NODES]],
topics=self.config.topics,
topics=self.config.topics if override_topics is None else override_topics,
username=self.kafka_config[KafkaOptions.USERNAME],
password=self.kafka_config[KafkaOptions.PASSWORD],
group_id=self.config.group_id,
Expand Down
18 changes: 11 additions & 7 deletions tests/integration/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,24 @@ async def test_retry_mechanism_on_failure(
Test that when a message processing fails, it gets retried via the retry topic
and eventually succeeds.
"""
topic_config = ConsumeTopicConfig(
base_topic="test-base-topic",
retry_topic="test-retry-topic",
retries=3,
fallback_delay=1.0,
)
config = ScaffoldConfig(
topics=[
ConsumeTopicConfig(
base_topic="test-base-topic",
retry_topic="test-retry-topic",
retries=3,
fallback_delay=1.0,
),
topic_config,
],
group_id="test-retry-consumer-group",
)

async with IntegrationTestScaffold(kafka_config, admin_client, config) as scaffold:
scaffold.start_consumer(fail_chance_on_first=1.0)
topic_config.base_topic = "^[t]est-base-topic$"
scaffold.start_consumer(
fail_chance_on_first=1.0, override_topics=[topic_config]
)
await asyncio.sleep(2) # Wait for consumer to be ready

await scaffold.send_messages(1)
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/test_retry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def test__get_retry_headers_no_config() -> None:
"topics,message_topic,send_error,expect_send",
[
pytest.param(
[ConsumeTopicConfig(base_topic="t", retry_topic="t-retry")],
[ConsumeTopicConfig(base_topic="^t$", retry_topic="t-retry")],
"t",
None,
True,
Expand All @@ -497,6 +497,16 @@ def test__get_retry_headers_no_config() -> None:
True,
id="exception_logged",
),
pytest.param(
[
ConsumeTopicConfig(base_topic="t[", retry_topic="t-retry"),
ConsumeTopicConfig(base_topic="^u$", retry_topic="u-retry"),
],
"u",
None,
True,
id="regex_matching",
),
],
)
def test_resend_message(topics, message_topic, send_error, expect_send) -> None:
Expand Down
Loading