Skip to content

Commit eac025a

Browse files
DanielePalaiaDanielePalaia
and
DanielePalaia
authored
Environment and improvements for doc compatibility (#33)
* Introducing Environment * introducing environment class * adding closing connections logic * replace Disposition to OutcomeState * making exchange arguments optional * adding message/consumer count in QueueInfo * moving examples and making close() method of connection public again * bumping version --------- Co-authored-by: DanielePalaia <daniele985@@gmail.com>
1 parent 9753329 commit eac025a

20 files changed

+301
-98
lines changed

README.md

+8-7
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,19 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt
3232

3333
## Getting Started
3434

35-
An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:
35+
An example is provided [`here`](./examples/getting_started/getting_started.py) you can run it after starting a RabbitMQ 4.0 broker with:
3636

37-
poetry run python ./examples/getting_started/main.py
37+
poetry run python ./examples/getting_started/getting_started.py
3838

3939
### Creating a connection
4040

41-
A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
41+
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.
4242

4343
For example:
4444

4545
```python
46-
connection = Connection("amqp://guest:guest@localhost:5672/")
46+
environment = Environment()
47+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
4748
connection.dial()
4849
```
4950

@@ -131,21 +132,21 @@ You can consume from a given offset or specify a default starting point (FIRST,
131132

132133
Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
133134

134-
You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.
135+
You can check the [`stream example`](./examples/streams/example_with_streams.py) to see how to work with RabbitMQ streams.
135136

136137
### SSL connections
137138

138139
The client supports TLS/SSL connections.
139140

140-
You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection
141+
You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to establish a secured connection
141142

142143

143144
### Managing disconnections
144145

145146
At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
146147
You can use this callback to implement your own logic and eventually attempt a reconnection.
147148

148-
You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
149+
You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and
149150
eventually attempt a reconnection
150151

151152

examples/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Client examples
2+
===
3+
- [Getting started](./getting_started/getting_started.py) - Producer and Consumer example without reconnection
4+
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
5+
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
6+
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities

examples/getting_started/basic_example.py renamed to examples/getting_started/getting_started.py

+22-9
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
AMQPMessagingHandler,
77
BindingSpecification,
88
Connection,
9-
Disposition,
9+
Environment,
1010
Event,
1111
ExchangeSpecification,
1212
Message,
13+
OutcomeState,
1314
QuorumQueueSpecification,
1415
)
1516

@@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None:
6162
print("link closed")
6263

6364

64-
def create_connection() -> Connection:
65-
connection = Connection("amqp://guest:guest@localhost:5672/")
65+
def create_connection(environment: Environment) -> Connection:
66+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
67+
# in case of SSL enablement
68+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
69+
# client_cert = ".ci/certs/client_certificate.pem"
70+
# client_key = ".ci/certs/client_key.pem"
71+
# connection = Connection(
72+
# "amqps://guest:guest@localhost:5671/",
73+
# ssl_context=SslConfigurationContext(
74+
# ca_cert=ca_cert_file,
75+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76+
# ),
77+
# )
6678
connection.dial()
6779

6880
return connection
@@ -75,12 +87,13 @@ def main() -> None:
7587
routing_key = "routing-key"
7688

7789
print("connection to amqp server")
78-
connection = create_connection()
90+
environment = Environment()
91+
connection = create_connection(environment)
7992

8093
management = connection.management()
8194

8295
print("declaring exchange and queue")
83-
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
96+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
8497

8598
management.declare_queue(
8699
QuorumQueueSpecification(name=queue_name)
@@ -113,11 +126,11 @@ def main() -> None:
113126
for i in range(MESSAGES_TO_PUBLISH):
114127
print("publishing")
115128
status = publisher.publish(Message(body="test"))
116-
if status.remote_state == Disposition.ACCEPTED:
129+
if status.remote_state == OutcomeState.ACCEPTED:
117130
print("message accepted")
118-
elif status.remote_state == Disposition.RELEASED:
131+
elif status.remote_state == OutcomeState.RELEASED:
119132
print("message not routed")
120-
elif status.remote_state == Disposition.REJECTED:
133+
elif status.remote_state == OutcomeState.REJECTED:
121134
print("message not rejected")
122135

123136
publisher.close()
@@ -150,7 +163,7 @@ def main() -> None:
150163
print("closing connections")
151164
management.close()
152165
print("after management closing")
153-
connection.close()
166+
environment.close()
154167
print("after connection closing")
155168

156169

examples/getting_started/reconnection_example.py renamed to examples/reconnection/reconnection_example.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Connection,
1313
ConnectionClosed,
1414
Consumer,
15+
Environment,
1516
Event,
1617
ExchangeSpecification,
1718
Management,
@@ -20,6 +21,8 @@
2021
QuorumQueueSpecification,
2122
)
2223

24+
environment = Environment()
25+
2326

2427
# here we keep track of the objects we need to reconnect
2528
@dataclass
@@ -118,8 +121,9 @@ def create_connection() -> Connection:
118121
# "amqp://ha_tls-rabbit_node2-1:5602/",
119122
# ]
120123
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
121-
connection = Connection(
122-
uri="amqp://guest:guest@localhost:5672/",
124+
125+
connection = environment.connection(
126+
url="amqp://guest:guest@localhost:5672/",
123127
on_disconnection_handler=on_disconnection,
124128
)
125129
connection.dial()
@@ -146,7 +150,7 @@ def main() -> None:
146150

147151
print("declaring exchange and queue")
148152
connection_configuration.management.declare_exchange(
149-
ExchangeSpecification(name=exchange_name, arguments={})
153+
ExchangeSpecification(name=exchange_name)
150154
)
151155

152156
connection_configuration.management.declare_queue(
@@ -242,7 +246,7 @@ def main() -> None:
242246
print("closing connections")
243247
connection_configuration.management.close()
244248
print("after management closing")
245-
connection_configuration.connection.close()
249+
environment.close()
246250
print("after connection closing")
247251

248252

examples/getting_started/example_with_streams.py renamed to examples/streams/example_with_streams.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
AddressHelper,
55
AMQPMessagingHandler,
66
Connection,
7+
Environment,
78
Event,
89
Message,
910
OffsetSpecification,
@@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None:
6566
print("link closed")
6667

6768

68-
def create_connection() -> Connection:
69-
connection = Connection("amqp://guest:guest@localhost:5672/")
69+
def create_connection(environment: Environment) -> Connection:
70+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
71+
# in case of SSL enablement
72+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
73+
# client_cert = ".ci/certs/client_certificate.pem"
74+
# client_key = ".ci/certs/client_key.pem"
75+
# connection = Connection(
76+
# "amqps://guest:guest@localhost:5671/",
77+
# ssl_context=SslConfigurationContext(
78+
# ca_cert=ca_cert_file,
79+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
80+
# ),
81+
# )
7082
connection.dial()
7183

7284
return connection
@@ -76,15 +88,16 @@ def main() -> None:
7688
queue_name = "example-queue"
7789

7890
print("connection to amqp server")
79-
connection = create_connection()
91+
environment = Environment()
92+
connection = create_connection(environment)
8093

8194
management = connection.management()
8295

8396
management.declare_queue(StreamSpecification(name=queue_name))
8497

8598
addr_queue = AddressHelper.queue_address(queue_name)
8699

87-
consumer_connection = create_connection()
100+
consumer_connection = create_connection(environment)
88101

89102
stream_filter_options = StreamOptions()
90103
# can be first, last, next or an offset long
@@ -135,7 +148,7 @@ def main() -> None:
135148
print("closing connections")
136149
management.close()
137150
print("after management closing")
138-
connection.close()
151+
environment.close()
139152
print("after connection closing")
140153

141154

examples/getting_started/tls_example.py renamed to examples/tls/tls_example.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
BindingSpecification,
88
ClientCert,
99
Connection,
10+
Environment,
1011
Event,
1112
ExchangeSpecification,
1213
Message,
@@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None:
6263
print("link closed")
6364

6465

65-
def create_connection() -> Connection:
66+
def create_connection(environment: Environment) -> Connection:
6667
# in case of SSL enablement
6768
ca_cert_file = ".ci/certs/ca_certificate.pem"
6869
client_cert = ".ci/certs/client_certificate.pem"
6970
client_key = ".ci/certs/client_key.pem"
70-
connection = Connection(
71+
connection = environment.connection(
7172
"amqps://guest:guest@localhost:5671/",
7273
ssl_context=SslConfigurationContext(
7374
ca_cert=ca_cert_file,
@@ -84,14 +85,15 @@ def main() -> None:
8485
exchange_name = "test-exchange"
8586
queue_name = "example-queue"
8687
routing_key = "routing-key"
88+
environment = Environment()
8789

8890
print("connection to amqp server")
89-
connection = create_connection()
91+
connection = create_connection(environment)
9092

9193
management = connection.management()
9294

9395
print("declaring exchange and queue")
94-
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
96+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
9597

9698
management.declare_queue(
9799
QuorumQueueSpecification(name=queue_name)
@@ -160,7 +162,7 @@ def main() -> None:
160162
print("closing connections")
161163
management.close()
162164
print("after management closing")
163-
connection.close()
165+
environment.close()
164166
print("after connection closing")
165167

166168

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "rabbitmq-amqp-python-client"
3-
version = "0.1.0-alpha.2"
3+
version = "0.1.0-alpha.3"
44
description = "Python RabbitMQ client for AMQP 1.0 protocol"
55
authors = ["RabbitMQ team"]
66
license = "Apache-2.0 license"

rabbitmq_amqp_python_client/__init__.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
OffsetSpecification,
1212
StreamOptions,
1313
)
14+
from .environment import Environment
1415
from .exceptions import ArgumentOutOfRangeException
1516
from .management import Management
1617
from .publisher import Publisher
@@ -39,6 +40,8 @@
3940

4041
del metadata
4142

43+
OutcomeState = Disposition
44+
4245
__all__ = [
4346
"Connection",
4447
"Management",
@@ -61,9 +64,9 @@
6164
"ArgumentOutOfRangeException",
6265
"SslConfigurationContext",
6366
"ClientCert",
64-
"Delivery",
6567
"ConnectionClosed",
6668
"StreamOptions",
6769
"OffsetSpecification",
68-
"Disposition",
70+
"OutcomeState",
71+
"Environment",
6972
]

rabbitmq_amqp_python_client/connection.py

+6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ def __init__(
3737
self._on_disconnection_handler = on_disconnection_handler
3838
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
3939
self._ssl_domain = None
40+
self._connections = [] # type: ignore
41+
self._index: int = -1
42+
43+
def _set_environment_connection_list(self, connections: []): # type: ignore
44+
self._connections = connections
4045

4146
def dial(self) -> None:
4247
logger.debug("Establishing a connection to the amqp server")
@@ -75,6 +80,7 @@ def management(self) -> Management:
7580
def close(self) -> None:
7681
logger.debug("Closing connection")
7782
self._conn.close()
83+
self._connections.remove(self)
7884

7985
def publisher(self, destination: str) -> Publisher:
8086
if validate_address(destination) is False:

rabbitmq_amqp_python_client/entities.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from dataclasses import dataclass
1+
from dataclasses import dataclass, field
22
from enum import Enum
33
from typing import Any, Dict, Optional, Union
44

@@ -13,7 +13,7 @@
1313
@dataclass
1414
class ExchangeSpecification:
1515
name: str
16-
arguments: dict[str, str]
16+
arguments: dict[str, str] = field(default_factory=dict)
1717
exchange_type: ExchangeType = ExchangeType.direct
1818
is_auto_delete: bool = False
1919
is_internal: bool = False
@@ -24,7 +24,7 @@ class ExchangeSpecification:
2424
class QueueInfo:
2525
name: str
2626
arguments: dict[str, Any]
27-
queue_type: QueueType = QueueType.quorum
27+
queue_type: QueueType = QueueType.classic
2828
is_exclusive: Optional[bool] = None
2929
is_auto_delete: bool = False
3030
is_durable: bool = True

0 commit comments

Comments
 (0)