Skip to content

Commit a1f3840

Browse files
DanielePalaiaDanielePalaia
and
DanielePalaia
authored
improve examples / updating docs / adding credits to consumer (#31)
* improve examples / updating docs / adding credits to consumer * some renaming * bumping version --------- Co-authored-by: DanielePalaia <daniele985@@gmail.com>
1 parent d08139f commit a1f3840

15 files changed

+302
-85
lines changed

README.md

+42-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
44

5+
# Table of Contents
6+
7+
- [How to Build the project and run the tests](#How-to-Build-the-project-and-run-the-tests)
8+
- [Installation](#Installation)
9+
- [Getting started](#Getting-Started)
10+
* [Creating a connection](#Creating-a-connection)
11+
* [Managing resources](#Managing-resources)
12+
* [Publishing messages](#Publishing-messages)
13+
* [Consuming messages](#Consuming-messages)
14+
* [Support for streams](#support-for-streams)
15+
* [SSL connection](#ssl-connections)
16+
* [Managing disconnections](#Managing-disconnections)
17+
18+
519
## How to Build the project and run the tests
620

721
- Start a RabbitMQ 4.x broker
@@ -18,7 +32,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt
1832

1933
## Getting Started
2034

21-
An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
35+
An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:
2236

2337
poetry run python ./examples/getting_started/main.py
2438

@@ -109,6 +123,33 @@ Then from connection get a consumer object:
109123

110124
The consumer will run indefinitively waiting for messages to arrive.
111125

126+
### Support for streams
127+
128+
The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview
129+
130+
You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST).
131+
132+
Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
133+
134+
You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.
135+
136+
### SSL connections
137+
138+
The client supports TLS/SSL connections.
139+
140+
You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection
141+
142+
143+
### Managing disconnections
144+
145+
At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
146+
You can use this callback to implement your own logic and eventually attempt a reconnection.
147+
148+
You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
149+
eventually attempt a reconnection
150+
151+
152+
112153

113154

114155

examples/getting_started/basic_example.py

+9-18
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
AMQPMessagingHandler,
77
BindingSpecification,
88
Connection,
9+
Disposition,
910
Event,
1011
ExchangeSpecification,
1112
Message,
1213
QuorumQueueSpecification,
1314
)
1415

15-
messages_to_publish = 100
16+
MESSAGES_TO_PUBLISH = 100
1617

1718

1819
class MyMessageHandler(AMQPMessagingHandler):
@@ -45,7 +46,7 @@ def on_message(self, event: Event):
4546

4647
self._count = self._count + 1
4748

48-
if self._count == messages_to_publish:
49+
if self._count == MESSAGES_TO_PUBLISH:
4950
print("closing receiver")
5051
# if you want you can add cleanup operations here
5152
# event.receiver.close()
@@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None:
6263

6364
def create_connection() -> Connection:
6465
connection = Connection("amqp://guest:guest@localhost:5672/")
65-
# in case of SSL enablement
66-
# ca_cert_file = ".ci/certs/ca_certificate.pem"
67-
# client_cert = ".ci/certs/client_certificate.pem"
68-
# client_key = ".ci/certs/client_key.pem"
69-
# connection = Connection(
70-
# "amqps://guest:guest@localhost:5671/",
71-
# ssl_context=SslConfigurationContext(
72-
# ca_cert=ca_cert_file,
73-
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
74-
# ),
75-
# )
7666
connection.dial()
7767

7868
return connection
@@ -120,21 +110,22 @@ def main() -> None:
120110
# management.close()
121111

122112
# publish 10 messages
123-
for i in range(messages_to_publish):
113+
for i in range(MESSAGES_TO_PUBLISH):
114+
print("publishing")
124115
status = publisher.publish(Message(body="test"))
125-
if status.ACCEPTED:
116+
if status.remote_state == Disposition.ACCEPTED:
126117
print("message accepted")
127-
elif status.RELEASED:
118+
elif status.remote_state == Disposition.RELEASED:
128119
print("message not routed")
129-
elif status.REJECTED:
120+
elif status.remote_state == Disposition.REJECTED:
130121
print("message not rejected")
131122

132123
publisher.close()
133124

134125
print(
135126
"create a consumer and consume the test message - press control + c to terminate to consume"
136127
)
137-
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
128+
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())
138129

139130
try:
140131
consumer.run()

examples/getting_started/example_with_streams.py

+22-16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
StreamSpecification,
1212
)
1313

14+
MESSAGES_TO_PUBLISH = 100
15+
1416

1517
class MyMessageHandler(AMQPMessagingHandler):
1618

@@ -19,6 +21,7 @@ def __init__(self):
1921
self._count = 0
2022

2123
def on_message(self, event: Event):
24+
# just messages with banana filters get received
2225
print(
2326
"received message from stream: "
2427
+ str(event.message.body)
@@ -47,7 +50,7 @@ def on_message(self, event: Event):
4750

4851
self._count = self._count + 1
4952

50-
if self._count == 100:
53+
if self._count == MESSAGES_TO_PUBLISH:
5154
print("closing receiver")
5255
# if you want you can add cleanup operations here
5356
# event.receiver.close()
@@ -64,25 +67,13 @@ def on_link_closed(self, event: Event) -> None:
6467

6568
def create_connection() -> Connection:
6669
connection = Connection("amqp://guest:guest@localhost:5672/")
67-
# in case of SSL enablement
68-
# ca_cert_file = ".ci/certs/ca_certificate.pem"
69-
# client_cert = ".ci/certs/client_certificate.pem"
70-
# client_key = ".ci/certs/client_key.pem"
71-
# connection = Connection(
72-
# "amqps://guest:guest@localhost:5671/",
73-
# ssl_context=SslConfigurationContext(
74-
# ca_cert=ca_cert_file,
75-
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76-
# ),
77-
# )
7870
connection.dial()
7971

8072
return connection
8173

8274

8375
def main() -> None:
8476
queue_name = "example-queue"
85-
messages_to_publish = 100
8677

8778
print("connection to amqp server")
8879
connection = create_connection()
@@ -99,10 +90,11 @@ def main() -> None:
9990
# can be first, last, next or an offset long
10091
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
10192
stream_filter_options.offset(OffsetSpecification.first)
93+
stream_filter_options.filter_values(["banana"])
10294

10395
consumer = consumer_connection.consumer(
10496
addr_queue,
105-
handler=MyMessageHandler(),
97+
message_handler=MyMessageHandler(),
10698
stream_filter_options=stream_filter_options,
10799
)
108100
print(
@@ -112,8 +104,22 @@ def main() -> None:
112104
# print("create a publisher and publish a test message")
113105
publisher = connection.publisher(addr_queue)
114106

115-
for i in range(messages_to_publish):
116-
publisher.publish(Message(body="test: " + str(i)))
107+
# publish with a filter of apple
108+
for i in range(MESSAGES_TO_PUBLISH):
109+
publisher.publish(
110+
Message(
111+
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
112+
)
113+
)
114+
115+
# publish with a filter of banana
116+
for i in range(MESSAGES_TO_PUBLISH):
117+
publisher.publish(
118+
Message(
119+
body="banana: " + str(i),
120+
annotations={"x-stream-filter-value": "banana"},
121+
)
122+
)
117123

118124
publisher.close()
119125

examples/getting_started/reconnection_example.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ConnectionConfiguration:
3131

3232

3333
connection_configuration = ConnectionConfiguration()
34-
messages_to_publish = 50000
34+
MESSAGES_TO_PUBLSH = 50000
3535

3636

3737
# disconnection callback
@@ -61,7 +61,7 @@ def on_disconnection():
6161
if connection_configuration.consumer is not None:
6262
connection_configuration.consumer = (
6363
connection_configuration.connection.consumer(
64-
addr_queue, handler=MyMessageHandler()
64+
addr_queue, message_handler=MyMessageHandler()
6565
)
6666
)
6767

@@ -95,7 +95,7 @@ def on_message(self, event: Event):
9595

9696
self._count = self._count + 1
9797

98-
if self._count == messages_to_publish:
98+
if self._count == MESSAGES_TO_PUBLSH:
9999
print("closing receiver")
100100
# if you want you can add cleanup operations here
101101
# event.receiver.close()
@@ -111,15 +111,15 @@ def on_link_closed(self, event: Event) -> None:
111111

112112

113113
def create_connection() -> Connection:
114-
# for multinode specify a list of urls and fill the field urls of Connection instead of url
115-
# urls = [
114+
# for multinode specify a list of urls and fill the field uris of Connection instead of url
115+
# uris = [
116116
# "amqp://ha_tls-rabbit_node0-1:5682/",
117117
# "amqp://ha_tls-rabbit_node1-1:5692/",
118118
# "amqp://ha_tls-rabbit_node2-1:5602/",
119119
# ]
120-
# connection = Connection(urls=urls, on_disconnection_handler=on_disconnected)
120+
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
121121
connection = Connection(
122-
url="amqp://guest:guest@localhost:5672/",
122+
uri="amqp://guest:guest@localhost:5672/",
123123
on_disconnection_handler=on_disconnection,
124124
)
125125
connection.dial()
@@ -181,7 +181,7 @@ def main() -> None:
181181

182182
# publishing messages
183183
while True:
184-
for i in range(messages_to_publish):
184+
for i in range(MESSAGES_TO_PUBLSH):
185185

186186
if i % 1000 == 0:
187187
print("published 1000 messages...")
@@ -207,7 +207,7 @@ def main() -> None:
207207
if connection_configuration.consumer is None:
208208
connection_configuration.consumer = (
209209
connection_configuration.connection.consumer(
210-
addr_queue, handler=MyMessageHandler()
210+
addr_queue, message_handler=MyMessageHandler()
211211
)
212212
)
213213

0 commit comments

Comments
 (0)