Skip to content

Commit c25c94a

Browse files
author
DanielePalaia
committed
address PR comments
1 parent dc85a1f commit c25c94a

File tree

4 files changed

+54
-45
lines changed

4 files changed

+54
-45
lines changed

examples/reconnection/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ In case of streams the connection will restart consuming from the last consumed
1414
You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify
1515
the backoff interval and the maximum_retries before the client gives up.
1616

17+
To disable automatic reconnection you can set active_recovery of RecoveryConfiguration to False
18+

examples/reconnection/reconnection_example.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
ExchangeToQueueBindingSpecification,
1111
Message,
1212
QuorumQueueSpecification,
13-
RecoveryConfiguration,
1413
)
1514

1615
# here we keep track of the objects we need to reconnect
@@ -19,7 +18,6 @@
1918

2019
environment = Environment(
2120
uri="amqp://guest:guest@localhost:5672/",
22-
recovery_configuration=RecoveryConfiguration(active_recovery=True),
2321
)
2422

2523

rabbitmq_amqp_python_client/connection.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
from .address_helper import validate_address
1616
from .consumer import Consumer
1717
from .entities import RecoveryConfiguration, StreamOptions
18-
from .exceptions import ArgumentOutOfRangeException
18+
from .exceptions import (
19+
ArgumentOutOfRangeException,
20+
ValidationCodeException,
21+
)
1922
from .management import Management
2023
from .publisher import Publisher
2124
from .qpid.proton._exceptions import ConnectionException
@@ -57,7 +60,7 @@ def __init__(
5760
ssl_context: Union[
5861
PosixSslConfigurationContext, WinSslConfigurationContext, None
5962
] = None,
60-
recovery_configuration: Optional[RecoveryConfiguration] = None,
63+
recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(),
6164
):
6265
"""
6366
Initialize a new Connection instance.
@@ -84,16 +87,55 @@ def __init__(
8487
PosixSslConfigurationContext, WinSslConfigurationContext, None
8588
] = ssl_context
8689
self._managements: list[Management] = []
87-
self._recovery_configuration = recovery_configuration
90+
self._recovery_configuration: RecoveryConfiguration = recovery_configuration
8891
self._ssl_domain = None
8992
self._connections = [] # type: ignore
9093
self._index: int = -1
9194
self._publishers: list[Publisher] = []
9295
self._consumers: list[Consumer] = []
9396

97+
# Some recovery_configuration validation
98+
if recovery_configuration.back_off_reconnect_interval < timedelta(seconds=1):
99+
raise ValidationCodeException(
100+
"back_off_reconnect_interval must be > 1 second"
101+
)
102+
103+
if recovery_configuration.MaxReconnectAttempts < 1:
104+
raise ValidationCodeException("MaxReconnectAttempts must be at least 1")
105+
94106
def _set_environment_connection_list(self, connections: []): # type: ignore
95107
self._connections = connections
96108

109+
def _open_connections(self, reconnect_handlers: bool = False) -> None:
110+
111+
if self._recovery_configuration.active_recovery is False:
112+
self._conn = BlockingConnection(
113+
url=self._addr,
114+
urls=self._addrs,
115+
ssl_domain=self._ssl_domain,
116+
)
117+
else:
118+
self._conn = BlockingConnection(
119+
url=self._addr,
120+
urls=self._addrs,
121+
ssl_domain=self._ssl_domain,
122+
on_disconnection_handler=self._on_disconnection,
123+
)
124+
125+
if reconnect_handlers is True:
126+
127+
for i, management in enumerate(self._managements):
128+
# Update the broken connection and sender in the management
129+
self._managements[i]._update_connection(self._conn)
130+
131+
for i, publisher in enumerate(self._publishers):
132+
# Update the broken connection and sender in the publisher
133+
self._publishers[i]._update_connection(self._conn)
134+
135+
for i, consumer in enumerate(self._consumers):
136+
# Update the broken connection and sender in the consumer
137+
self._consumers[i]._update_connection(self._conn)
138+
97139
def dial(self) -> None:
98140
"""
99141
Establish a connection to the AMQP server.
@@ -148,24 +190,7 @@ def dial(self) -> None:
148190
password,
149191
)
150192

151-
if (
152-
self._recovery_configuration is None
153-
or self._recovery_configuration.active_recovery is False
154-
):
155-
self._conn = BlockingConnection(
156-
url=self._addr,
157-
urls=self._addrs,
158-
ssl_domain=self._ssl_domain,
159-
)
160-
else:
161-
self._conn = BlockingConnection(
162-
url=self._addr,
163-
urls=self._addrs,
164-
ssl_domain=self._ssl_domain,
165-
on_disconnection_handler=self._on_disconnection,
166-
)
167-
168-
# self._open()
193+
self._open_connections()
169194
logger.debug("Connection to the server established")
170195

171196
def _win_store_to_cert(
@@ -282,10 +307,10 @@ def _on_disconnection(self) -> None:
282307
if self in self._connections:
283308
self._connections.remove(self)
284309

285-
base_delay = self._recovery_configuration.back_off_reconnect_interval # type: ignore
310+
base_delay = self._recovery_configuration.back_off_reconnect_interval
286311
max_delay = timedelta(minutes=1)
287312

288-
for attempt in range(self._recovery_configuration.MaxReconnectAttempts): # type: ignore
313+
for attempt in range(self._recovery_configuration.MaxReconnectAttempts):
289314

290315
logger.debug("attempting a reconnection")
291316
jitter = timedelta(milliseconds=random.randint(0, 500))
@@ -297,26 +322,10 @@ def _on_disconnection(self) -> None:
297322
time.sleep(delay.total_seconds())
298323

299324
try:
300-
self._conn = BlockingConnection(
301-
url=self._addr,
302-
urls=self._addrs,
303-
ssl_domain=self._ssl_domain,
304-
on_disconnection_handler=self._on_disconnection,
305-
)
306-
307-
self._connections.append(self)
308325

309-
for i, management in enumerate(self._managements):
310-
# Update the broken connection and sender in the management
311-
self._managements[i]._update_connection(self._conn)
326+
self._open_connections(reconnect_handlers=True)
312327

313-
for i, publisher in enumerate(self._publishers):
314-
# Update the broken connection and sender in the publisher
315-
self._publishers[i]._update_connection(self._conn)
316-
317-
for i, consumer in enumerate(self._consumers):
318-
# Update the broken connection and sender in the consumer
319-
self._consumers[i]._update_connection(self._conn)
328+
self._connections.append(self)
320329

321330
except ConnectionException as e:
322331
base_delay *= 2
@@ -328,7 +337,7 @@ def _on_disconnection(self) -> None:
328337
str(e),
329338
)
330339
# maximum attempts reached without establishing a connection
331-
if attempt == self._recovery_configuration.MaxReconnectAttempts - 1: # type: ignore
340+
if attempt == self._recovery_configuration.MaxReconnectAttempts - 1:
332341
logger.debug("Not able to reconnect")
333342
raise ConnectionException
334343
else:

rabbitmq_amqp_python_client/environment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def __init__(
4141
ssl_context: Union[
4242
PosixSslConfigurationContext, WinSslConfigurationContext, None
4343
] = None,
44-
recovery_configuration: Optional[RecoveryConfiguration] = None,
44+
recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(),
4545
):
4646
"""
4747
Initialize a new Environment instance.

0 commit comments

Comments
 (0)