-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconsumer.py
More file actions
320 lines (291 loc) · 12.4 KB
/
consumer.py
File metadata and controls
320 lines (291 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""Base Kafka Consumer module"""
import logging
import sys
from concurrent.futures import Executor, Future
from concurrent.futures.process import BrokenProcessPool
from typing import Any
from confluent_kafka import Consumer, Message, KafkaException, TopicPartition
from .health import perform_healthcheck_using_client
from .kafka_utils import message_to_partition, MessageGroup
from .kafka_settings import KafkaOptions, DEFAULT_CONSUMER_SETTINGS
from .consumer_tracking import TrackingManager
from .config import ConsumerConfig
from .retry_utils import (
RetryManager,
RetryScheduleCache,
)
LOGGER = logging.getLogger(__name__)
class BaseConsumer:
    """
    Base class for consuming from Kafka topics in Python.

    The class utilizes an executor for concurrent task processing;
    this executor can even be shared between multiple consumer classes
    (intended usage).

    To synchronize messages correctly, this class holds information about
    messages that are being processed and messages that are waiting to be
    committed. The messages may wait for committing because some message
    received before them may not be processed yet. And we always want
    to commit only the latest processed message that is not preceded
    by any non-processed (pending) message.

    This consumer is also capable of using a retry mechanism: it uses
    RetryManager to get the correct producer object and resend the message
    to a retry topic. This message is marked with a special header
    that is checked in each consume. The header contains a timestamp
    and each message with this timestamp will only be consumed after
    that timestamp passes. To track messages pending for processing
    due to this timestamping, the class RetryScheduleCache is used.
    Consumption is stopped for each partition where a blocking message
    appears to spare local memory. Consumption is resumed when the blocking
    timestamp passes.
    """

    # pylint: disable=too-many-instance-attributes

    def __validate(self) -> None:
        """
        Ensure no base topic appears more than once in the configuration.

        Raises:
            AssertionError: If two topic configs share the same base topic.
        """
        # NOTE(review): `assert` is stripped when Python runs with -O, which
        # would silently disable this check; raising ValueError would be safer.
        base_topic_names = {
            retry_config.base_topic for retry_config in self._config.topics
        }
        assert len(base_topic_names) == len(
            [retry_config.base_topic for retry_config in self._config.topics]
        ), "Cannot consume twice from the same topic!"

    def __init__(
        self,
        config: ConsumerConfig,
        executor: Executor,
        max_concurrency: int = 16,
    ):
        """
        Initialize a consumer.

        Args:
            config: The configuration object.
            executor: The executor pool used by this consumer.
            max_concurrency: The maximum number of messages that can be
                submitted to the executor from this consumer at the same time.
        """
        # Consumer configuration
        self._config = config
        # Validate the configuration (fails fast on duplicate base topics)
        self.__validate()
        # Executor for target tasks
        self._executor = executor
        # Used for Kafka connections; created lazily by the `_consumer` property
        self.__consumer_object: Consumer | None = None
        # Used for stopping the consumption loop in run() (set by stop())
        self.__stop_flag: bool = False
        # Store information about offsets and tasks
        self.__tracking_manager = TrackingManager(
            max_concurrency,
            config.cancel_future_wait_time,
            self._config.max_chunk_reassembly_wait_time,
        )
        # Manage re-sending messages to retry topics
        self.__retry_manager = RetryManager(config)
        # Store information about pending retried messages
        self.__schedule_cache = RetryScheduleCache()

    @property
    def _consumer(self) -> Consumer:
        """
        Create the consumer object on first access, keep it in memory.

        Returns: Kafka consumer object.
        """
        if not self.__consumer_object:
            config_dict = {
                KafkaOptions.KAFKA_NODES: ",".join(self._config.kafka_hosts),
                KafkaOptions.USERNAME: self._config.username,
                KafkaOptions.PASSWORD: self._config.password,
                KafkaOptions.GROUP_ID: self._config.group_id,
                **DEFAULT_CONSUMER_SETTINGS,
            }
            # User-supplied settings override the defaults assembled above
            config_dict.update(self._config.additional_settings)
            self.__consumer_object = Consumer(
                config_dict,
            )
        return self.__consumer_object

    ### Offset-related methods ###

    def __perform_commits(self) -> None:
        """
        Commit anything that is awaiting to be committed.

        Commits are synchronous; on a KafkaException the offsets are handed
        back to the tracking manager so a later cycle can retry the commit.
        """
        committable = self.__tracking_manager.pop_committable()
        if committable:
            try:
                self._consumer.commit(offsets=committable, asynchronous=False)
            except KafkaException:
                LOGGER.exception(
                    "Temporarily failed to commit messages to partitions %s. "
                    "This action will be retried.",
                    committable,
                )
                # Re-queue the offsets; they will be retried on a later commit
                self.__tracking_manager.reschedule_uncommittable(committable)

    def __on_revoke(self, _: Consumer, partitions: list[TopicPartition]) -> None:
        """
        Callback when partitions are revoked during rebalancing.

        This is called in the same thread as poll, directly by the underlying
        Kafka library.

        Args:
            _: The consumer instance (unused; provided by the library).
            partitions: The partitions being revoked.
        """
        self.__schedule_cache.register_revoke(partitions)
        self.__tracking_manager.register_revoke(partitions)
        # Flush pending commits before ownership of the partitions is lost
        self.__perform_commits()

    def __ack_message(self, message: MessageGroup, finished_future: Future) -> None:
        """
        Private method only ever intended to be used from within
        _process_message(). It commits offsets and releases
        semaphore for processing next messages.

        Args:
            message: The Kafka message to be acknowledged
            finished_future: The finished future which caused this call
        """
        # A cancelled future means the message was never processed;
        # do not commit or resend anything for it.
        if finished_future.cancelled():
            return
        try:
            if problem := finished_future.exception():
                LOGGER.error(
                    "Message could not be processed! Message: %s.",
                    message.deserialize(),
                    exc_info=problem,
                )
                # Processing failed: hand the message to the retry mechanism
                self.__retry_manager.resend_message(message)
        finally:
            # Always schedule the commit and free a concurrency slot,
            # even if resending to the retry topic raised.
            self.__tracking_manager.schedule_commit(message, release_semaphore=True)

    def __graceful_shutdown(self) -> None:
        """
        Finish future execution, perform commits and
        stop the Kafka consumer. This must be called from
        the same thread that polls messages.
        """
        # Revoke everything so all in-flight work is settled before closing
        self.__tracking_manager.register_revoke()
        try:
            self.__perform_commits()
        finally:
            LOGGER.debug("Shutting down Kafka consumer...")
            try:
                self._consumer.close()
            except (RuntimeError, KafkaException):
                # close() on an already-closed consumer is not an error here
                LOGGER.debug("Consumer already closed.")
        LOGGER.info("Kafka consumer has been shut down gracefully.")

    ### Retry methods ###

    def __process_retried_messages_from_schedule(self) -> None:
        """
        Perform all actions that are scheduled for processing and
        their schedule has passed already.
        """
        reprocessable = self.__schedule_cache.pop_reprocessable()
        for message in reprocessable:
            LOGGER.debug(
                "Retrying processing message from topic: %s",
                message.topic(),
                extra={"message_raw": str(message.value())},
            )
            self._process_message(message)
        # Resume consumption from the topic, the latest message's schedule
        # has passed
        self._consumer.resume(
            [message_to_partition(message) for message in reprocessable]
        )

    ### Main processing function ###

    def _process_message(self, message: Message) -> Future[Any] | None:
        """
        Process this message with the specified target function
        with usage of the executor.

        Args:
            message: Kafka message object. Only its value will be used
                for deserialization and passing to the target function.

        Returns: Future of the target execution if the message can be processed.
            None otherwise.
        """
        # A falsy group means the tracking manager rejected/absorbed the
        # message (e.g. part of an incomplete chunk group) — nothing to run.
        message_group = self.__tracking_manager.receive(message)
        if not message_group:
            return None
        message_data = message_group.deserialize()
        if not message_data:
            # Semaphore was not acquired
            # NOTE(review): per the comment above, deserialize() apparently
            # also acquires the concurrency semaphore — confirm in MessageGroup.
            self.__tracking_manager.schedule_commit(
                message_group, release_semaphore=False
            )
            return None
        future = self._executor.submit(self._config.target, message_data)
        self.__tracking_manager.process_message(message_group, future)
        # The semaphore is released within this callback
        future.add_done_callback(lambda res: self.__ack_message(message_group, res))
        return future

    ### Public methods ###

    def run(self) -> None:
        """
        Run the consumer. This starts consuming messages from kafka
        and their processing within the process pool.

        Blocks until stop() is called; exits the process with status 1
        if the executor's process pool breaks.
        """
        plain_topics = [topic.base_topic for topic in self._config.topics]
        retry_topics = [
            topic.retry_topic
            for topic in self._config.topics
            if topic.retry_topic is not None
        ]
        self._consumer.subscribe(
            [*plain_topics, *retry_topics],
            on_revoke=self.__on_revoke,
        )
        while not self.__stop_flag:
            try:
                # First resolve local messages waiting in schedule
                self.__process_retried_messages_from_schedule()
                # Then poll Kafka messages
                msg = self._consumer.poll(1)
                if msg is None:
                    LOGGER.debug(
                        "No message received currently.",
                    )
                    # This allows interrupting the script
                    # each second
                    continue
                if error := msg.error():
                    LOGGER.debug("Consumer error: %s", error.str())
                    continue
                # Message filtering based on the user-provided function
                if self._config.filter_function is not None:
                    try:
                        if not self._config.filter_function(msg):
                            LOGGER.debug(
                                "Message was filtered out, topic %s",
                                msg.topic(),
                            )
                            continue
                    except Exception as filter_error:  # pylint: disable=broad-exception-caught
                        # A faulty filter must not kill the consumer loop;
                        # the offending message is skipped instead.
                        LOGGER.exception(
                            "Filter raised an exception: %s.",
                            filter_error,
                            exc_info=True,
                        )
                        continue
                current_offset = msg.offset()
                partition = message_to_partition(msg)
                LOGGER.debug("Consumer received message at offset: %s", current_offset)
                if self.__schedule_cache.add_if_applicable(msg):
                    # Skip messages not eligible for execution yet, schedule them
                    self._consumer.pause([partition])
                    continue
                self._process_message(msg)
                self.__perform_commits()
                # Calculate and log remaining messages for partition
                _, high_offset = self._consumer.get_watermark_offsets(
                    partition, cached=True
                )
                if current_offset is not None:
                    messages_left = high_offset - current_offset - 1
                    LOGGER.info(
                        "%d message(s) left to process in %s (partition %d)",
                        messages_left,
                        msg.topic(),
                        msg.partition(),
                    )
            except BrokenProcessPool:
                # The executor cannot recover; shut down and signal failure
                LOGGER.exception("Process pool got broken, stopping consumer.")
                self.__graceful_shutdown()
                sys.exit(1)
        self.__graceful_shutdown()

    def connection_healthcheck(self) -> bool:
        """Programmatically check if we are able to read from Kafka."""
        return perform_healthcheck_using_client(self._consumer)

    def stop(self) -> None:
        """
        Gracefully stop the consumer.

        Only sets a flag; the poll loop in run() notices it within one
        poll timeout and performs the actual shutdown on its own thread.
        """
        LOGGER.debug("Stopping retriable consumer...")
        self.__stop_flag = True