Skip to content

Commit b712bf4

Browse files
DanielePalaiaDaniele
DanielePalaia
authored and
Daniele
committed
move conneciton parameters to environment
1 parent fc81de2 commit b712bf4

File tree

12 files changed

+121
-114
lines changed

12 files changed

+121
-114
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ A connection to the RabbitMQ AMQP 1.0 server can be established using the Enviro
4343
For example:
4444

4545
```python
46-
environment = Environment()
47-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
46+
environment = Environment("amqp://guest:guest@localhost:5672/")
47+
connection = environment.connection()
4848
connection.dial()
4949
```
5050

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def on_link_closed(self, event: Event) -> None:
6363

6464

6565
def create_connection(environment: Environment) -> Connection:
66-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
66+
connection = environment.connection()
6767
# in case of SSL enablement
6868
# ca_cert_file = ".ci/certs/ca_certificate.pem"
6969
# client_cert = ".ci/certs/client_certificate.pem"
@@ -87,7 +87,7 @@ def main() -> None:
8787
routing_key = "routing-key"
8888

8989
print("connection to amqp server")
90-
environment = Environment()
90+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
9191
connection = create_connection(environment)
9292

9393
management = connection.management()

examples/reconnection/reconnection_example.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
QuorumQueueSpecification,
2222
)
2323

24-
environment = Environment()
25-
2624

2725
# here we keep track of the objects we need to reconnect
2826
@dataclass
@@ -42,6 +40,7 @@ class ConnectionConfiguration:
4240
def on_disconnection():
4341

4442
print("disconnected")
43+
global environment
4544
exchange_name = "test-exchange"
4645
queue_name = "example-queue"
4746
routing_key = "routing-key"
@@ -69,6 +68,11 @@ def on_disconnection():
6968
)
7069

7170

71+
environment = Environment(
72+
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
73+
)
74+
75+
7276
class MyMessageHandler(AMQPMessagingHandler):
7377

7478
def __init__(self):
@@ -122,10 +126,7 @@ def create_connection() -> Connection:
122126
# ]
123127
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
124128

125-
connection = environment.connection(
126-
uri="amqp://guest:guest@localhost:5672/",
127-
on_disconnection_handler=on_disconnection,
128-
)
129+
connection = environment.connection()
129130
connection.dial()
130131

131132
return connection

examples/streams/example_with_streams.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def on_link_closed(self, event: Event) -> None:
6767

6868

6969
def create_connection(environment: Environment) -> Connection:
70-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
70+
connection = environment.connection()
7171
# in case of SSL enablement
7272
# ca_cert_file = ".ci/certs/ca_certificate.pem"
7373
# client_cert = ".ci/certs/client_certificate.pem"
@@ -88,7 +88,7 @@ def main() -> None:
8888
queue_name = "example-queue"
8989

9090
print("connection to amqp server")
91-
environment = Environment()
91+
environment = Environment("amqp://guest:guest@localhost:5672/")
9292
connection = create_connection(environment)
9393

9494
management = connection.management()

examples/tls/tls_example.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,7 @@ def on_link_closed(self, event: Event) -> None:
6565

6666
def create_connection(environment: Environment) -> Connection:
6767
# in case of SSL enablement
68-
ca_cert_file = ".ci/certs/ca_certificate.pem"
69-
client_cert = ".ci/certs/client_certificate.pem"
70-
client_key = ".ci/certs/client_key.pem"
71-
connection = environment.connection(
72-
"amqps://guest:guest@localhost:5671/",
73-
ssl_context=SslConfigurationContext(
74-
ca_cert=ca_cert_file,
75-
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76-
),
77-
)
68+
connection = environment.connection()
7869
connection.dial()
7970

8071
return connection
@@ -85,7 +76,17 @@ def main() -> None:
8576
exchange_name = "test-exchange"
8677
queue_name = "example-queue"
8778
routing_key = "routing-key"
88-
environment = Environment()
79+
ca_cert_file = ".ci/certs/ca_certificate.pem"
80+
client_cert = ".ci/certs/client_certificate.pem"
81+
client_key = ".ci/certs/client_key.pem"
82+
83+
environment = Environment(
84+
"amqps://guest:guest@localhost:5671/",
85+
ssl_context=SslConfigurationContext(
86+
ca_cert=ca_cert_file,
87+
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
88+
),
89+
)
8990

9091
print("connection to amqp server")
9192
connection = create_connection(environment)

rabbitmq_amqp_python_client/connection.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ def __init__(
4949
Raises:
5050
ValueError: If neither uri nor uris is provided
5151
"""
52+
if uri is not None and uris is not None:
53+
raise ValueError(
54+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
55+
)
5256
if uri is None and uris is None:
53-
raise ValueError("You need to specify at least an addr or a list of addr")
57+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
5458
self._addr: Optional[str] = uri
5559
self._addrs: Optional[list[str]] = uris
5660
self._conn: BlockingConnection
@@ -118,7 +122,8 @@ def close(self) -> None:
118122
"""
119123
logger.debug("Closing connection")
120124
self._conn.close()
121-
self._connections.remove(self)
125+
if self in self._connections:
126+
self._connections.remove(self)
122127

123128
def publisher(self, destination: str = "") -> Publisher:
124129
"""

rabbitmq_amqp_python_client/environment.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# For the moment this is just a Connection pooler to keep compatibility with other clients
2+
import copy
23
import logging
34
from typing import Annotated, Callable, Optional, TypeVar
45

@@ -23,46 +24,58 @@ class Environment:
2324
_connections (list[Connection]): List of active connections managed by this environment
2425
"""
2526

26-
def __init__(self): # type: ignore
27+
def __init__(
28+
self, # single-node mode
29+
uri: Optional[str] = None,
30+
# multi-node mode
31+
uris: Optional[list[str]] = None,
32+
ssl_context: Optional[SslConfigurationContext] = None,
33+
on_disconnection_handler: Optional[CB] = None, # type: ignore
34+
):
2735
"""
2836
Initialize a new Environment instance.
2937
3038
Creates an empty list to track active connections.
39+
40+
Args:
41+
uri: Single node connection URI
42+
uris: List of URIs for multi-node setup
43+
ssl_context: SSL configuration for secure connections
44+
on_disconnection_handler: Callback for handling disconnection events
45+
3146
"""
47+
if uri is not None and uris is not None:
48+
raise ValueError(
49+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
50+
)
51+
if uri is None and uris is None:
52+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
53+
self._uri = uri
54+
self._uris = uris
55+
self._ssl_context = ssl_context
56+
self._on_disconnection_handler = on_disconnection_handler
3257
self._connections: list[Connection] = []
3358

3459
def connection(
3560
self,
36-
# single-node mode
37-
uri: Optional[str] = None,
38-
# multi-node mode
39-
uris: Optional[list[str]] = None,
40-
ssl_context: Optional[SslConfigurationContext] = None,
41-
on_disconnection_handler: Optional[CB] = None, # type: ignore
4261
) -> Connection:
4362
"""
4463
Create and return a new connection.
4564
4665
This method supports both single-node and multi-node configurations, with optional
4766
SSL/TLS security and disconnection handling.
4867
49-
Args:
50-
uri: Single node connection URI
51-
uris: List of URIs for multi-node setup
52-
ssl_context: SSL configuration for secure connections
53-
on_disconnection_handler: Callback for handling disconnection events
54-
5568
Returns:
5669
Connection: A new connection instance
5770
5871
Raises:
5972
ValueError: If neither uri nor uris is provided
6073
"""
6174
connection = Connection(
62-
uri=uri,
63-
uris=uris,
64-
ssl_context=ssl_context,
65-
on_disconnection_handler=on_disconnection_handler,
75+
uri=self._uri,
76+
uris=self._uris,
77+
ssl_context=copy.deepcopy(self._ssl_context),
78+
on_disconnection_handler=copy.deepcopy(self._on_disconnection_handler),
6679
)
6780
logger.debug("Environment: Creating and returning a new connection")
6881
self._connections.append(connection)

tests/conftest.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
@pytest.fixture()
1717
def environment(pytestconfig):
18-
environment = Environment()
18+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
1919
try:
2020
yield environment
2121

@@ -25,8 +25,8 @@ def environment(pytestconfig):
2525

2626
@pytest.fixture()
2727
def connection(pytestconfig):
28-
environment = Environment()
29-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
28+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
29+
connection = environment.connection()
3030
connection.dial()
3131
try:
3232
yield connection
@@ -37,17 +37,18 @@ def connection(pytestconfig):
3737

3838
@pytest.fixture()
3939
def connection_ssl(pytestconfig):
40-
environment = Environment()
4140
ca_cert_file = ".ci/certs/ca_certificate.pem"
4241
client_cert = ".ci/certs/client_certificate.pem"
4342
client_key = ".ci/certs/client_key.pem"
44-
connection = environment.connection(
43+
44+
environment = Environment(
4545
"amqps://guest:guest@localhost:5671/",
4646
ssl_context=SslConfigurationContext(
4747
ca_cert=ca_cert_file,
4848
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
4949
),
5050
)
51+
connection = environment.connection()
5152
connection.dial()
5253
try:
5354
yield connection
@@ -58,8 +59,8 @@ def connection_ssl(pytestconfig):
5859

5960
@pytest.fixture()
6061
def management(pytestconfig):
61-
environment = Environment()
62-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
62+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
63+
connection = environment.connection()
6364
connection.dial()
6465
try:
6566
management = connection.management()
@@ -71,8 +72,8 @@ def management(pytestconfig):
7172

7273
@pytest.fixture()
7374
def consumer(pytestconfig):
74-
environment = Environment()
75-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
75+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
76+
connection = environment.connection()
7677
connection.dial()
7778
try:
7879
queue_name = "test-queue"
@@ -105,7 +106,7 @@ def on_message(self, event: Event):
105106
self.delivery_context.accept(event)
106107
self._received = self._received + 1
107108
if self._received == 1000:
108-
event.connection.close()
109+
# event.connection.close()
109110
raise ConsumerTestException("consumed")
110111

111112

@@ -123,7 +124,7 @@ def on_message(self, event: Event):
123124
self.delivery_context.accept(event)
124125
self._received = self._received + 1
125126
if self._received == 10:
126-
event.connection.close()
127+
# event.connection.close()
127128
raise ConsumerTestException("consumed")
128129

129130

@@ -136,8 +137,8 @@ def __init__(self):
136137
def on_message(self, event: Event):
137138
self._received = self._received + 1
138139
if self._received == 1000:
139-
event.receiver.close()
140-
event.connection.close()
140+
# event.receiver.close()
141+
# event.connection.close()
141142
# Workaround to terminate the Consumer and notify the test when all messages are consumed
142143
raise ConsumerTestException("consumed")
143144

@@ -152,7 +153,7 @@ def on_message(self, event: Event):
152153
self.delivery_context.discard(event)
153154
self._received = self._received + 1
154155
if self._received == 1000:
155-
event.connection.close()
156+
# event.connection.close()
156157
raise ConsumerTestException("consumed")
157158

158159

@@ -168,7 +169,7 @@ def on_message(self, event: Event):
168169
self.delivery_context.discard_with_annotations(event, annotations)
169170
self._received = self._received + 1
170171
if self._received == 1000:
171-
event.connection.close()
172+
# event.connection.close()
172173
raise ConsumerTestException("consumed")
173174

174175

@@ -182,7 +183,7 @@ def on_message(self, event: Event):
182183
self.delivery_context.requeue(event)
183184
self._received = self._received + 1
184185
if self._received == 1000:
185-
event.connection.close()
186+
# event.connection.close()
186187
raise ConsumerTestException("consumed")
187188

188189

@@ -198,7 +199,7 @@ def on_message(self, event: Event):
198199
self.delivery_context.requeue_with_annotations(event, annotations)
199200
self._received = self._received + 1
200201
if self._received == 1000:
201-
event.connection.close()
202+
# event.connection.close()
202203
raise ConsumerTestException("consumed")
203204

204205

@@ -214,5 +215,5 @@ def on_message(self, event: Event):
214215
self.delivery_context.requeue_with_annotations(event, annotations)
215216
self._received = self._received + 1
216217
if self._received == 1000:
217-
event.connection.close()
218+
# event.connection.close()
218219
raise ConsumerTestException("consumed")

0 commit comments

Comments
 (0)