Skip to content

Improve reconnection #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e

### Managing disconnections

At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
You can use this callback to implement your own logic and eventually attempt a reconnection.
The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers

You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and
eventually attempt a reconnection
Expand Down
4 changes: 2 additions & 2 deletions examples/getting_started/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
def on_amqp_message(self, event: Event):
print("received message: " + str(event.message.body))

# accepting
Expand Down Expand Up @@ -147,7 +147,7 @@ def main() -> None:
consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()
management = connection.management()

print("unbind")
management.unbind(bind_name)
Expand Down
18 changes: 18 additions & 0 deletions examples/reconnection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Automatic reconnection
===

You can use this example to test automatic reconnection.

The scenario is publishing and consuming a lot of messages in a queue.

From the RabbitMQ UI you can break a connection to see the automatic reconnection happening.

Same for Consumers.

In case of streams the connection will restart consuming from the last consumed offset.

You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify
the backoff interval and the maximum_retries before the client gives up.

To disable automatic reconnection you can set active_recovery of RecoveryConfiguration to False

123 changes: 27 additions & 96 deletions examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,23 @@
# type: ignore


import time
from dataclasses import dataclass
from typing import Optional

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Management,
Message,
Publisher,
QuorumQueueSpecification,
)


# here we keep track of the objects we need to reconnect
@dataclass
class ConnectionConfiguration:
connection: Optional[Connection] = None
management: Optional[Management] = None
publisher: Optional[Publisher] = None
consumer: Optional[Consumer] = None


connection_configuration = ConnectionConfiguration()
MESSAGES_TO_PUBLSH = 50000


# disconnection callback
# here you can cleanup or reconnect
def on_disconnection():

print("disconnected")
global environment
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"

global connection_configuration

addr = AddressHelper.exchange_address(exchange_name, routing_key)
addr_queue = AddressHelper.queue_address(queue_name)

if connection_configuration.connection is not None:
connection_configuration.connection = create_connection()
if connection_configuration.management is not None:
connection_configuration.management = (
connection_configuration.connection.management()
)
if connection_configuration.publisher is not None:
connection_configuration.publisher = (
connection_configuration.connection.publisher(addr)
)
if connection_configuration.consumer is not None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, message_handler=MyMessageHandler()
)
)
MESSAGES_TO_PUBLISH = 50000


environment = Environment(
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
uri="amqp://guest:guest@localhost:5672/",
)


Expand Down Expand Up @@ -102,7 +50,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == MESSAGES_TO_PUBLSH:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here

Expand Down Expand Up @@ -136,29 +84,22 @@ def main() -> None:
queue_name = "example-queue"
routing_key = "routing-key"

global connection_configuration

print("connection to amqp server")
if connection_configuration.connection is None:
connection_configuration.connection = create_connection()

if connection_configuration.management is None:
connection_configuration.management = (
connection_configuration.connection.management()
)
connection = create_connection()
management = connection.management()
publisher = None
consumer = None

print("declaring exchange and queue")
connection_configuration.management.declare_exchange(
ExchangeSpecification(name=exchange_name)
)
management.declare_exchange(ExchangeSpecification(name=exchange_name))

connection_configuration.management.declare_queue(
management.declare_queue(
QuorumQueueSpecification(name=queue_name)
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
)

print("binding queue to exchange")
bind_name = connection_configuration.management.bind(
bind_name = management.bind(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
Expand All @@ -171,34 +112,32 @@ def main() -> None:
addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
if connection_configuration.publisher is None:
connection_configuration.publisher = (
connection_configuration.connection.publisher(addr)
)
if publisher is None:
publisher = connection.publisher(addr)

print("purging the queue")
messages_purged = connection_configuration.management.purge_queue(queue_name)
messages_purged = management.purge_queue(queue_name)

print("messages purged: " + str(messages_purged))
# management.close()

# publishing messages
while True:
for i in range(MESSAGES_TO_PUBLSH):
for i in range(MESSAGES_TO_PUBLISH):

if i % 1000 == 0:
print("published 1000 messages...")
try:
if connection_configuration.publisher is not None:
connection_configuration.publisher.publish(Message(body="test"))
if publisher is not None:
publisher.publish(Message(body="test"))
except ConnectionClosed:
print("publisher closing exception, resubmitting")
# publisher = connection.publisher(addr)
continue

print("closing publisher")
try:
if connection_configuration.publisher is not None:
connection_configuration.publisher.close()
if publisher is not None:
publisher.close()
except ConnectionClosed:
print("publisher closing exception, resubmitting")
continue
Expand All @@ -207,43 +146,35 @@ def main() -> None:
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
if connection_configuration.consumer is None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, message_handler=MyMessageHandler()
)
)
if consumer is None:
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())

while True:
try:
connection_configuration.consumer.run()
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
time.sleep(1)
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

print("cleanup")
connection_configuration.consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()
consumer.close()

print("unbind")
connection_configuration.management.unbind(bind_name)
management.unbind(bind_name)

print("delete queue")
connection_configuration.management.delete_queue(queue_name)
management.delete_queue(queue_name)

print("delete exchange")
connection_configuration.management.delete_exchange(exchange_name)
management.delete_exchange(exchange_name)

print("closing connections")
connection_configuration.management.close()
management.close()
print("after management closing")
environment.close()
print("after connection closing")
Expand Down
25 changes: 17 additions & 8 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Environment,
Event,
Message,
Expand All @@ -12,7 +13,7 @@
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100
MESSAGES_TO_PUBLISH = 1


class MyMessageHandler(AMQPMessagingHandler):
Expand All @@ -21,7 +22,7 @@ def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
def on_amqp_message(self, event: Event):
# just messages with banana filters get received
print(
"received message from stream: "
Expand Down Expand Up @@ -86,7 +87,7 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
environment = Environment("amqp://guest:guest@localhost:5672/")
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reconnect=True, I would use something like:
https://github.com/rabbitmq/rabbitmq-amqp-go-client/blob/d84c3d22de34b767e55e68e4f0fc54137b506924/pkg/rabbitmqamqp/amqp_connection_recovery.go#L8

RecoveryConfiguration struct {
	/*
		ActiveRecovery Define if the recovery is activated.
		If is not activated the connection will not try to createSender.
	*/
	ActiveRecovery bool

	/*
		BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
		time will be increased exponentially with each attempt.
		Default is 5 seconds, each attempt will double the time.
		The minimum value is 1 second. Avoid setting a value low values since it can cause a high
		number of reconnection attempts.
	*/
	BackOffReconnectInterval time.Duration

	/*
		MaxReconnectAttempts The maximum number of reconnection attempts.
		Default is 5.
		The minimum value is 1.
	*/
	MaxReconnectAttempts int

to make the reconnections more configurable

connection = create_connection(environment)

management = connection.management()
Expand Down Expand Up @@ -134,14 +135,22 @@ def main() -> None:

publisher.close()

try:
consumer.run()
except KeyboardInterrupt:
pass
while True:
try:
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
print("connection closed")
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

#
print("delete queue")
management.delete_queue(queue_name)
# management.delete_queue(queue_name)

print("closing connections")
management.close()
Expand Down
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
OffsetSpecification,
RecoveryConfiguration,
StreamOptions,
)
from .environment import Environment
Expand Down Expand Up @@ -87,4 +88,5 @@
"OutcomeState",
"Environment",
"ExchangeCustomSpecification",
"RecoveryConfiguration",
]
14 changes: 14 additions & 0 deletions rabbitmq_amqp_python_client/amqp_consumer_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .delivery_context import DeliveryContext
from .qpid.proton._events import Event
from .qpid.proton.handlers import MessagingHandler

"""
Expand All @@ -20,3 +21,16 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
"""
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
self.delivery_context: DeliveryContext = DeliveryContext()
self._offset = 0

def on_amqp_message(self, event: Event) -> None:
pass

def on_message(self, event: Event) -> None:
if "x-stream-offset" in event.message.annotations:
self._offset = int(event.message.annotations["x-stream-offset"])
self.on_amqp_message(event)

@property
def offset(self) -> int:
return self._offset
Loading