-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathretry_utils.py
More file actions
361 lines (324 loc) · 13.2 KB
/
retry_utils.py
File metadata and controls
361 lines (324 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""
Module for managing retries in message consumption.
Each consumed topic can have settings for a retry topic.
Retry topic is a separate topic for each normal topic. If message
processing fails, the consumer commits the original message and
sends its contents to the retry topic. It also adds headers to
this message, specifying number of attempts and the timestamp,
after which the message can be retried.
The consumer consumes messages from all specified topics (both
the original topics and retry topics), and if it encounters a message
that should not be processed yet due to its timestamp, it stores it
in memory and pauses the consumption of the partition where this message
originated. After its timestamp passes, it resumes the consumption
of this partition and sends the message for further processing and
committing.
"""
import datetime
import logging
import re
from collections import defaultdict
from typing import TypeVar
from confluent_kafka import Message, KafkaException, TopicPartition
from .error import SendError
from .config import ProducerConfig, ConsumerConfig, ConsumeTopicConfig
from .headers import (
TIMESTAMP_HEADER,
ATTEMPT_HEADER,
serialize_number_to_bytes,
deserialize_number_from_bytes,
get_header_value,
)
from .kafka_utils import MessageGroup
from .producer import BaseProducer
LOGGER = logging.getLogger(__name__)
T = TypeVar("T")
def _get_retry_timestamp(message: Message) -> float | None:
    """
    Retrieve the retry timestamp (when the message may be processed)
    from the message's headers.
    Args:
        message: Kafka message object
    Returns:
        the timestamp in POSIX format or None if no timestamp was found
    """
    raw_value = get_header_value(message, TIMESTAMP_HEADER)
    if raw_value is None:
        return None
    return deserialize_number_from_bytes(raw_value)
def _get_retry_attempt(message_group: MessageGroup) -> int:
    """
    Retrieve the retry attempt number from the message group's headers.
    Args:
        message_group: Kafka message group object
    Returns:
        the number of attempt or 0 if no attempt header was found
    """
    raw_value = get_header_value(message_group, ATTEMPT_HEADER)
    return 0 if raw_value is None else deserialize_number_from_bytes(raw_value)
def _get_current_timestamp() -> float:
"""
Returns the current timestamp in POSIX format
Returns:
the current timestamp in POSIX format
"""
return datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
def _regex_lookup_and_update_in_dict(
input_key: str, lookup_dict: dict[str, T]
) -> T | None:
"""
Lookup a key in a dictionary. If not present, try matching
the input key with a dictionary key. If matched, update the
lookup dict and return the corresponding value.
This is useful for searching and updating lookup tables
for retry configs and producers.
Args:
input_key: Usually topic name as present in the received message,
the key we search for.
lookup_dict: Lookup dict, it's keys can be either
direct matches or regex patterns
Returns:
The item from the lookup dict, or None if no item was found.
"""
result = lookup_dict.get(input_key, None)
if result is not None:
return result
for dict_key, dict_value in lookup_dict.items():
try:
if re.match(dict_key, input_key):
# Store the lookup for next time
lookup_dict[input_key] = dict_value
return dict_value
except re.PatternError:
pass
return None
class RetryScheduleCache:
    """
    Class for storing information about messages that are blocked
    from execution due to their header timestamp. Each message with this
    header is scheduled for execution only after the specified timestamp
    passes.
    """

    def __init__(self):
        # Maps retry timestamp -> messages that become eligible at that time.
        self.__schedule: dict[float, list[Message]] = defaultdict(list)

    def pop_reprocessable(self) -> list[Message]:
        """
        Return and delete the messages whose timestamp allows reprocessing.
        Returns:
            list of Kafka message objects to be processed
        """
        now = _get_current_timestamp()
        due_timestamps = sorted(
            timestamp for timestamp in self.__schedule if timestamp <= now
        )
        ready: list[Message] = []
        for timestamp in due_timestamps:
            # pop() both collects and removes the resolved entry.
            ready.extend(self.__schedule.pop(timestamp))
        return ready

    def add_if_applicable(self, message: Message) -> bool:
        """
        Only add the message if it is currently blocked. If this
        method returns True, the message should not be processed
        further at this point and should be aborted instead.
        Further processing should then be checked with pop_reprocessable().
        Only when the message is popped, it can be reprocessed.
        Args:
            message: Kafka message object
        Returns: True if the message should be scheduled, False
            if the message can be processed already.
        """
        retry_timestamp = _get_retry_timestamp(message)
        if retry_timestamp is None:
            LOGGER.debug(
                "Message from topic %s was not configured to wait in a schedule",
                message.topic(),
                extra={"message_raw": str(message.value())},
            )
            return False
        if _get_current_timestamp() > retry_timestamp:
            # The blocking timestamp has already passed; no scheduling needed.
            return False
        LOGGER.debug(
            "Message from topic %s scheduled for processing at timestamp %s",
            message.topic(),
            retry_timestamp,
            extra={"message_raw": str(message.value())},
        )
        self.__schedule[retry_timestamp].append(message)
        return True

    def _cleanup(self) -> None:
        """
        Clean up empty keys in the schedule (messages were deleted,
        but the dictionary key could remain).
        """
        empty_keys = [
            timestamp
            for timestamp, messages in self.__schedule.items()
            if not messages
        ]
        for timestamp in empty_keys:
            del self.__schedule[timestamp]

    def register_revoke(self, partitions: list[TopicPartition]) -> None:
        """
        Handle revocation of partitions. This happens during
        cluster rebalancing.
        Args:
            partitions: list of partitions that are revoked
        Returns: Nothing
        """
        # Set of (topic, partition) pairs for O(1) membership tests.
        revoked = {
            (partition.topic, partition.partition) for partition in partitions
        }
        for timestamp, messages in self.__schedule.items():
            survivors: list[Message] = []
            for message in messages:
                if (message.topic(), message.partition()) not in revoked:
                    # Message is not in one of the revoked partitions
                    survivors.append(message)
                    continue
                LOGGER.info(
                    "Message from topic %s scheduled for processing at "
                    "timestamp %s will be discarded without committing "
                    "due to rebalancing. The next assigned consumer shall "
                    "retry reprocessing it.",
                    message.topic(),
                    timestamp,
                )
            # Rewrite the list in place; emptied lists are pruned below.
            messages[:] = survivors
        self._cleanup()
class RetryManager:
    """
    Re-sends failed messages to their configured retry topics.

    For every consumed topic that declares a retry topic, a dedicated
    producer is created. When message processing fails, resend_message()
    publishes a copy of the message to the retry topic with headers
    carrying the attempt counter and the earliest-allowed retry timestamp.
    """

    # pylint: disable=too-few-public-methods
    def __validate(self) -> None:
        """
        Check if the retry configuration is valid.
        Raises: AssertionError if the retry configuration is invalid
        """
        retry_topic_names = [
            topic_config.retry_topic
            for topic_config in self.__config.topics
            if topic_config.retry_topic is not None
        ]
        # Raise explicitly instead of using the `assert` statement so the
        # check is not silently stripped when Python runs with -O.
        # AssertionError is kept as the exception type for backward
        # compatibility with existing callers.
        if len(set(retry_topic_names)) != len(retry_topic_names):
            raise AssertionError(
                "Two consumed topics cannot share the same retry topic!"
            )

    def __init__(self, config: ConsumerConfig) -> None:
        self.__config = config
        self.__validate()
        # Both lookup tables are keyed by base topic AND retry topic so a
        # message from either topic resolves to the same config/producer.
        self.__retry_producers: dict[str, BaseProducer] = {}
        self.__topic_lookup: dict[str, ConsumeTopicConfig] = {}
        self.__populate_topics_and_producers()

    def __populate_topics_and_producers(self) -> None:
        """
        Help initiate the manager without polluting the constructor.
        """
        for topic_config in self.__config.topics:
            if topic_config.retry_topic is None:
                continue
            # Assign topic lookup
            self.__topic_lookup[topic_config.retry_topic] = topic_config
            self.__topic_lookup[topic_config.base_topic] = topic_config
            # Assign producer lookup
            producer_config = ProducerConfig(
                topics=[topic_config.retry_topic],
                kafka_hosts=self.__config.kafka_hosts,
                username=self.__config.username,
                password=self.__config.password,
                additional_settings=self.__config.additional_settings,
                split_messages=True,
            )
            producer = BaseProducer(config=producer_config)
            self.__retry_producers[topic_config.retry_topic] = producer
            self.__retry_producers[topic_config.base_topic] = producer

    def _get_retry_headers(
        self, message: MessageGroup
    ) -> dict[str, str | bytes] | None:
        """
        Create a dictionary of retry headers that will be used for the retry
        mechanism. The headers are generated based on the headers from the
        previous message: the attempt counter is incremented and the retry
        timestamp is pushed into the future by the topic's fallback delay.
        Args:
            message: Kafka message that will be retried
        Returns: dictionary of retry headers used for next sending, or None
            when the message's topic has no retry configuration
        """
        relevant_config = _regex_lookup_and_update_in_dict(
            message.topic, self.__topic_lookup
        )
        if relevant_config is None:
            return None
        previous_attempt = _get_retry_attempt(message)
        retry_timestamp = _get_current_timestamp() + relevant_config.fallback_delay
        return {
            ATTEMPT_HEADER: serialize_number_to_bytes(previous_attempt + 1),
            TIMESTAMP_HEADER: serialize_number_to_bytes(retry_timestamp),
        }

    def resend_message(self, message: MessageGroup) -> None:
        """
        Send the message's copy to the specified retry topic.
        Also update its headers so that it will only be retried after
        the specified amount of time. Skips empty messages.
        If the message has already exhausted its retry attempts, it will
        not be resent and will be logged as a warning.
        Args:
            message: the Kafka message that failed to be processed
        """
        message_topic = message.topic
        relevant_producer = _regex_lookup_and_update_in_dict(
            message_topic, self.__retry_producers
        )
        if relevant_producer is None:
            LOGGER.debug(
                "Message %s from topic %s does not have configured retry topic.",
                message,
                message_topic,
            )
            return
        # Check if we've exhausted retry attempts
        relevant_config = _regex_lookup_and_update_in_dict(
            message_topic, self.__topic_lookup
        )
        if relevant_config is not None:
            current_attempt = _get_retry_attempt(message)
            if current_attempt >= relevant_config.retries:
                LOGGER.warning(
                    "Message from topic %s exhausted all %d retry attempts. "
                    "Message will not be retried.",
                    message_topic,
                    relevant_config.retries,
                    extra={"message_raw": str(message.all_chunks)},
                )
                return
        try:
            relevant_producer.send_sync(
                message.all_chunks, headers=self._get_retry_headers(message)
            )
            LOGGER.debug(
                "Message from topic sent for reprocessing, %s",
                message_topic,
                extra={"message_raw": str(message.all_chunks)},
            )
        except (TypeError, BufferError, KafkaException, SendError):
            # Resend failures are logged (with traceback) rather than raised:
            # the original message was already committed, so the caller cannot
            # usefully recover here.
            LOGGER.exception(
                "Cannot resend message from topic: %s to its retry topic %s",
                message_topic,
                relevant_producer.topics,
                extra={"message_raw": str(message.all_chunks)},
            )