Skip to content

move connection parameters to environment #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ A connection to the RabbitMQ AMQP 1.0 server can be established using the Enviro
For example:

```python
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = environment.connection()
connection.dial()
```

Expand Down
6 changes: 2 additions & 4 deletions examples/getting_started/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ def on_message(self, event: Event):
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
Expand All @@ -63,7 +61,7 @@ def on_link_closed(self, event: Event) -> None:


def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
connection = environment.connection()
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
Expand All @@ -87,7 +85,7 @@ def main() -> None:
routing_key = "routing-key"

print("connection to amqp server")
environment = Environment()
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)

management = connection.management()
Expand Down
15 changes: 7 additions & 8 deletions examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
QuorumQueueSpecification,
)

environment = Environment()


# here we keep track of the objects we need to reconnect
@dataclass
Expand All @@ -42,6 +40,7 @@ class ConnectionConfiguration:
def on_disconnection():

print("disconnected")
global environment
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
Expand Down Expand Up @@ -69,6 +68,11 @@ def on_disconnection():
)


environment = Environment(
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
)


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
Expand Down Expand Up @@ -101,8 +105,6 @@ def on_message(self, event: Event):
if self._count == MESSAGES_TO_PUBLSH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
Expand All @@ -122,10 +124,7 @@ def create_connection() -> Connection:
# ]
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)

connection = environment.connection(
uri="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection = environment.connection()
connection.dial()

return connection
Expand Down
6 changes: 2 additions & 4 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ def on_message(self, event: Event):
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
Expand All @@ -67,7 +65,7 @@ def on_link_closed(self, event: Event) -> None:


def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
connection = environment.connection()
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
Expand All @@ -88,7 +86,7 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
environment = Environment()
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)

management = connection.management()
Expand Down
25 changes: 12 additions & 13 deletions examples/tls/tls_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ def on_message(self, event: Event):
if self._count == messages_to_publish:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
Expand All @@ -65,16 +63,7 @@ def on_link_closed(self, event: Event) -> None:

def create_connection(environment: Environment) -> Connection:
# in case of SSL enablement
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = environment.connection(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
),
)
connection = environment.connection()
connection.dial()

return connection
Expand All @@ -85,7 +74,17 @@ def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
environment = Environment()
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"

environment = Environment(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
),
)

print("connection to amqp server")
connection = create_connection(environment)
Expand Down
17 changes: 14 additions & 3 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ def __init__(
Raises:
ValueError: If neither uri nor uris is provided
"""
if uri is not None and uris is not None:
raise ValueError(
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
)
if uri is None and uris is None:
raise ValueError("You need to specify at least an addr or a list of addr")
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
self._addr: Optional[str] = uri
self._addrs: Optional[list[str]] = uris
self._conn: BlockingConnection
Expand Down Expand Up @@ -117,8 +121,15 @@ def close(self) -> None:
Closes the underlying connection and removes it from the connection list.
"""
logger.debug("Closing connection")
self._conn.close()
self._connections.remove(self)
try:
self._conn.close()
except Exception as e:
logger.error(f"Error closing connection: {e}")
raise e

finally:
if self in self._connections:
self._connections.remove(self)

def publisher(self, destination: str = "") -> Publisher:
"""
Expand Down
46 changes: 29 additions & 17 deletions rabbitmq_amqp_python_client/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,58 @@ class Environment:
_connections (list[Connection]): List of active connections managed by this environment
"""

def __init__(self): # type: ignore
def __init__(
self, # single-node mode
uri: Optional[str] = None,
# multi-node mode
uris: Optional[list[str]] = None,
ssl_context: Optional[SslConfigurationContext] = None,
on_disconnection_handler: Optional[CB] = None, # type: ignore
):
"""
Initialize a new Environment instance.

Creates an empty list to track active connections.

Args:
uri: Single node connection URI
uris: List of URIs for multi-node setup
ssl_context: SSL configuration for secure connections
on_disconnection_handler: Callback for handling disconnection events

"""
if uri is not None and uris is not None:
raise ValueError(
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
)
if uri is None and uris is None:
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
self._uri = uri
self._uris = uris
self._ssl_context = ssl_context
self._on_disconnection_handler = on_disconnection_handler
self._connections: list[Connection] = []

def connection(
self,
# single-node mode
uri: Optional[str] = None,
# multi-node mode
uris: Optional[list[str]] = None,
ssl_context: Optional[SslConfigurationContext] = None,
on_disconnection_handler: Optional[CB] = None, # type: ignore
) -> Connection:
"""
Create and return a new connection.

This method supports both single-node and multi-node configurations, with optional
SSL/TLS security and disconnection handling.

Args:
uri: Single node connection URI
uris: List of URIs for multi-node setup
ssl_context: SSL configuration for secure connections
on_disconnection_handler: Callback for handling disconnection events

Returns:
Connection: A new connection instance

Raises:
ValueError: If neither uri nor uris is provided
"""
connection = Connection(
uri=uri,
uris=uris,
ssl_context=ssl_context,
on_disconnection_handler=on_disconnection_handler,
uri=self._uri,
uris=self._uris,
ssl_context=self._ssl_context,
on_disconnection_handler=self._on_disconnection_handler,
)
logger.debug("Environment: Creating and returning a new connection")
self._connections.append(connection)
Expand Down
29 changes: 11 additions & 18 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@pytest.fixture()
def environment(pytestconfig):
environment = Environment()
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
try:
yield environment

Expand All @@ -25,8 +25,8 @@ def environment(pytestconfig):

@pytest.fixture()
def connection(pytestconfig):
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
connection = environment.connection()
connection.dial()
try:
yield connection
Expand All @@ -37,17 +37,18 @@ def connection(pytestconfig):

@pytest.fixture()
def connection_ssl(pytestconfig):
environment = Environment()
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = environment.connection(

environment = Environment(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
),
)
connection = environment.connection()
connection.dial()
try:
yield connection
Expand All @@ -58,8 +59,8 @@ def connection_ssl(pytestconfig):

@pytest.fixture()
def management(pytestconfig):
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
connection = environment.connection()
connection.dial()
try:
management = connection.management()
Expand All @@ -71,8 +72,8 @@ def management(pytestconfig):

@pytest.fixture()
def consumer(pytestconfig):
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
connection = environment.connection()
connection.dial()
try:
queue_name = "test-queue"
Expand Down Expand Up @@ -105,7 +106,6 @@ def on_message(self, event: Event):
self.delivery_context.accept(event)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -123,7 +123,6 @@ def on_message(self, event: Event):
self.delivery_context.accept(event)
self._received = self._received + 1
if self._received == 10:
event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -136,8 +135,6 @@ def __init__(self):
def on_message(self, event: Event):
self._received = self._received + 1
if self._received == 1000:
event.receiver.close()
event.connection.close()
# Workaround to terminate the Consumer and notify the test when all messages are consumed
raise ConsumerTestException("consumed")

Expand All @@ -152,7 +149,6 @@ def on_message(self, event: Event):
self.delivery_context.discard(event)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -168,7 +164,6 @@ def on_message(self, event: Event):
self.delivery_context.discard_with_annotations(event, annotations)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -182,7 +177,7 @@ def on_message(self, event: Event):
self.delivery_context.requeue(event)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
# event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -198,7 +193,6 @@ def on_message(self, event: Event):
self.delivery_context.requeue_with_annotations(event, annotations)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
raise ConsumerTestException("consumed")


Expand All @@ -214,5 +208,4 @@ def on_message(self, event: Event):
self.delivery_context.requeue_with_annotations(event, annotations)
self._received = self._received + 1
if self._received == 1000:
event.connection.close()
raise ConsumerTestException("consumed")
Loading