Skip to content

Commit 49b2594

Browse files
author
DanielePalaia
committed
move conneciton parameters to environment
1 parent fc81de2 commit 49b2594

File tree

12 files changed

+119
-123
lines changed

12 files changed

+119
-123
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ A connection to the RabbitMQ AMQP 1.0 server can be established using the Enviro
4343
For example:
4444

4545
```python
46-
environment = Environment()
47-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
46+
environment = Environment("amqp://guest:guest@localhost:5672/")
47+
connection = environment.connection()
4848
connection.dial()
4949
```
5050

examples/getting_started/getting_started.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ def on_message(self, event: Event):
5050
if self._count == MESSAGES_TO_PUBLISH:
5151
print("closing receiver")
5252
# if you want you can add cleanup operations here
53-
# event.receiver.close()
54-
# event.connection.close()
5553

5654
def on_connection_closed(self, event: Event):
5755
# if you want you can add cleanup operations here
@@ -63,7 +61,7 @@ def on_link_closed(self, event: Event) -> None:
6361

6462

6563
def create_connection(environment: Environment) -> Connection:
66-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
64+
connection = environment.connection()
6765
# in case of SSL enablement
6866
# ca_cert_file = ".ci/certs/ca_certificate.pem"
6967
# client_cert = ".ci/certs/client_certificate.pem"
@@ -87,7 +85,7 @@ def main() -> None:
8785
routing_key = "routing-key"
8886

8987
print("connection to amqp server")
90-
environment = Environment()
88+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
9189
connection = create_connection(environment)
9290

9391
management = connection.management()

examples/reconnection/reconnection_example.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
QuorumQueueSpecification,
2222
)
2323

24-
environment = Environment()
25-
2624

2725
# here we keep track of the objects we need to reconnect
2826
@dataclass
@@ -42,6 +40,7 @@ class ConnectionConfiguration:
4240
def on_disconnection():
4341

4442
print("disconnected")
43+
global environment
4544
exchange_name = "test-exchange"
4645
queue_name = "example-queue"
4746
routing_key = "routing-key"
@@ -69,6 +68,11 @@ def on_disconnection():
6968
)
7069

7170

71+
environment = Environment(
72+
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
73+
)
74+
75+
7276
class MyMessageHandler(AMQPMessagingHandler):
7377

7478
def __init__(self):
@@ -101,8 +105,6 @@ def on_message(self, event: Event):
101105
if self._count == MESSAGES_TO_PUBLSH:
102106
print("closing receiver")
103107
# if you want you can add cleanup operations here
104-
# event.receiver.close()
105-
# event.connection.close()
106108

107109
def on_connection_closed(self, event: Event):
108110
# if you want you can add cleanup operations here
@@ -122,10 +124,7 @@ def create_connection() -> Connection:
122124
# ]
123125
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
124126

125-
connection = environment.connection(
126-
uri="amqp://guest:guest@localhost:5672/",
127-
on_disconnection_handler=on_disconnection,
128-
)
127+
connection = environment.connection()
129128
connection.dial()
130129

131130
return connection

examples/streams/example_with_streams.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ def on_message(self, event: Event):
5454
if self._count == MESSAGES_TO_PUBLISH:
5555
print("closing receiver")
5656
# if you want you can add cleanup operations here
57-
# event.receiver.close()
58-
# event.connection.close()
5957

6058
def on_connection_closed(self, event: Event):
6159
# if you want you can add cleanup operations here
@@ -67,7 +65,7 @@ def on_link_closed(self, event: Event) -> None:
6765

6866

6967
def create_connection(environment: Environment) -> Connection:
70-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
68+
connection = environment.connection()
7169
# in case of SSL enablement
7270
# ca_cert_file = ".ci/certs/ca_certificate.pem"
7371
# client_cert = ".ci/certs/client_certificate.pem"
@@ -88,7 +86,7 @@ def main() -> None:
8886
queue_name = "example-queue"
8987

9088
print("connection to amqp server")
91-
environment = Environment()
89+
environment = Environment("amqp://guest:guest@localhost:5672/")
9290
connection = create_connection(environment)
9391

9492
management = connection.management()

examples/tls/tls_example.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ def on_message(self, event: Event):
5151
if self._count == messages_to_publish:
5252
print("closing receiver")
5353
# if you want you can add cleanup operations here
54-
# event.receiver.close()
55-
# event.connection.close()
5654

5755
def on_connection_closed(self, event: Event):
5856
# if you want you can add cleanup operations here
@@ -65,16 +63,7 @@ def on_link_closed(self, event: Event) -> None:
6563

6664
def create_connection(environment: Environment) -> Connection:
6765
# in case of SSL enablement
68-
ca_cert_file = ".ci/certs/ca_certificate.pem"
69-
client_cert = ".ci/certs/client_certificate.pem"
70-
client_key = ".ci/certs/client_key.pem"
71-
connection = environment.connection(
72-
"amqps://guest:guest@localhost:5671/",
73-
ssl_context=SslConfigurationContext(
74-
ca_cert=ca_cert_file,
75-
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76-
),
77-
)
66+
connection = environment.connection()
7867
connection.dial()
7968

8069
return connection
@@ -85,7 +74,17 @@ def main() -> None:
8574
exchange_name = "test-exchange"
8675
queue_name = "example-queue"
8776
routing_key = "routing-key"
88-
environment = Environment()
77+
ca_cert_file = ".ci/certs/ca_certificate.pem"
78+
client_cert = ".ci/certs/client_certificate.pem"
79+
client_key = ".ci/certs/client_key.pem"
80+
81+
environment = Environment(
82+
"amqps://guest:guest@localhost:5671/",
83+
ssl_context=SslConfigurationContext(
84+
ca_cert=ca_cert_file,
85+
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
86+
),
87+
)
8988

9089
print("connection to amqp server")
9190
connection = create_connection(environment)

rabbitmq_amqp_python_client/connection.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ def __init__(
4949
Raises:
5050
ValueError: If neither uri nor uris is provided
5151
"""
52+
if uri is not None and uris is not None:
53+
raise ValueError(
54+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
55+
)
5256
if uri is None and uris is None:
53-
raise ValueError("You need to specify at least an addr or a list of addr")
57+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
5458
self._addr: Optional[str] = uri
5559
self._addrs: Optional[list[str]] = uris
5660
self._conn: BlockingConnection
@@ -117,8 +121,15 @@ def close(self) -> None:
117121
Closes the underlying connection and removes it from the connection list.
118122
"""
119123
logger.debug("Closing connection")
120-
self._conn.close()
121-
self._connections.remove(self)
124+
try:
125+
self._conn.close()
126+
except Exception as e:
127+
logger.error(f"Error closing connection: {e}")
128+
raise e
129+
130+
finally:
131+
if self in self._connections:
132+
self._connections.remove(self)
122133

123134
def publisher(self, destination: str = "") -> Publisher:
124135
"""

rabbitmq_amqp_python_client/environment.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,46 +23,58 @@ class Environment:
2323
_connections (list[Connection]): List of active connections managed by this environment
2424
"""
2525

26-
def __init__(self): # type: ignore
26+
def __init__(
27+
self, # single-node mode
28+
uri: Optional[str] = None,
29+
# multi-node mode
30+
uris: Optional[list[str]] = None,
31+
ssl_context: Optional[SslConfigurationContext] = None,
32+
on_disconnection_handler: Optional[CB] = None, # type: ignore
33+
):
2734
"""
2835
Initialize a new Environment instance.
2936
3037
Creates an empty list to track active connections.
38+
39+
Args:
40+
uri: Single node connection URI
41+
uris: List of URIs for multi-node setup
42+
ssl_context: SSL configuration for secure connections
43+
on_disconnection_handler: Callback for handling disconnection events
44+
3145
"""
46+
if uri is not None and uris is not None:
47+
raise ValueError(
48+
"Cannot specify both 'uri' and 'uris'. Choose one connection mode."
49+
)
50+
if uri is None and uris is None:
51+
raise ValueError("Must specify either 'uri' or 'uris' for connection.")
52+
self._uri = uri
53+
self._uris = uris
54+
self._ssl_context = ssl_context
55+
self._on_disconnection_handler = on_disconnection_handler
3256
self._connections: list[Connection] = []
3357

3458
def connection(
3559
self,
36-
# single-node mode
37-
uri: Optional[str] = None,
38-
# multi-node mode
39-
uris: Optional[list[str]] = None,
40-
ssl_context: Optional[SslConfigurationContext] = None,
41-
on_disconnection_handler: Optional[CB] = None, # type: ignore
4260
) -> Connection:
4361
"""
4462
Create and return a new connection.
4563
4664
This method supports both single-node and multi-node configurations, with optional
4765
SSL/TLS security and disconnection handling.
4866
49-
Args:
50-
uri: Single node connection URI
51-
uris: List of URIs for multi-node setup
52-
ssl_context: SSL configuration for secure connections
53-
on_disconnection_handler: Callback for handling disconnection events
54-
5567
Returns:
5668
Connection: A new connection instance
5769
5870
Raises:
5971
ValueError: If neither uri nor uris is provided
6072
"""
6173
connection = Connection(
62-
uri=uri,
63-
uris=uris,
64-
ssl_context=ssl_context,
65-
on_disconnection_handler=on_disconnection_handler,
74+
uri=self._uri,
75+
uris=self._uris,
76+
ssl_context=self._ssl_context,
77+
on_disconnection_handler=self._on_disconnection_handler,
6678
)
6779
logger.debug("Environment: Creating and returning a new connection")
6880
self._connections.append(connection)

tests/conftest.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
@pytest.fixture()
1717
def environment(pytestconfig):
18-
environment = Environment()
18+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
1919
try:
2020
yield environment
2121

@@ -25,8 +25,8 @@ def environment(pytestconfig):
2525

2626
@pytest.fixture()
2727
def connection(pytestconfig):
28-
environment = Environment()
29-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
28+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
29+
connection = environment.connection()
3030
connection.dial()
3131
try:
3232
yield connection
@@ -37,17 +37,18 @@ def connection(pytestconfig):
3737

3838
@pytest.fixture()
3939
def connection_ssl(pytestconfig):
40-
environment = Environment()
4140
ca_cert_file = ".ci/certs/ca_certificate.pem"
4241
client_cert = ".ci/certs/client_certificate.pem"
4342
client_key = ".ci/certs/client_key.pem"
44-
connection = environment.connection(
43+
44+
environment = Environment(
4545
"amqps://guest:guest@localhost:5671/",
4646
ssl_context=SslConfigurationContext(
4747
ca_cert=ca_cert_file,
4848
client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
4949
),
5050
)
51+
connection = environment.connection()
5152
connection.dial()
5253
try:
5354
yield connection
@@ -58,8 +59,8 @@ def connection_ssl(pytestconfig):
5859

5960
@pytest.fixture()
6061
def management(pytestconfig):
61-
environment = Environment()
62-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
62+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
63+
connection = environment.connection()
6364
connection.dial()
6465
try:
6566
management = connection.management()
@@ -71,8 +72,8 @@ def management(pytestconfig):
7172

7273
@pytest.fixture()
7374
def consumer(pytestconfig):
74-
environment = Environment()
75-
connection = environment.connection("amqp://guest:guest@localhost:5672/")
75+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
76+
connection = environment.connection()
7677
connection.dial()
7778
try:
7879
queue_name = "test-queue"
@@ -105,7 +106,6 @@ def on_message(self, event: Event):
105106
self.delivery_context.accept(event)
106107
self._received = self._received + 1
107108
if self._received == 1000:
108-
event.connection.close()
109109
raise ConsumerTestException("consumed")
110110

111111

@@ -123,7 +123,6 @@ def on_message(self, event: Event):
123123
self.delivery_context.accept(event)
124124
self._received = self._received + 1
125125
if self._received == 10:
126-
event.connection.close()
127126
raise ConsumerTestException("consumed")
128127

129128

@@ -136,8 +135,6 @@ def __init__(self):
136135
def on_message(self, event: Event):
137136
self._received = self._received + 1
138137
if self._received == 1000:
139-
event.receiver.close()
140-
event.connection.close()
141138
# Workaround to terminate the Consumer and notify the test when all messages are consumed
142139
raise ConsumerTestException("consumed")
143140

@@ -152,7 +149,6 @@ def on_message(self, event: Event):
152149
self.delivery_context.discard(event)
153150
self._received = self._received + 1
154151
if self._received == 1000:
155-
event.connection.close()
156152
raise ConsumerTestException("consumed")
157153

158154

@@ -168,7 +164,6 @@ def on_message(self, event: Event):
168164
self.delivery_context.discard_with_annotations(event, annotations)
169165
self._received = self._received + 1
170166
if self._received == 1000:
171-
event.connection.close()
172167
raise ConsumerTestException("consumed")
173168

174169

@@ -182,7 +177,7 @@ def on_message(self, event: Event):
182177
self.delivery_context.requeue(event)
183178
self._received = self._received + 1
184179
if self._received == 1000:
185-
event.connection.close()
180+
# event.connection.close()
186181
raise ConsumerTestException("consumed")
187182

188183

@@ -198,7 +193,6 @@ def on_message(self, event: Event):
198193
self.delivery_context.requeue_with_annotations(event, annotations)
199194
self._received = self._received + 1
200195
if self._received == 1000:
201-
event.connection.close()
202196
raise ConsumerTestException("consumed")
203197

204198

@@ -214,5 +208,4 @@ def on_message(self, event: Event):
214208
self.delivery_context.requeue_with_annotations(event, annotations)
215209
self._received = self._received + 1
216210
if self._received == 1000:
217-
event.connection.close()
218211
raise ConsumerTestException("consumed")

0 commit comments

Comments
 (0)