File tree 5 files changed +20
-10
lines changed
rabbitmq_amqp_python_client
5 files changed +20
-10
lines changed Original file line number Diff line number Diff line change @@ -11,3 +11,6 @@ Same for Consumers.
11
11
12
12
In case of streams we connection will restart consuming from the last consumed offset.
13
13
14
+ You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify
15
+ the backoff interval and the maximum_retries before the client gives up.
16
+
Original file line number Diff line number Diff line change @@ -278,6 +278,7 @@ def consumer(
278
278
279
279
def _on_disconnection (self ) -> None :
280
280
281
+ logger .debug ("_on_disconnection: disconnection detected" )
281
282
if self in self ._connections :
282
283
self ._connections .remove (self )
283
284
@@ -286,6 +287,7 @@ def _on_disconnection(self) -> None:
286
287
287
288
for attempt in range (self ._recovery_configuration .MaxReconnectAttempts ): # type: ignore
288
289
290
+ logger .debug ("attempting a reconnection" )
289
291
jitter = timedelta (milliseconds = random .randint (0 , 500 ))
290
292
delay = base_delay + jitter
291
293
@@ -318,16 +320,24 @@ def _on_disconnection(self) -> None:
318
320
319
321
except ConnectionException as e :
320
322
base_delay *= 2
321
- logger .error (
323
+ logger .debug (
322
324
"Reconnection attempt failed" ,
323
325
"attempt" ,
324
326
attempt ,
325
327
"error" ,
326
328
str (e ),
327
329
)
328
- continue
330
+ # maximum attempts reached without establishing a connection
331
+ if attempt == self ._recovery_configuration .MaxReconnectAttempts - 1 : # type: ignore
332
+ logger .debug ("Not able to reconnect" )
333
+ raise ConnectionException
334
+ else :
335
+ continue
329
336
330
- break
337
+ # connection established
338
+ else :
339
+ logger .debug ("reconnected successful" )
340
+ return
331
341
332
342
@property
333
343
def active_producers (self ) -> int :
Original file line number Diff line number Diff line change @@ -231,5 +231,5 @@ class RecoveryConfiguration:
231
231
"""
232
232
233
233
active_recovery : bool = True
234
- back_off_reconnect_interval : timedelta = timedelta (0. 5 )
234
+ back_off_reconnect_interval : timedelta = timedelta (seconds = 5 )
235
235
MaxReconnectAttempts : int = 5
Original file line number Diff line number Diff line change 32
32
33
33
import proton
34
34
from proton import Sender as ProtonSender
35
- from proton .handlers import \
36
- IncomingMessageHandler as ProtonIncomingMessageHandler
37
- from proton .handlers import \
38
- OutgoingMessageHandler as ProtonOutgoingMessageHandler
35
+ from proton .handlers import IncomingMessageHandler as ProtonIncomingMessageHandler
36
+ from proton .handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler
39
37
40
38
_tracer = None
41
39
_trace_key = proton .symbol ("x-opt-qpid-tracestate" )
Original file line number Diff line number Diff line change 139
139
140
140
if TYPE_CHECKING :
141
141
from ._condition import Condition
142
- from ._endpoints import \
143
- Connection # would produce circular import
142
+ from ._endpoints import Connection # would produce circular import
144
143
145
144
146
145
class TraceAdapter :
You can’t perform that action at this time.
0 commit comments