-
Notifications
You must be signed in to change notification settings - Fork 99
Expand file tree
/
Copy pathinternal_producer.py
More file actions
335 lines (285 loc) · 12.3 KB
/
internal_producer.py
File metadata and controls
335 lines (285 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import logging
from functools import wraps
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from confluent_kafka import KafkaError, KafkaException, Message, TopicPartition
from confluent_kafka.admin import GroupMetadata
from quixstreams.exceptions import QuixException
from .error_callbacks import ProducerErrorCallback, default_on_producer_error
from .kafka.configuration import ConnectionConfig
from .kafka.exceptions import KafkaProducerDeliveryError
from .kafka.producer import (
PRODUCER_ON_ERROR_RETRIES,
PRODUCER_POLL_TIMEOUT,
Producer,
)
from .models import Headers, Row, Topic
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Sentinel used by produce_row() to distinguish "no key argument passed"
# from an explicitly provided key of None.
_KEY_UNSET = object()
def _retriable_transaction_op(attempts: int, backoff_seconds: float):
"""
Some specific failure cases from sending offsets or committing a transaction
are retriable, which is worth re-attempting since the transaction is
almost complete (we flushed before attempting to commit).
Intended as a wrapper for these methods.
"""
def decorator(kafka_op: Callable):
@wraps(kafka_op)
def wrapper(*args, **kwargs):
attempts_remaining = attempts
op_name = kafka_op.__name__
while attempts_remaining:
try:
return kafka_op(*args, **kwargs)
except KafkaException as e:
error = e.args[0]
if error.retriable():
attempts_remaining -= 1
logger.debug(
f"Kafka transaction operation {op_name} failed, but "
f"can retry; attempts remaining: {attempts_remaining}. "
)
if attempts_remaining:
logger.debug(
f"Sleeping for {backoff_seconds}s before retrying."
)
sleep(backoff_seconds)
else:
# Just treat all errors besides retriable as fatal.
logger.error(
f"Error during Kafka transaction operation {op_name}"
)
raise
raise KafkaProducerTransactionCommitFailed(
f"All Kafka {op_name} attempts failed; "
"aborting transaction and shutting down Application..."
)
return wrapper
return decorator
class KafkaProducerTransactionCommitFailed(QuixException): ...
class InternalProducer:
    """
    A producer class that is capable of serializing Rows to bytes and send them to Kafka.

    The serialization is performed according to the Topic serialization settings.

    :param broker_address: Connection settings for Kafka.
        Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
        or a ConnectionConfig object if authentication is required.
    :param extra_config: A dictionary with additional options that
        will be passed to `confluent_kafka.Producer` as is.
        Note: values passed as arguments override values in `extra_config`.
    :param on_error: a callback triggered when `InternalProducer.produce_row()`
        or `InternalProducer.poll()` fail.
        If producer fails and the callback returns `True`, the exception
        will be logged but not propagated.
        The default callback logs an exception and returns `False`.
    :param flush_timeout: The time the producer is waiting for all messages to be delivered.
    :param transactional: whether to use Kafka transactions or not.
        Note this changes which underlying `Producer` class is used.
    """

    def __init__(
        self,
        broker_address: Union[str, ConnectionConfig],
        extra_config: Optional[dict] = None,
        on_error: Optional[ProducerErrorCallback] = None,
        flush_timeout: Optional[float] = None,
        transactional: bool = False,
    ):
        self._producer = Producer(
            broker_address=broker_address,
            extra_config=extra_config,
            flush_timeout=flush_timeout,
            transactional=transactional,
        )
        self._on_error: ProducerErrorCallback = on_error or default_on_producer_error
        # Latest delivered offset per (topic, partition), recorded by _on_delivery().
        self._tp_offsets: Dict[Tuple[str, int], int] = {}
        # First delivery error observed; surfaced later by _raise_for_error().
        self._error: Optional[KafkaError] = None
        self._active_transaction = False

    def produce_row(
        self,
        row: Row,
        topic: Topic,
        key: Optional[Any] = _KEY_UNSET,
        partition: Optional[int] = None,
        timestamp: Optional[int] = None,
    ):
        """
        Serialize Row to bytes according to the Topic serialization settings
        and produce it to Kafka

        If this method fails, it will trigger the provided "on_error" callback.

        :param row: Row object
        :param topic: Topic object
        :param key: message key, optional
        :param partition: partition number, optional
        :param timestamp: timestamp in milliseconds, optional
        """
        try:
            # Use existing key only if no other key is provided.
            # If key is provided - use it, even if it's None
            key = row.key if key is _KEY_UNSET else key
            message = topic.row_serialize(row=row, key=key)
            self.produce(
                topic=topic.name,
                key=message.key,
                value=message.value,
                headers=message.headers,
                partition=partition,
                timestamp=timestamp,
            )
        except Exception as exc:
            # Let the error callback decide whether the failure is fatal.
            to_suppress = self._on_error(exc, row, logger)
            if to_suppress:
                return
            raise

    def poll(self, timeout: float = 0):
        """
        Polls the producer for events and calls `on_delivery` callbacks.

        If `poll()` fails, it will trigger the provided "on_error" callback

        :param timeout: timeout in seconds
        """
        try:
            self._producer.poll(timeout=timeout)
        except Exception as exc:
            to_suppress = self._on_error(exc, None, logger)
            if to_suppress:
                return
            raise

    def _broker_available(self):
        """Reset the broker unavailability tracker on the underlying Producer."""
        self._producer._broker_available()  # noqa: SLF001

    def raise_if_broker_unavailable(self, timeout: float):
        """Raise if all brokers have been unavailable for longer than ``timeout`` seconds."""
        self._producer.raise_if_broker_unavailable(timeout)

    def produce(
        self,
        topic: str,
        value: Optional[Union[str, bytes]] = None,
        key: Optional[Union[str, bytes]] = None,
        headers: Optional[Headers] = None,
        partition: Optional[int] = None,
        timestamp: Optional[int] = None,
        poll_timeout: float = PRODUCER_POLL_TIMEOUT,
        buffer_error_max_tries: int = PRODUCER_ON_ERROR_RETRIES,
    ):
        """
        Produce a raw message to Kafka, registering `_on_delivery` as the
        delivery callback so that offsets and delivery errors are tracked.

        :raises KafkaProducerDeliveryError: if a previous delivery failed
            (raised before producing the new message).
        """
        self._raise_for_error()

        return self._producer.produce(
            topic=topic,
            value=value,
            key=key,
            headers=headers,
            partition=partition,
            timestamp=timestamp,
            poll_timeout=poll_timeout,
            buffer_error_max_tries=buffer_error_max_tries,
            on_delivery=self._on_delivery,
        )

    def _on_delivery(self, err: Optional[KafkaError], msg: Message):
        """
        Delivery callback: record the delivered offset on success, or keep
        the first delivery error to be raised later by `_raise_for_error()`.
        """
        if self._error is not None:
            # There's an error already set
            return

        topic, partition, offset = msg.topic(), msg.partition(), msg.offset()
        if err is None:
            self._tp_offsets[(topic, partition)] = offset
            # Successful delivery confirms broker is reachable
            self._producer._broker_available()  # noqa: SLF001
        else:
            self._error = err

    def _raise_for_error(self):
        """Raise (and clear) any delivery error previously stored by `_on_delivery`."""
        if self._error is not None:
            exc = KafkaProducerDeliveryError(self._error)
            self._error = None
            raise exc

    def flush(self, timeout: Optional[float] = None) -> int:
        """
        Flush pending messages and raise if any delivery failed.

        :return: number of messages still in the queue (as reported by the
            underlying producer's flush).
        """
        result = self._producer.flush(timeout=timeout)
        self._raise_for_error()
        return result

    @property
    def offsets(self) -> Dict[Tuple[str, int], int]:
        # Latest delivered offset per (topic, partition).
        return self._tp_offsets

    def begin_transaction(self):
        """Start a new Kafka transaction on the underlying producer."""
        self._producer.begin_transaction()
        self._active_transaction = True

    def abort_transaction(self, timeout: Optional[float] = None):
        """
        Attempt an abort if an active transaction.

        Else, skip since it throws an exception if at least
        one transaction was successfully completed at some point.

        This avoids polluting the stack trace in the case where a transaction was
        not active as expected (because of some other exception already raised)
        and a cleanup abort is attempted.

        NOTE: under normal circumstances a transaction will be open due to how
        the Checkpoint inits another immediately after committing.
        """
        if self._active_transaction:
            if self._tp_offsets:
                # Only log here to avoid polluting logging with empty checkpoint aborts
                logger.debug("Aborting Kafka transaction and clearing producer offsets")
                self._tp_offsets = {}
            self._producer.abort_transaction(timeout)
            self._active_transaction = False
        else:
            logger.debug("No Kafka transaction to abort")

    def _retriable_op(self, attempts: int, backoff_seconds: float):
        """
        Decorator factory retrying retriable Kafka transaction operations.

        Delegates to the module-level `_retriable_transaction_op`, which has
        identical retry/backoff semantics — previously this method carried a
        verbatim copy of that code. Kept as a method for backward compatibility.
        """
        return _retriable_transaction_op(attempts, backoff_seconds)

    @_retriable_transaction_op(attempts=3, backoff_seconds=1.0)
    def _send_offsets_to_transaction(
        self,
        positions: List[TopicPartition],
        group_metadata: GroupMetadata,
        timeout: Optional[float] = None,
    ):
        # Retried on retriable Kafka errors by the decorator.
        self._producer.send_offsets_to_transaction(positions, group_metadata, timeout)

    @_retriable_transaction_op(attempts=3, backoff_seconds=1.0)
    def _commit_transaction(self, timeout: Optional[float] = None):
        # Retried on retriable Kafka errors by the decorator.
        self._producer.commit_transaction(timeout)
        self._active_transaction = False

    def commit_transaction(
        self,
        positions: List[TopicPartition],
        group_metadata: GroupMetadata,
        timeout: Optional[float] = None,
    ):
        """
        Send the consumer offsets to the active transaction, then commit it.

        :param positions: consumer positions to attach to the transaction
        :param group_metadata: consumer group metadata
        :param timeout: per-operation timeout in seconds, optional
        """
        self._send_offsets_to_transaction(positions, group_metadata, timeout)
        self._commit_transaction(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Ensure all buffered messages are delivered (or an error is raised)
        # when leaving the context.
        self.flush()