Skip to content

Commit ceb7695

Browse files
fix: retry lookup supports regex
* handle sendErrors Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
1 parent be92708 commit ceb7695

File tree

4 files changed

+72
-13
lines changed

4 files changed

+72
-13
lines changed

src/retriable_kafka_client/retry_utils.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
import datetime
2121
import logging
22+
import re
2223
from collections import defaultdict
24+
from typing import TypeVar
2325

2426
from confluent_kafka import Message, KafkaException, TopicPartition
2527

28+
from .error import SendError
2629
from .config import ProducerConfig, ConsumerConfig, ConsumeTopicConfig
2730
from .headers import (
2831
TIMESTAMP_HEADER,
@@ -36,6 +39,7 @@
3639

3740

3841
LOGGER = logging.getLogger(__name__)
42+
T = TypeVar("T")
3943

4044

4145
def _get_retry_timestamp(message: Message) -> float | None:
@@ -76,6 +80,39 @@ def _get_current_timestamp() -> float:
7680
return datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
7781

7882

83+
def _regex_lookup_and_update_in_dict(
84+
input_key: str, lookup_dict: dict[str, T]
85+
) -> T | None:
86+
"""
87+
Lookup a key in a dictionary. If not present, try matching
88+
the input key with a dictionary key. If matched, update the
89+
lookup dict and return the corresponding value.
90+
91+
This is useful for searching and updating lookup tables
92+
for retry configs and producers.
93+
94+
Args:
95+
input_key: Usually topic name as present in the received message,
96+
the key we search for.
97+
lookup_dict: Lookup dict, it's keys can be either
98+
direct matches or regex patterns
99+
Returns:
100+
The item from the lookup dict, or None if no item was found.
101+
"""
102+
result = lookup_dict.get(input_key, None)
103+
if result is not None:
104+
return result
105+
for dict_key, dict_value in lookup_dict.items():
106+
try:
107+
if re.match(dict_key, input_key):
108+
# Store the lookup for next time
109+
lookup_dict[input_key] = dict_value
110+
return dict_value
111+
except re.PatternError:
112+
pass
113+
return None
114+
115+
79116
class RetryScheduleCache:
80117
"""
81118
Class for storing information about messages that are blocked
@@ -254,7 +291,9 @@ def _get_retry_headers(
254291
message: Kafka message that will be retried
255292
Returns: dictionary of retry headers used for next sending
256293
"""
257-
relevant_config = self.__topic_lookup.get(message.topic)
294+
relevant_config = _regex_lookup_and_update_in_dict(
295+
message.topic, self.__topic_lookup
296+
)
258297
if relevant_config is None:
259298
return None
260299
previous_attempt = _get_retry_attempt(message)
@@ -277,7 +316,9 @@ def resend_message(self, message: MessageGroup) -> None:
277316
message: the Kafka message that failed to be processed
278317
"""
279318
message_topic = message.topic
280-
relevant_producer = self.__retry_producers.get(message_topic)
319+
relevant_producer = _regex_lookup_and_update_in_dict(
320+
message_topic, self.__retry_producers
321+
)
281322
if relevant_producer is None:
282323
LOGGER.debug(
283324
"Message %s from topic %s does not have configured retry topic.",
@@ -287,7 +328,9 @@ def resend_message(self, message: MessageGroup) -> None:
287328
return
288329

289330
# Check if we've exhausted retry attempts
290-
relevant_config = self.__topic_lookup.get(message_topic)
331+
relevant_config = _regex_lookup_and_update_in_dict(
332+
message_topic, self.__topic_lookup
333+
)
291334
if relevant_config is not None:
292335
current_attempt = _get_retry_attempt(message)
293336
if current_attempt >= relevant_config.retries:
@@ -309,7 +352,7 @@ def resend_message(self, message: MessageGroup) -> None:
309352
message_topic,
310353
extra={"message_raw": str(message.all_chunks)},
311354
)
312-
except (TypeError, BufferError, KafkaException):
355+
except (TypeError, BufferError, KafkaException, SendError):
313356
LOGGER.exception(
314357
"Cannot resend message from topic: %s to its retry topic %s",
315358
message_topic,

tests/integration/integration_utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ def start_consumer(
337337
max_concurrency: int = 4,
338338
max_workers: int = 2,
339339
filter_function: Callable[[Any], bool] | None = None,
340+
override_topics: list[ConsumeTopicConfig] | None = None,
340341
) -> ConsumerHandle:
341342
"""
342343
Create and start a consumer with the specified configuration.
@@ -348,6 +349,7 @@ def start_consumer(
348349
max_concurrency: Consumer concurrency limit
349350
max_workers: Thread pool size
350351
filter_function: Filters messages based on the user-provided function
352+
override_topics: If provided, override the topic config
351353
352354
Returns:
353355
A ConsumerHandle that can be used to stop the consumer.
@@ -367,7 +369,7 @@ def start_consumer(
367369
# Create consumer
368370
consumer_config = ConsumerConfig(
369371
kafka_hosts=[self.kafka_config[KafkaOptions.KAFKA_NODES]],
370-
topics=self.config.topics,
372+
topics=self.config.topics if override_topics is None else override_topics,
371373
username=self.kafka_config[KafkaOptions.USERNAME],
372374
password=self.kafka_config[KafkaOptions.PASSWORD],
373375
group_id=self.config.group_id,

tests/integration/test_retry.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,24 @@ async def test_retry_mechanism_on_failure(
2828
Test that when a message processing fails, it gets retried via the retry topic
2929
and eventually succeeds.
3030
"""
31+
topic_config = ConsumeTopicConfig(
32+
base_topic="test-base-topic",
33+
retry_topic="test-retry-topic",
34+
retries=3,
35+
fallback_delay=1.0,
36+
)
3137
config = ScaffoldConfig(
3238
topics=[
33-
ConsumeTopicConfig(
34-
base_topic="test-base-topic",
35-
retry_topic="test-retry-topic",
36-
retries=3,
37-
fallback_delay=1.0,
38-
),
39+
topic_config,
3940
],
4041
group_id="test-retry-consumer-group",
4142
)
4243

4344
async with IntegrationTestScaffold(kafka_config, admin_client, config) as scaffold:
44-
scaffold.start_consumer(fail_chance_on_first=1.0)
45+
topic_config.base_topic = "^[t]est-base-topic$"
46+
scaffold.start_consumer(
47+
fail_chance_on_first=1.0, override_topics=[topic_config]
48+
)
4549
await asyncio.sleep(2) # Wait for consumer to be ready
4650

4751
await scaffold.send_messages(1)

tests/unit/test_retry_utils.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ def test__get_retry_headers_no_config() -> None:
470470
"topics,message_topic,send_error,expect_send",
471471
[
472472
pytest.param(
473-
[ConsumeTopicConfig(base_topic="t", retry_topic="t-retry")],
473+
[ConsumeTopicConfig(base_topic="^t$", retry_topic="t-retry")],
474474
"t",
475475
None,
476476
True,
@@ -497,6 +497,16 @@ def test__get_retry_headers_no_config() -> None:
497497
True,
498498
id="exception_logged",
499499
),
500+
pytest.param(
501+
[
502+
ConsumeTopicConfig(base_topic="t[", retry_topic="t-retry"),
503+
ConsumeTopicConfig(base_topic="^u$", retry_topic="u-retry"),
504+
],
505+
"u",
506+
None,
507+
True,
508+
id="regex_matching",
509+
),
500510
],
501511
)
502512
def test_resend_message(topics, message_topic, send_error, expect_send) -> None:

0 commit comments

Comments
 (0)