Skip to content

Commit e8b0683

Browse files
author
DanielePalaia
committed
reconnecting managements, publishers and consumers
1 parent 11bed13 commit e8b0683

File tree

7 files changed

+107
-42
lines changed

7 files changed

+107
-42
lines changed

examples/reconnection/reconnection_example.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def main() -> None:
129129
publisher.publish(Message(body="test"))
130130
except ConnectionClosed:
131131
print("publisher closing exception, resubmitting")
132-
publisher = connection.publisher(addr)
132+
# publisher = connection.publisher(addr)
133133
continue
134134

135135
print("closing publisher")
@@ -153,9 +153,6 @@ def main() -> None:
153153
except KeyboardInterrupt:
154154
pass
155155
except ConnectionClosed:
156-
consumer = connection.consumer(
157-
addr_queue, message_handler=MyMessageHandler()
158-
)
159156
continue
160157
except Exception as e:
161158
print("consumer exited for exception " + str(e))
@@ -165,7 +162,6 @@ def main() -> None:
165162
print("cleanup")
166163
consumer.close()
167164

168-
management = connection.management()
169165
print("unbind")
170166
management.unbind(bind_name)
171167

rabbitmq_amqp_python_client/connection.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def __init__(
6262
uri: Single node connection URI
6363
uris: List of URIs for multi-node setup
6464
ssl_context: SSL configuration for secure connections
65-
on_disconnection_handler: Callback for handling disconnection events
65+
reconnect: Ability to automatically reconnect in case of disconnections from the server
6666
6767
Raises:
6868
ValueError: If neither uri nor uris is provided
@@ -76,10 +76,10 @@ def __init__(
7676
self._addr: Optional[str] = uri
7777
self._addrs: Optional[list[str]] = uris
7878
self._conn: BlockingConnection
79-
self._management: Management
8079
self._conf_ssl_context: Union[
8180
PosixSslConfigurationContext, WinSslConfigurationContext, None
8281
] = ssl_context
82+
self._managements: list[Management] = []
8383
self._reconnect = reconnect
8484
self._ssl_domain = None
8585
self._connections = [] # type: ignore
@@ -158,7 +158,7 @@ def dial(self) -> None:
158158
on_disconnection_handler=self._on_disconnection,
159159
)
160160

161-
self._open()
161+
# self._open()
162162
logger.debug("Connection to the server established")
163163

164164
def _win_store_to_cert(
@@ -185,7 +185,12 @@ def management(self) -> Management:
185185
Returns:
186186
Management: The management interface for performing administrative tasks
187187
"""
188-
return self._management
188+
if len(self._managements) == 0:
189+
management = Management(self._conn)
190+
management.open()
191+
self._managements.append(management)
192+
193+
return self._managements[0]
189194

190195
# closes the connection to the AMQP 1.0 server.
191196
def close(self) -> None:
@@ -196,9 +201,9 @@ def close(self) -> None:
196201
"""
197202
logger.debug("Closing connection")
198203
try:
199-
for publisher in self._publishers:
204+
for publisher in self._publishers[:]:
200205
publisher.close()
201-
for consumer in self._consumers:
206+
for consumer in self._consumers[:]:
202207
consumer.close()
203208
self._conn.close()
204209
except Exception as e:
@@ -228,8 +233,9 @@ def publisher(self, destination: str = "") -> Publisher:
228233
"destination address must start with /queues or /exchanges"
229234
)
230235
publisher = Publisher(self._conn, destination)
236+
publisher._set_publishers_list(self._publishers)
231237
self._publishers.append(publisher)
232-
return self._publishers[self._publishers.index(publisher)]
238+
return publisher
233239

234240
def consumer(
235241
self,
@@ -265,26 +271,38 @@ def consumer(
265271

266272
def _on_disconnection(self) -> None:
267273

268-
print("disconnected")
269-
270274
if self in self._connections:
271275
self._connections.remove(self)
272276

273-
print("reconnecting")
274277
self._conn = BlockingConnection(
275278
url=self._addr,
276279
urls=self._addrs,
277280
ssl_domain=self._ssl_domain,
278281
on_disconnection_handler=self._on_disconnection,
279282
)
280-
self._open()
283+
281284
self._connections.append(self)
282285

283-
for index, publisher in enumerate(self._publishers):
284-
# publisher = self._publishers.pop(index)
285-
# address = publisher.address
286-
self._publishers.remove(publisher)
287-
# self._publishers.insert(index, Publisher(self._conn, address))
286+
for i, management in enumerate(self._managements):
287+
# Update the broken connection and sender in the management
288+
self._managements[i]._update_connection(self._conn)
289+
290+
for i, publisher in enumerate(self._publishers):
291+
# Update the broken connection and sender in the publisher
292+
self._publishers[i]._update_connection(self._conn)
288293

289294
for i, consumer in enumerate(self._consumers):
290-
self._consumers.remove(consumer)
295+
# Update the broken connection and sender in the consumer
296+
self._consumers[i]._update_connection(self._conn)
297+
298+
@property
299+
def active_producers(self) -> int:
300+
"""Returns the number of active publishers"""
301+
return len(self._publishers)
302+
303+
@property
304+
def active_consumers(self) -> int:
305+
"""Returns the number of active consumers"""
306+
return len(self._consumers)
307+
308+

rabbitmq_amqp_python_client/consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def _open(self) -> None:
6565
logger.debug("Creating Sender")
6666
self._receiver = self._create_receiver(self._addr)
6767

68+
def _update_connection(self, conn: BlockingConnection) -> None:
69+
self._conn = conn
70+
self._receiver = self._create_receiver(self._addr)
71+
6872
def _set_consumers_list(self, consumers: []) -> None: # type: ignore
6973
self._consumers = consumers
7074

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ def __init__(self, conn: BlockingConnection):
5353
self._receiver: Optional[BlockingReceiver] = None
5454
self._conn = conn
5555

56+
def _update_connection(self, conn: BlockingConnection) -> None:
57+
self._conn = conn
58+
self._sender = self._create_sender(CommonValues.management_node_address.value)
59+
self._receiver = self._create_receiver(
60+
CommonValues.management_node_address.value,
61+
)
62+
5663
def open(self) -> None:
5764
"""
5865
Open the management connection by creating sender and receiver.

rabbitmq_amqp_python_client/publisher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def __init__(self, conn: BlockingConnection, addr: str = ""):
4646
self._publishers: list[Publisher] = []
4747
self._open()
4848

49+
def _update_connection(self, conn: BlockingConnection) -> None:
50+
self._conn = conn
51+
self._sender = self._create_sender(self._addr)
52+
4953
def _open(self) -> None:
5054
if self._sender is None:
5155
logger.debug("Creating Sender")

tests/test_connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ def test_connection_reconnection() -> None:
8181
# delay
8282
time.sleep(5)
8383
# simulate a disconnection
84-
delete_all_connections()
8584
# raise a reconnection
8685
management = connection.management()
86+
87+
delete_all_connections()
88+
8789
stream_name = "test_stream_info_with_validation"
8890
queue_specification = StreamSpecification(
8991
name=stream_name,
@@ -95,7 +97,6 @@ def test_connection_reconnection() -> None:
9597
disconnected = True
9698

9799
# check that we reconnected
98-
management = connection.management()
99100
management.declare_queue(queue_specification)
100101
management.delete_queue(stream_name)
101102
management.close()

tests/test_publisher.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,11 @@ def test_disconnection_reconnection() -> None:
270270
# delay
271271
time.sleep(5)
272272
messages_to_publish = 10000
273-
queue_name = "test-queue"
273+
queue_name = "test-queue-reconnection"
274274
management = connection_test.management()
275275

276276
management.declare_queue(QuorumQueueSpecification(name=queue_name))
277277

278-
management.close()
279-
280278
publisher = connection_test.publisher(
281279
destination=AddressHelper.queue_address(queue_name)
282280
)
@@ -291,9 +289,6 @@ def test_disconnection_reconnection() -> None:
291289

292290
except ConnectionClosed:
293291
disconnected = True
294-
publisher = connection_test.publisher(
295-
destination=AddressHelper.queue_address(queue_name)
296-
)
297292
continue
298293

299294
except Exception:
@@ -303,14 +298,6 @@ def test_disconnection_reconnection() -> None:
303298

304299
publisher.close()
305300

306-
# cleanup, we need to create a new connection as the previous one
307-
# was closed by the test
308-
309-
connection_test = environment.connection()
310-
connection_test.dial()
311-
312-
management = connection_test.management()
313-
314301
# purge the queue and check number of published messages
315302
message_purged = management.purge_queue(queue_name)
316303

@@ -335,14 +322,10 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
335322
management = connection.management()
336323
management.declare_queue(queue_specification)
337324

338-
print("before creating publisher")
339-
340325
publisher = connection.publisher(
341326
destination=AddressHelper.queue_address(stream_name)
342327
)
343328

344-
print("after creating publisher")
345-
346329
for i in range(messages_to_send):
347330

348331
publisher.publish(Message(body="test"))
@@ -392,3 +375,55 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
392375
assert accepted_2 is True
393376
assert purged_messages_queue == 2
394377
assert raised is False
378+
379+
380+
def test_multiple_publishers(environment: Environment) -> None:
381+
382+
stream_name = "test_multiple_publisher_1"
383+
stream_name_2 = "test_multiple_publisher_2"
384+
connection = environment.connection()
385+
connection.dial()
386+
387+
stream_specification = StreamSpecification(
388+
name=stream_name,
389+
)
390+
management = connection.management()
391+
management.declare_queue(stream_specification)
392+
393+
stream_specification = StreamSpecification(
394+
name=stream_name_2,
395+
)
396+
management.declare_queue(stream_specification)
397+
398+
destination = AddressHelper.queue_address(stream_name)
399+
destination_2 = AddressHelper.queue_address(stream_name_2)
400+
connection.publisher(destination)
401+
402+
assert connection.active_producers == 1
403+
404+
publisher_2 = connection.publisher(destination_2)
405+
406+
assert connection.active_producers == 2
407+
408+
publisher_2.close()
409+
410+
assert connection.active_producers == 1
411+
412+
connection.publisher(destination_2)
413+
414+
assert connection.active_producers == 2
415+
416+
connection.close()
417+
418+
assert connection.active_producers == 0
419+
420+
# cleanup
421+
connection = environment.connection()
422+
connection.dial()
423+
management = connection.management()
424+
425+
management.delete_queue(stream_name)
426+
427+
management.delete_queue(stream_name_2)
428+
429+
management.close()

0 commit comments

Comments
 (0)