Skip to content

Commit 9b5d355

Browse files
author
DanielePalaia
committed
reconnecting managements, publishers and consumers
1 parent bca0fef commit 9b5d355

File tree

7 files changed

+109
-45
lines changed

7 files changed

+109
-45
lines changed

examples/reconnection/reconnection_example.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def main() -> None:
129129
publisher.publish(Message(body="test"))
130130
except ConnectionClosed:
131131
print("publisher closing exception, resubmitting")
132-
publisher = connection.publisher(addr)
132+
# publisher = connection.publisher(addr)
133133
continue
134134

135135
print("closing publisher")
@@ -153,9 +153,6 @@ def main() -> None:
153153
except KeyboardInterrupt:
154154
pass
155155
except ConnectionClosed:
156-
consumer = connection.consumer(
157-
addr_queue, message_handler=MyMessageHandler()
158-
)
159156
continue
160157
except Exception as e:
161158
print("consumer exited for exception " + str(e))
@@ -165,7 +162,6 @@ def main() -> None:
165162
print("cleanup")
166163
consumer.close()
167164

168-
management = connection.management()
169165
print("unbind")
170166
management.unbind(bind_name)
171167

rabbitmq_amqp_python_client/connection.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(
5858
self._addr: Optional[str] = uri
5959
self._addrs: Optional[list[str]] = uris
6060
self._conn: BlockingConnection
61-
self._management: Management
61+
self._managements: list[Management] = []
6262
self._reconnect = reconnect
6363
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
6464
self._ssl_domain = None
@@ -108,21 +108,22 @@ def dial(self) -> None:
108108
on_disconnection_handler=self._on_disconnection,
109109
)
110110

111-
self._open()
111+
# self._open()
112112
logger.debug("Connection to the server established")
113113

114-
def _open(self) -> None:
115-
self._management = Management(self._conn)
116-
self._management.open()
117-
118114
def management(self) -> Management:
119115
"""
120116
Get the management interface for this connection.
121117
122118
Returns:
123119
Management: The management interface for performing administrative tasks
124120
"""
125-
return self._management
121+
if len(self._managements) == 0:
122+
management = Management(self._conn)
123+
management.open()
124+
self._managements.append(management)
125+
126+
return self._managements[0]
126127

127128
# closes the connection to the AMQP 1.0 server.
128129
def close(self) -> None:
@@ -133,9 +134,9 @@ def close(self) -> None:
133134
"""
134135
logger.debug("Closing connection")
135136
try:
136-
for publisher in self._publishers:
137+
for publisher in self._publishers[:]:
137138
publisher.close()
138-
for consumer in self._consumers:
139+
for consumer in self._consumers[:]:
139140
consumer.close()
140141
self._conn.close()
141142
except Exception as e:
@@ -165,8 +166,9 @@ def publisher(self, destination: str = "") -> Publisher:
165166
"destination address must start with /queues or /exchanges"
166167
)
167168
publisher = Publisher(self._conn, destination)
169+
publisher._set_publishers_list(self._publishers)
168170
self._publishers.append(publisher)
169-
return self._publishers[self._publishers.index(publisher)]
171+
return publisher
170172

171173
def consumer(
172174
self,
@@ -202,26 +204,41 @@ def consumer(
202204

203205
def _on_disconnection(self) -> None:
204206

205-
print("disconnected")
206-
207207
if self in self._connections:
208208
self._connections.remove(self)
209209

210-
print("reconnecting")
211210
self._conn = BlockingConnection(
212211
url=self._addr,
213212
urls=self._addrs,
214213
ssl_domain=self._ssl_domain,
215214
on_disconnection_handler=self._on_disconnection,
216215
)
217-
self._open()
216+
218217
self._connections.append(self)
219218

220-
for index, publisher in enumerate(self._publishers):
221-
# publisher = self._publishers.pop(index)
222-
# address = publisher.address
223-
self._publishers.remove(publisher)
224-
# self._publishers.insert(index, Publisher(self._conn, address))
219+
for i, management in enumerate(self._managements):
220+
# Update the broken connection and sender in the management
221+
self._managements[i].update_connection(self._conn)
222+
223+
for i, publisher in enumerate(self._publishers):
224+
# Update the broken connection and sender in the publisher
225+
self._publishers[i].update_connection(self._conn)
225226

226227
for i, consumer in enumerate(self._consumers):
227-
self._consumers.remove(consumer)
228+
# Update the broken connection and sender in the consumer
229+
self._consumers[i].update_connection(self._conn)
230+
231+
@property
232+
def active_producers(self) -> int:
233+
"""Returns the number of active connections"""
234+
return len(self._publishers)
235+
236+
@property
237+
def active_consumers(self) -> int:
238+
"""Returns the number of active connections"""
239+
return len(self._consumers)
240+
241+
@property
242+
def active_managements(self) -> int:
243+
"""Returns the number of active connections"""
244+
return len(self._managements)

rabbitmq_amqp_python_client/consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def _open(self) -> None:
6565
logger.debug("Creating Sender")
6666
self._receiver = self._create_receiver(self._addr)
6767

68+
def update_connection(self, conn: BlockingConnection) -> None:
69+
self._conn = conn
70+
self._receiver = self._create_receiver(self._addr)
71+
6872
def _set_consumers_list(self, consumers: []) -> None: # type: ignore
6973
self._consumers = consumers
7074

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ def __init__(self, conn: BlockingConnection):
5353
self._receiver: Optional[BlockingReceiver] = None
5454
self._conn = conn
5555

56+
def update_connection(self, conn: BlockingConnection) -> None:
57+
self._conn = conn
58+
self._sender = self._create_sender(CommonValues.management_node_address.value)
59+
self._receiver = self._create_receiver(
60+
CommonValues.management_node_address.value,
61+
)
62+
5663
def open(self) -> None:
5764
"""
5865
Open the management connection by creating sender and receiver.

rabbitmq_amqp_python_client/publisher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def __init__(self, conn: BlockingConnection, addr: str = ""):
4646
self._publishers: list[Publisher] = []
4747
self._open()
4848

49+
def update_connection(self, conn: BlockingConnection) -> None:
50+
self._conn = conn
51+
self._sender = self._create_sender(self._addr)
52+
4953
def _open(self) -> None:
5054
if self._sender is None:
5155
logger.debug("Creating Sender")

tests/test_connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,11 @@ def test_connection_reconnection() -> None:
9090
# delay
9191
time.sleep(5)
9292
# simulate a disconnection
93-
delete_all_connections()
9493
# raise a reconnection
9594
management = connection.management()
95+
96+
delete_all_connections()
97+
9698
stream_name = "test_stream_info_with_validation"
9799
queue_specification = StreamSpecification(
98100
name=stream_name,
@@ -104,7 +106,6 @@ def test_connection_reconnection() -> None:
104106
disconnected = True
105107

106108
# check that we reconnected
107-
management = connection.management()
108109
management.declare_queue(queue_specification)
109110
management.delete_queue(stream_name)
110111
management.close()

tests/test_publisher.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,11 @@ def test_disconnection_reconnection() -> None:
270270
# delay
271271
time.sleep(5)
272272
messages_to_publish = 10000
273-
queue_name = "test-queue"
273+
queue_name = "test-queue-reconnection"
274274
management = connection_test.management()
275275

276276
management.declare_queue(QuorumQueueSpecification(name=queue_name))
277277

278-
management.close()
279-
280278
publisher = connection_test.publisher(
281279
destination=AddressHelper.queue_address(queue_name)
282280
)
@@ -291,9 +289,6 @@ def test_disconnection_reconnection() -> None:
291289

292290
except ConnectionClosed:
293291
disconnected = True
294-
publisher = connection_test.publisher(
295-
destination=AddressHelper.queue_address(queue_name)
296-
)
297292
continue
298293

299294
except Exception:
@@ -303,14 +298,6 @@ def test_disconnection_reconnection() -> None:
303298

304299
publisher.close()
305300

306-
# cleanup, we need to create a new connection as the previous one
307-
# was closed by the test
308-
309-
connection_test = environment.connection()
310-
connection_test.dial()
311-
312-
management = connection_test.management()
313-
314301
# purge the queue and check number of published messages
315302
message_purged = management.purge_queue(queue_name)
316303

@@ -335,14 +322,10 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
335322
management = connection.management()
336323
management.declare_queue(queue_specification)
337324

338-
print("before creating publisher")
339-
340325
publisher = connection.publisher(
341326
destination=AddressHelper.queue_address(stream_name)
342327
)
343328

344-
print("after creating publisher")
345-
346329
for i in range(messages_to_send):
347330

348331
publisher.publish(Message(body="test"))
@@ -392,3 +375,55 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
392375
assert accepted_2 is True
393376
assert purged_messages_queue == 2
394377
assert raised is False
378+
379+
380+
def test_multiple_publishers(environment: Environment) -> None:
381+
382+
stream_name = "test_multiple_publisher_1"
383+
stream_name_2 = "test_multiple_publisher_2"
384+
connection = environment.connection()
385+
connection.dial()
386+
387+
stream_specification = StreamSpecification(
388+
name=stream_name,
389+
)
390+
management = connection.management()
391+
management.declare_queue(stream_specification)
392+
393+
stream_specification = StreamSpecification(
394+
name=stream_name_2,
395+
)
396+
management.declare_queue(stream_specification)
397+
398+
destination = AddressHelper.queue_address(stream_name)
399+
destination_2 = AddressHelper.queue_address(stream_name_2)
400+
connection.publisher(destination)
401+
402+
assert connection.active_producers == 1
403+
404+
publisher_2 = connection.publisher(destination_2)
405+
406+
assert connection.active_producers == 2
407+
408+
publisher_2.close()
409+
410+
assert connection.active_producers == 1
411+
412+
connection.publisher(destination_2)
413+
414+
assert connection.active_producers == 2
415+
416+
connection.close()
417+
418+
assert connection.active_producers == 0
419+
420+
# cleanup
421+
connection = environment.connection()
422+
connection.dial()
423+
management = connection.management()
424+
425+
management.delete_queue(stream_name)
426+
427+
management.delete_queue(stream_name_2)
428+
429+
management.close()

0 commit comments

Comments
 (0)