Skip to content

Commit 314af4c

Browse files
author
DanielePalaia
committed
reconnecting managements, publishers and consumers
1 parent bca0fef commit 314af4c

File tree

7 files changed

+38
-40
lines changed

7 files changed

+38
-40
lines changed

examples/reconnection/reconnection_example.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def main() -> None:
129129
publisher.publish(Message(body="test"))
130130
except ConnectionClosed:
131131
print("publisher closing exception, resubmitting")
132-
publisher = connection.publisher(addr)
132+
# publisher = connection.publisher(addr)
133133
continue
134134

135135
print("closing publisher")
@@ -153,9 +153,6 @@ def main() -> None:
153153
except KeyboardInterrupt:
154154
pass
155155
except ConnectionClosed:
156-
consumer = connection.consumer(
157-
addr_queue, message_handler=MyMessageHandler()
158-
)
159156
continue
160157
except Exception as e:
161158
print("consumer exited for exception " + str(e))
@@ -165,7 +162,6 @@ def main() -> None:
165162
print("cleanup")
166163
consumer.close()
167164

168-
management = connection.management()
169165
print("unbind")
170166
management.unbind(bind_name)
171167

rabbitmq_amqp_python_client/connection.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(
5858
self._addr: Optional[str] = uri
5959
self._addrs: Optional[list[str]] = uris
6060
self._conn: BlockingConnection
61-
self._management: Management
61+
self._managements: list[Management] = []
6262
self._reconnect = reconnect
6363
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
6464
self._ssl_domain = None
@@ -108,21 +108,20 @@ def dial(self) -> None:
108108
on_disconnection_handler=self._on_disconnection,
109109
)
110110

111-
self._open()
111+
# self._open()
112112
logger.debug("Connection to the server established")
113113

114-
def _open(self) -> None:
115-
self._management = Management(self._conn)
116-
self._management.open()
117-
118114
def management(self) -> Management:
119115
"""
120116
Get the management interface for this connection.
121117
122118
Returns:
123119
Management: The management interface for performing administrative tasks
124120
"""
125-
return self._management
121+
management = Management(self._conn)
122+
management.open()
123+
self._managements.append(management)
124+
return management
126125

127126
# closes the connection to the AMQP 1.0 server.
128127
def close(self) -> None:
@@ -137,6 +136,8 @@ def close(self) -> None:
137136
publisher.close()
138137
for consumer in self._consumers:
139138
consumer.close()
139+
for management in self._managements:
140+
management.close()
140141
self._conn.close()
141142
except Exception as e:
142143
logger.error(f"Error closing connection: {e}")
@@ -166,7 +167,7 @@ def publisher(self, destination: str = "") -> Publisher:
166167
)
167168
publisher = Publisher(self._conn, destination)
168169
self._publishers.append(publisher)
169-
return self._publishers[self._publishers.index(publisher)]
170+
return publisher
170171

171172
def consumer(
172173
self,
@@ -202,26 +203,26 @@ def consumer(
202203

203204
def _on_disconnection(self) -> None:
204205

205-
print("disconnected")
206-
207206
if self in self._connections:
208207
self._connections.remove(self)
209208

210-
print("reconnecting")
211209
self._conn = BlockingConnection(
212210
url=self._addr,
213211
urls=self._addrs,
214212
ssl_domain=self._ssl_domain,
215213
on_disconnection_handler=self._on_disconnection,
216214
)
217-
self._open()
215+
218216
self._connections.append(self)
219217

220-
for index, publisher in enumerate(self._publishers):
221-
# publisher = self._publishers.pop(index)
222-
# address = publisher.address
223-
self._publishers.remove(publisher)
224-
# self._publishers.insert(index, Publisher(self._conn, address))
218+
for i, management in enumerate(self._managements):
219+
# Update the broken connection and sender in the management
220+
self._managements[i].update_connection(self._conn)
221+
222+
for i, publisher in enumerate(self._publishers):
223+
# Update the broken connection and sender in the publisher
224+
self._publishers[i].update_connection(self._conn)
225225

226226
for i, consumer in enumerate(self._consumers):
227-
self._consumers.remove(consumer)
227+
# Update the broken connection and sender in the consumer
228+
self._consumers[i].update_connection(self._conn)

rabbitmq_amqp_python_client/consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def _open(self) -> None:
6565
logger.debug("Creating Sender")
6666
self._receiver = self._create_receiver(self._addr)
6767

68+
def update_connection(self, conn: BlockingConnection) -> None:
69+
self._conn = conn
70+
self._receiver = self._create_receiver(self._addr)
71+
6872
def _set_consumers_list(self, consumers: []) -> None: # type: ignore
6973
self._consumers = consumers
7074

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ def __init__(self, conn: BlockingConnection):
5353
self._receiver: Optional[BlockingReceiver] = None
5454
self._conn = conn
5555

56+
def update_connection(self, conn: BlockingConnection) -> None:
57+
self._conn = conn
58+
self._sender = self._create_sender(CommonValues.management_node_address.value)
59+
self._receiver = self._create_receiver(
60+
CommonValues.management_node_address.value,
61+
)
62+
5663
def open(self) -> None:
5764
"""
5865
Open the management connection by creating sender and receiver.

rabbitmq_amqp_python_client/publisher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def __init__(self, conn: BlockingConnection, addr: str = ""):
4646
self._publishers: list[Publisher] = []
4747
self._open()
4848

49+
def update_connection(self, conn: BlockingConnection) -> None:
50+
self._conn = conn
51+
self._sender = self._create_sender(self._addr)
52+
4953
def _open(self) -> None:
5054
if self._sender is None:
5155
logger.debug("Creating Sender")

tests/test_connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,11 @@ def test_connection_reconnection() -> None:
9090
# delay
9191
time.sleep(5)
9292
# simulate a disconnection
93-
delete_all_connections()
9493
# raise a reconnection
9594
management = connection.management()
95+
96+
delete_all_connections()
97+
9698
stream_name = "test_stream_info_with_validation"
9799
queue_specification = StreamSpecification(
98100
name=stream_name,
@@ -104,7 +106,6 @@ def test_connection_reconnection() -> None:
104106
disconnected = True
105107

106108
# check that we reconnected
107-
management = connection.management()
108109
management.declare_queue(queue_specification)
109110
management.delete_queue(stream_name)
110111
management.close()

tests/test_publisher.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,6 @@ def test_disconnection_reconnection() -> None:
291291

292292
except ConnectionClosed:
293293
disconnected = True
294-
publisher = connection_test.publisher(
295-
destination=AddressHelper.queue_address(queue_name)
296-
)
297294
continue
298295

299296
except Exception:
@@ -303,14 +300,6 @@ def test_disconnection_reconnection() -> None:
303300

304301
publisher.close()
305302

306-
# cleanup, we need to create a new connection as the previous one
307-
# was closed by the test
308-
309-
connection_test = environment.connection()
310-
connection_test.dial()
311-
312-
management = connection_test.management()
313-
314303
# purge the queue and check number of published messages
315304
message_purged = management.purge_queue(queue_name)
316305

@@ -335,14 +324,10 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
335324
management = connection.management()
336325
management.declare_queue(queue_specification)
337326

338-
print("before creating publisher")
339-
340327
publisher = connection.publisher(
341328
destination=AddressHelper.queue_address(stream_name)
342329
)
343330

344-
print("after creating publisher")
345-
346331
for i in range(messages_to_send):
347332

348333
publisher.publish(Message(body="test"))

0 commit comments

Comments
 (0)