Skip to content

Commit f94c5be

Browse files
author
DanielePalaia
committed
move conneciton parameters to environment
1 parent fc81de2 commit f94c5be

File tree

12 files changed

+120
-123
lines changed

12 files changed

+120
-123
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ A connection to the RabbitMQ AMQP 1.0 server can be established using the Enviro
4343
For example:
4444

4545
```python
46-
environment = Environment()
47-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
46+
environment = Environment("amqp://guest:guest@localhost:5672/")
47+
connection = environment.connection()
4848
connection.dial()
4949
```
5050

examples/getting_started/getting_started.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ def on_message(self, event: Event):
5050
if self._count == MESSAGES_TO_PUBLISH:
5151
print("closing receiver")
5252
# if you want you can add cleanup operations here
53-
# event.receiver.close()
54-
# event.connection.close()
5553

5654
def on_connection_closed(self, event: Event):
5755
# if you want you can add cleanup operations here
@@ -63,7 +61,7 @@ def on_link_closed(self, event: Event) -> None:
6361

6462

6563
def create_connection(environment: Environment) -> Connection:
66-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
64+
connection = environment.connection()
6765
# in case of SSL enablement
6866
# ca_cert_file = ".ci/certs/ca_certificate.pem"
6967
# client_cert = ".ci/certs/client_certificate.pem"
@@ -87,7 +85,7 @@ def main() -> None:
8785
routing_key = "routing-key"
8886

8987
print("connection to amqp server")
90-
environment = Environment()
88+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
9189
connection = create_connection(environment)
9290

9391
management = connection.management()

examples/reconnection/reconnection_example.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
QuorumQueueSpecification,
2222
)
2323

24-
environment = Environment()
25-
2624

2725
# here we keep track of the objects we need to reconnect
2826
@dataclass
@@ -42,6 +40,7 @@ class ConnectionConfiguration:
4240
def on_disconnection():
4341

4442
print("disconnected")
43+
global environment
4544
exchange_name = "test-exchange"
4645
queue_name = "example-queue"
4746
routing_key = "routing-key"
@@ -69,6 +68,11 @@ def on_disconnection():
6968
)
7069

7170

71+
environment = Environment(
72+
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
73+
)
74+
75+
7276
class MyMessageHandler(AMQPMessagingHandler):
7377

7478
def __init__(self):
@@ -101,8 +105,6 @@ def on_message(self, event: Event):
101105
if self._count == MESSAGES_TO_PUBLSH:
102106
print("closing receiver")
103107
# if you want you can add cleanup operations here
104-
# event.receiver.close()
105-
# event.connection.close()
106108

107109
def on_connection_closed(self, event: Event):
108110
# if you want you can add cleanup operations here
@@ -122,10 +124,7 @@ def create_connection() -> Connection:
122124
# ]
123125
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
124126

125-
connection = environment.connection(
126-
uri="amqp://guest:guest@localhost:5672/",
127-
on_disconnection_handler=on_disconnection,
128-
)
127+
connection = environment.connection()
129128
connection.dial()
130129

131130
return connection

examples/streams/example_with_streams.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ def on_message(self, event: Event):
5454
if self._count == MESSAGES_TO_PUBLISH:
5555
print("closing receiver")
5656
# if you want you can add cleanup operations here
57-
# event.receiver.close()
58-
# event.connection.close()
5957

6058
def on_connection_closed(self, event: Event):
6159
# if you want you can add cleanup operations here
@@ -67,7 +65,7 @@ def on_link_closed(self, event: Event) -> None:
6765

6866

6967
def create_connection(environment: Environment) -> Connection:
70-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
68+
connection = environment.connection()
7169
# in case of SSL enablement
7270
# ca_cert_file = ".ci/certs/ca_certificate.pem"
7371
# client_cert = ".ci/certs/client_certificate.pem"
@@ -88,7 +86,7 @@ def main() -> None:
8886
queue_name = "example-queue"
8987

9088
print("connection to amqp server")
91-
environment = Environment()
89+
environment = Environment("amqp://guest:guest@localhost:5672/")
9290
connection = create_connection(environment)
9391

9492
management = connection.management()

examples/tls/tls_example.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ def on_message(self, event: Event):
5151
if self._count == messages_to_publish:
5252
print("closing receiver")
5353
# if you want you can add cleanup operations here
54-
# event.receiver.close()
55-
# event.connection.close()
5654

5755
def on_connection_closed(self, event: Event):
5856
# if you want you can add cleanup operations here
@@ -65,16 +63,7 @@ def on_link_closed(self, event: Event) -> None:
6563

6664
def create_connection(environment: Environment) -> Connection:
6765
# in case of SSL enablement
68-
ca_cert_file = ".ci/certs/ca_certificate.pem"
69-
client_cert = ".ci/certs/client_certificate.pem"
70-
client_key = ".ci/certs/client_key.pem"
71-
connection = environment.connection(
72-
"amqps://guest:guest@localhost:5671/",
73-
ssl_context=SslConfigurationContext(
74-
ca_cert=ca_cert_file,
75-
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76-
),
77-
)
66+
connection = environment.connection()
7867
connection.dial()
7968

8069
return connection
@@ -85,7 +74,17 @@ def main() -> None:
8574
exchange_name = "test-exchange"
8675
queue_name = "example-queue"
8776
routing_key = "routing-key"
88-
environment = Environment()
77+
ca_cert_file = ".ci/certs/ca_certificate.pem"
78+
client_cert = ".ci/certs/client_certificate.pem"
79+
client_key = ".ci/certs/client_key.pem"
80+
81+
environment = Environment(
82+
"amqps://guest:guest@localhost:5671/",
83+
ssl_context=SslConfigurationContext(
84+
ca_cert=ca_cert_file,
85+
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
86+
),
87+
)
8988

9089
print("connection to amqp server")
9190
connection = create_connection(environment)

rabbitmq_amqp_python_client/connection.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ def __init__(
4949
Raises:
5050
ValueError: If neither uri nor uris is provided
5151
"""
52+
if uri is not None and uris is not None:
53+
raise ValueError(
54+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
55+
)
5256
if uri is None and uris is None:
53-
raise ValueError("You need to specify at least an addr or a list of addr")
57+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
5458
self._addr: Optional[str] = uri
5559
self._addrs: Optional[list[str]] = uris
5660
self._conn: BlockingConnection
@@ -117,8 +121,15 @@ def close(self) -> None:
117121
Closes the underlying connection and removes it from the connection list.
118122
"""
119123
logger.debug("Closing connection")
120-
self._conn.close()
121-
self._connections.remove(self)
124+
try:
125+
self._conn.close()
126+
except Exception as e:
127+
logger.error(f"Error closing connection: {e}")
128+
raise e
129+
130+
finally:
131+
if self in self._connections:
132+
self._connections.remove(self)
122133

123134
def publisher(self, destination: str = "") -> Publisher:
124135
"""

rabbitmq_amqp_python_client/environment.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# For the moment this is just a Connection pooler to keep compatibility with other clients
2+
import copy
23
import logging
34
from typing import Annotated, Callable, Optional, TypeVar
45

@@ -23,46 +24,58 @@ class Environment:
2324
_connections (list[Connection]): List of active connections managed by this environment
2425
"""
2526

26-
def __init__(self): # type: ignore
27+
def __init__(
28+
self, # single-node mode
29+
uri: Optional[str] = None,
30+
# multi-node mode
31+
uris: Optional[list[str]] = None,
32+
ssl_context: Optional[SslConfigurationContext] = None,
33+
on_disconnection_handler: Optional[CB] = None, # type: ignore
34+
):
2735
"""
2836
Initialize a new Environment instance.
2937
3038
Creates an empty list to track active connections.
39+
40+
Args:
41+
uri: Single node connection URI
42+
uris: List of URIs for multi-node setup
43+
ssl_context: SSL configuration for secure connections
44+
on_disconnection_handler: Callback for handling disconnection events
45+
3146
"""
47+
if uri is not None and uris is not None:
48+
raise ValueError(
49+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
50+
)
51+
if uri is None and uris is None:
52+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
53+
self._uri = uri
54+
self._uris = uris
55+
self._ssl_context = ssl_context
56+
self._on_disconnection_handler = on_disconnection_handler
3257
self._connections: list[Connection] = []
3358

3459
def connection(
3560
self,
36-
# single-node mode
37-
uri: Optional[str] = None,
38-
# multi-node mode
39-
uris: Optional[list[str]] = None,
40-
ssl_context: Optional[SslConfigurationContext] = None,
41-
on_disconnection_handler: Optional[CB] = None, # type: ignore
4261
) -> Connection:
4362
"""
4463
Create and return a new connection.
4564
4665
This method supports both single-node and multi-node configurations, with optional
4766
SSL/TLS security and disconnection handling.
4867
49-
Args:
50-
uri: Single node connection URI
51-
uris: List of URIs for multi-node setup
52-
ssl_context: SSL configuration for secure connections
53-
on_disconnection_handler: Callback for handling disconnection events
54-
5568
Returns:
5669
Connection: A new connection instance
5770
5871
Raises:
5972
ValueError: If neither uri nor uris is provided
6073
"""
6174
connection = Connection(
62-
uri=uri,
63-
uris=uris,
64-
ssl_context=ssl_context,
65-
on_disconnection_handler=on_disconnection_handler,
75+
uri=self._uri,
76+
uris=self._uris,
77+
ssl_context=self._ssl_context,
78+
on_disconnection_handler=self._on_disconnection_handler,
6679
)
6780
logger.debug("Environment: Creating and returning a new connection")
6881
self._connections.append(connection)

0 commit comments

Comments
 (0)