Skip to content

Commit 1165a4d

Browse files
DanielePalaiaDanielePalaia
and
DanielePalaia
authored
Improve reconnection (#52)
* improving disconnection management * reconnecting managements, publishers and consumers * supporting reconnection of streams by last consumed offset * updating readme * removing print * implement BackOff and MaxReconnectAttempts * few improvements * removing prints * address PR comments --------- Co-authored-by: DanielePalaia <daniele985@@gmail.com>
1 parent d4ed42d commit 1165a4d

17 files changed

+451
-189
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e
147147

148148
### Managing disconnections
149149

150-
At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
151-
You can use this callback to implement your own logic and eventually attempt a reconnection.
150+
The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers
152151

153152
You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and
154153
eventually attempt a reconnection

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self):
2323
super().__init__()
2424
self._count = 0
2525

26-
def on_message(self, event: Event):
26+
def on_amqp_message(self, event: Event):
2727
print("received message: " + str(event.message.body))
2828

2929
# accepting
@@ -147,7 +147,7 @@ def main() -> None:
147147
consumer.close()
148148
# once we finish consuming if we close the connection we need to create a new one
149149
# connection = create_connection()
150-
# management = connection.management()
150+
management = connection.management()
151151

152152
print("unbind")
153153
management.unbind(bind_name)

examples/reconnection/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
Automatic reconnection
2+
===
3+
4+
You can use this example to test automatic reconnection.
5+
6+
The scenario is publishing and consuming a lot of messages in a queue.
7+
8+
From the RabbitMQ UI you can break a connection to see the automatic reconnection happening.
9+
10+
Same for Consumers.
11+
12+
In case of streams the connection will restart consuming from the last consumed offset.
13+
14+
You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify
15+
the backoff interval and the maximum_retries before the client gives up.
16+
17+
To disable automatic reconnection you can set active_recovery of RecoveryConfiguration to False
18+

examples/reconnection/reconnection_example.py

Lines changed: 27 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,23 @@
11
# type: ignore
2-
3-
4-
import time
5-
from dataclasses import dataclass
6-
from typing import Optional
7-
82
from rabbitmq_amqp_python_client import (
93
AddressHelper,
104
AMQPMessagingHandler,
115
Connection,
126
ConnectionClosed,
13-
Consumer,
147
Environment,
158
Event,
169
ExchangeSpecification,
1710
ExchangeToQueueBindingSpecification,
18-
Management,
1911
Message,
20-
Publisher,
2112
QuorumQueueSpecification,
2213
)
2314

24-
2515
# here we keep track of the objects we need to reconnect
26-
@dataclass
27-
class ConnectionConfiguration:
28-
connection: Optional[Connection] = None
29-
management: Optional[Management] = None
30-
publisher: Optional[Publisher] = None
31-
consumer: Optional[Consumer] = None
32-
33-
34-
connection_configuration = ConnectionConfiguration()
35-
MESSAGES_TO_PUBLSH = 50000
36-
37-
38-
# disconnection callback
39-
# here you can cleanup or reconnect
40-
def on_disconnection():
41-
42-
print("disconnected")
43-
global environment
44-
exchange_name = "test-exchange"
45-
queue_name = "example-queue"
46-
routing_key = "routing-key"
47-
48-
global connection_configuration
49-
50-
addr = AddressHelper.exchange_address(exchange_name, routing_key)
51-
addr_queue = AddressHelper.queue_address(queue_name)
52-
53-
if connection_configuration.connection is not None:
54-
connection_configuration.connection = create_connection()
55-
if connection_configuration.management is not None:
56-
connection_configuration.management = (
57-
connection_configuration.connection.management()
58-
)
59-
if connection_configuration.publisher is not None:
60-
connection_configuration.publisher = (
61-
connection_configuration.connection.publisher(addr)
62-
)
63-
if connection_configuration.consumer is not None:
64-
connection_configuration.consumer = (
65-
connection_configuration.connection.consumer(
66-
addr_queue, message_handler=MyMessageHandler()
67-
)
68-
)
16+
MESSAGES_TO_PUBLISH = 50000
6917

7018

7119
environment = Environment(
72-
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
20+
uri="amqp://guest:guest@localhost:5672/",
7321
)
7422

7523

@@ -102,7 +50,7 @@ def on_message(self, event: Event):
10250

10351
self._count = self._count + 1
10452

105-
if self._count == MESSAGES_TO_PUBLSH:
53+
if self._count == MESSAGES_TO_PUBLISH:
10654
print("closing receiver")
10755
# if you want you can add cleanup operations here
10856

@@ -136,29 +84,22 @@ def main() -> None:
13684
queue_name = "example-queue"
13785
routing_key = "routing-key"
13886

139-
global connection_configuration
140-
14187
print("connection to amqp server")
142-
if connection_configuration.connection is None:
143-
connection_configuration.connection = create_connection()
144-
145-
if connection_configuration.management is None:
146-
connection_configuration.management = (
147-
connection_configuration.connection.management()
148-
)
88+
connection = create_connection()
89+
management = connection.management()
90+
publisher = None
91+
consumer = None
14992

15093
print("declaring exchange and queue")
151-
connection_configuration.management.declare_exchange(
152-
ExchangeSpecification(name=exchange_name)
153-
)
94+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
15495

155-
connection_configuration.management.declare_queue(
96+
management.declare_queue(
15697
QuorumQueueSpecification(name=queue_name)
15798
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
15899
)
159100

160101
print("binding queue to exchange")
161-
bind_name = connection_configuration.management.bind(
102+
bind_name = management.bind(
162103
ExchangeToQueueBindingSpecification(
163104
source_exchange=exchange_name,
164105
destination_queue=queue_name,
@@ -171,34 +112,32 @@ def main() -> None:
171112
addr_queue = AddressHelper.queue_address(queue_name)
172113

173114
print("create a publisher and publish a test message")
174-
if connection_configuration.publisher is None:
175-
connection_configuration.publisher = (
176-
connection_configuration.connection.publisher(addr)
177-
)
115+
if publisher is None:
116+
publisher = connection.publisher(addr)
178117

179118
print("purging the queue")
180-
messages_purged = connection_configuration.management.purge_queue(queue_name)
119+
messages_purged = management.purge_queue(queue_name)
181120

182121
print("messages purged: " + str(messages_purged))
183-
# management.close()
184122

185123
# publishing messages
186124
while True:
187-
for i in range(MESSAGES_TO_PUBLSH):
125+
for i in range(MESSAGES_TO_PUBLISH):
188126

189127
if i % 1000 == 0:
190128
print("published 1000 messages...")
191129
try:
192-
if connection_configuration.publisher is not None:
193-
connection_configuration.publisher.publish(Message(body="test"))
130+
if publisher is not None:
131+
publisher.publish(Message(body="test"))
194132
except ConnectionClosed:
195133
print("publisher closing exception, resubmitting")
134+
# publisher = connection.publisher(addr)
196135
continue
197136

198137
print("closing publisher")
199138
try:
200-
if connection_configuration.publisher is not None:
201-
connection_configuration.publisher.close()
139+
if publisher is not None:
140+
publisher.close()
202141
except ConnectionClosed:
203142
print("publisher closing exception, resubmitting")
204143
continue
@@ -207,43 +146,35 @@ def main() -> None:
207146
print(
208147
"create a consumer and consume the test message - press control + c to terminate to consume"
209148
)
210-
if connection_configuration.consumer is None:
211-
connection_configuration.consumer = (
212-
connection_configuration.connection.consumer(
213-
addr_queue, message_handler=MyMessageHandler()
214-
)
215-
)
149+
if consumer is None:
150+
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
216151

217152
while True:
218153
try:
219-
connection_configuration.consumer.run()
154+
consumer.run()
220155
except KeyboardInterrupt:
221156
pass
222157
except ConnectionClosed:
223-
time.sleep(1)
224158
continue
225159
except Exception as e:
226160
print("consumer exited for exception " + str(e))
227161

228162
break
229163

230164
print("cleanup")
231-
connection_configuration.consumer.close()
232-
# once we finish consuming if we close the connection we need to create a new one
233-
# connection = create_connection()
234-
# management = connection.management()
165+
consumer.close()
235166

236167
print("unbind")
237-
connection_configuration.management.unbind(bind_name)
168+
management.unbind(bind_name)
238169

239170
print("delete queue")
240-
connection_configuration.management.delete_queue(queue_name)
171+
management.delete_queue(queue_name)
241172

242173
print("delete exchange")
243-
connection_configuration.management.delete_exchange(exchange_name)
174+
management.delete_exchange(exchange_name)
244175

245176
print("closing connections")
246-
connection_configuration.management.close()
177+
management.close()
247178
print("after management closing")
248179
environment.close()
249180
print("after connection closing")

examples/streams/example_with_streams.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
AddressHelper,
55
AMQPMessagingHandler,
66
Connection,
7+
ConnectionClosed,
78
Environment,
89
Event,
910
Message,
@@ -12,7 +13,7 @@
1213
StreamSpecification,
1314
)
1415

15-
MESSAGES_TO_PUBLISH = 100
16+
MESSAGES_TO_PUBLISH = 1
1617

1718

1819
class MyMessageHandler(AMQPMessagingHandler):
@@ -21,7 +22,7 @@ def __init__(self):
2122
super().__init__()
2223
self._count = 0
2324

24-
def on_message(self, event: Event):
25+
def on_amqp_message(self, event: Event):
2526
# just messages with banana filters get received
2627
print(
2728
"received message from stream: "
@@ -86,7 +87,7 @@ def main() -> None:
8687
queue_name = "example-queue"
8788

8889
print("connection to amqp server")
89-
environment = Environment("amqp://guest:guest@localhost:5672/")
90+
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
9091
connection = create_connection(environment)
9192

9293
management = connection.management()
@@ -134,14 +135,22 @@ def main() -> None:
134135

135136
publisher.close()
136137

137-
try:
138-
consumer.run()
139-
except KeyboardInterrupt:
140-
pass
138+
while True:
139+
try:
140+
consumer.run()
141+
except KeyboardInterrupt:
142+
pass
143+
except ConnectionClosed:
144+
print("connection closed")
145+
continue
146+
except Exception as e:
147+
print("consumer exited for exception " + str(e))
148+
149+
break
141150

142151
#
143152
print("delete queue")
144-
management.delete_queue(queue_name)
153+
# management.delete_queue(queue_name)
145154

146155
print("closing connections")
147156
management.close()

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ExchangeToExchangeBindingSpecification,
1212
ExchangeToQueueBindingSpecification,
1313
OffsetSpecification,
14+
RecoveryConfiguration,
1415
StreamOptions,
1516
)
1617
from .environment import Environment
@@ -87,4 +88,5 @@
8788
"OutcomeState",
8889
"Environment",
8990
"ExchangeCustomSpecification",
91+
"RecoveryConfiguration",
9092
]

rabbitmq_amqp_python_client/amqp_consumer_handler.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .delivery_context import DeliveryContext
2+
from .qpid.proton._events import Event
23
from .qpid.proton.handlers import MessagingHandler
34

45
"""
@@ -20,3 +21,16 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
2021
"""
2122
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
2223
self.delivery_context: DeliveryContext = DeliveryContext()
24+
self._offset = 0
25+
26+
def on_amqp_message(self, event: Event) -> None:
27+
pass
28+
29+
def on_message(self, event: Event) -> None:
30+
if "x-stream-offset" in event.message.annotations:
31+
self._offset = int(event.message.annotations["x-stream-offset"])
32+
self.on_amqp_message(event)
33+
34+
@property
35+
def offset(self) -> int:
36+
return self._offset

0 commit comments

Comments
 (0)