Skip to content

Commit 25d57b5

Browse files
committed
Merge remote-tracking branch 'origin/main' into certificate-fixes-for-windows
2 parents 44aeba1 + 7f7bf93 commit 25d57b5

25 files changed

+325
-214
lines changed

.ci/ubuntu/gha-setup.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine
10+
readonly rabbitmq_image=rabbitmq:4.1.0-management
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-python-client'

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ __pycache__/
1515
local*
1616
.githooks/
1717
.venv/
18+
.ci/ubuntu/log/*

Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,13 @@ rabbitmq-server:
77
rabbitmq-server-stop:
88
./.ci/ubuntu/gha-setup.sh stop
99

10+
format:
11+
poetry run isort --skip rabbitmq_amqp_python_client/qpid .
12+
poetry run black rabbitmq_amqp_python_client/
13+
poetry run black tests/
14+
poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
16+
test: format
17+
poetry run pytest .
1018
help:
1119
cat Makefile

README.md

+13-143
Original file line numberDiff line numberDiff line change
@@ -1,161 +1,31 @@
1-
# RabbitMQ AMQP 1.0 Python Client
2-
3-
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
4-
5-
# Table of Contents
6-
7-
- [How to Build the project and run the tests](#How-to-Build-the-project-and-run-the-tests)
8-
- [Installation](#Installation)
9-
- [Getting started](#Getting-Started)
10-
* [Creating a connection](#Creating-a-connection)
11-
* [Managing resources](#Managing-resources)
12-
* [Publishing messages](#Publishing-messages)
13-
* [Consuming messages](#Consuming-messages)
14-
* [Support for streams](#support-for-streams)
15-
* [SSL connection](#ssl-connections)
16-
* [Oauth authentication](#oauth-authentication)
17-
* [Managing disconnections](#Managing-disconnections)
18-
19-
20-
## How to Build the project and run the tests
21-
22-
- Start a RabbitMQ 4.x broker
23-
- poetry build: build the source project
24-
- poetry install: resolves and install dependencies
25-
- poetry run pytest: run the tests
26-
27-
## Installation
1+
## RabbitMQ AMQP 1.0 Python Client
2+
This library is meant to be used with RabbitMQ 4.0. Suitable for testing in pre-production environments.
283

294
The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-python-client/):
305
```bash
316
pip install rabbitmq-amqp-python-client
327
```
338

34-
## Getting Started
35-
36-
An example is provided [`here`](./examples/getting_started/getting_started.py) you can run it after starting a RabbitMQ 4.0 broker with:
37-
38-
poetry run python ./examples/getting_started/getting_started.py
39-
40-
Also consider to have a look to the examples documented in the RabbitMQ website:
41-
42-
https://www.rabbitmq.com/client-libraries/amqp-client-libraries
43-
44-
### Creating a connection
45-
46-
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.
47-
48-
For example:
49-
50-
```python
51-
environment = Environment("amqp://guest:guest@localhost:5672/")
52-
connection = environment.connection()
53-
connection.dial()
54-
```
55-
56-
### Managing resources
57-
58-
Once we have a Connection object we can get a Management object in order to submit to the server management operations
59-
(es: declare/delete queues and exchanges, purging queues, binding/unbinding objects ecc...)
60-
61-
For example (this code is declaring an exchange and a queue:
62-
63-
```python
64-
management = connection.management()
65-
66-
print("declaring exchange and queue")
67-
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
68-
69-
management.declare_queue(
70-
QuorumQueueSpecification(name=queue_name)
71-
)
72-
```
73-
74-
### Publishing messages
75-
76-
Once we have a Connection object we can get a Publisher object in order to send messages to the server (to an exchange or queue)
77-
78-
For example:
79-
80-
```python
81-
addr_queue = AddressHelper.queue_address(queue_name)
82-
publisher = connection.publisher(addr)
83-
84-
# publish messages
85-
for i in range(messages_to_publish):
86-
publisher.publish(Message(body="test"))
87-
88-
publisher.close()
89-
```
90-
91-
### Consuming messages
92-
93-
Once we have a Connection object we can get a Consumer object in order to consumer messages from the server (queue).
94-
95-
Messages are received through a callback
96-
97-
For example:
98-
99-
Create a class which extends AMQPMessagingHandler which defines at minimum the on_consumer method, that will receive the
100-
messages consumed:
101-
102-
```python
103-
class MyMessageHandler(AMQPMessagingHandler):
104-
105-
def __init__(self):
106-
super().__init__()
107-
self._count = 0
108-
109-
def on_message(self, event: Event):
110-
print("received message: " + str(event.message.body))
111-
112-
# accepting
113-
self.delivery_context.accept(event)
114-
```
115-
116-
Then from connection get a consumer object:
117-
118-
```python
119-
addr_queue = AddressHelper.queue_address(queue_name)
120-
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
121-
122-
try:
123-
consumer.run()
124-
except KeyboardInterrupt:
125-
pass
126-
127-
consumer.close()
128-
```
129-
130-
The consumer will run indefinitively waiting for messages to arrive.
131-
132-
### Support for streams
133-
134-
The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview
135-
136-
You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST).
137-
138-
Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
139-
140-
You can check the [`stream example`](./examples/streams/example_with_streams.py) to see how to work with RabbitMQ streams.
9+
### Getting Started
14110

142-
### SSL connections
11+
Inside the [examples](./examples) folder you can find a set of examples that show how to use the client.
14312

144-
The client supports TLS/SSL connections.
14513

146-
You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to establish a secured connection
14+
### Documentation
14715

148-
### Oauth authentication
16+
[Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) select the python section.
14917

150-
The client supports oauth2 authentication.
15118

152-
You can check the [`oauth2 example`](./examples/oauth/oaut.py) to see how to establish and refresh a connection using an oauth2 token
19+
### Build
15320

154-
### Managing disconnections
21+
- `make rabbitmq-server`: run the RabbitMQ server in a docker container
22+
- `poetry build`: build the source project
23+
- `poetry install`: resolves and install dependencies
24+
- `make test`: run the tests
15525

156-
The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers
26+
Note for MAC users:
27+
- TLS does not work, see: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/64
15728

158-
You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections
15929

16030

16131

create_tag.sh

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#/bin/bash
2+
version=$1
3+
# Regex pattern for the valid strings
4+
regex="^v([0-9]+)\.([0-9]+)\.([0-9]+)(-(alpha|beta|rc)\.([0-9]+))?$"
5+
6+
if [[ $version =~ $regex ]]; then
7+
echo "Creating tag with: " $1
8+
git tag -a -s -u $2 -m "rabbitmq-amqp-python-client $1" $1 && git push && git push --tags
9+
echo "Tag created: " $1
10+
else
11+
echo "Invalid version" $1
12+
fi

examples/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ Client examples
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
66
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7-
- [Oauth](./oauth/oauth.py) - Connection through Oauth token
7+
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token

examples/getting_started/getting_started.py

+11-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
AddressHelper,
66
AMQPMessagingHandler,
77
Connection,
8+
Converter,
89
Environment,
910
Event,
1011
ExchangeSpecification,
@@ -24,7 +25,11 @@ def __init__(self):
2425
self._count = 0
2526

2627
def on_amqp_message(self, event: Event):
27-
print("received message: " + str(event.message.body))
28+
print(
29+
"received message: {} ".format(
30+
Converter.bytes_to_string(event.message.body)
31+
)
32+
)
2833

2934
# accepting
3035
self.delivery_context.accept(event)
@@ -43,13 +48,11 @@ def on_amqp_message(self, event: Event):
4348
# in case of rejection with annotations added
4449
# self.delivery_context.discard_with_annotations(event)
4550

46-
print("count " + str(self._count))
47-
4851
self._count = self._count + 1
52+
print("count " + str(self._count))
4953

5054
if self._count == MESSAGES_TO_PUBLISH:
51-
print("closing receiver")
52-
# if you want you can add cleanup operations here
55+
print("received all messages")
5356

5457
def on_connection_closed(self, event: Event):
5558
# if you want you can add cleanup operations here
@@ -79,7 +82,6 @@ def create_connection(environment: Environment) -> Connection:
7982

8083

8184
def main() -> None:
82-
8385
exchange_name = "test-exchange"
8486
queue_name = "example-queue"
8587
routing_key = "routing-key"
@@ -122,8 +124,9 @@ def main() -> None:
122124

123125
# publish 10 messages
124126
for i in range(MESSAGES_TO_PUBLISH):
125-
print("publishing")
126-
status = publisher.publish(Message(body="test"))
127+
status = publisher.publish(
128+
Message(body=Converter.string_to_bytes("test message {} ".format(i)))
129+
)
127130
if status.remote_state == OutcomeState.ACCEPTED:
128131
print("message accepted")
129132
elif status.remote_state == OutcomeState.RELEASED:

examples/oauth/oaut.py renamed to examples/oauth/oAuth2.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
AddressHelper,
1111
AMQPMessagingHandler,
1212
Connection,
13+
Converter,
1314
Environment,
1415
Event,
1516
ExchangeSpecification,
@@ -30,7 +31,7 @@ def __init__(self):
3031
self._count = 0
3132

3233
def on_amqp_message(self, event: Event):
33-
print("received message: " + str(event.message.body))
34+
print("received message: " + Converter.bytes_to_string(event.message.body))
3435

3536
# accepting
3637
self.delivery_context.accept(event)
@@ -85,10 +86,9 @@ def create_connection(environment: Environment) -> Connection:
8586

8687

8788
def main() -> None:
88-
89-
exchange_name = "test-exchange"
90-
queue_name = "example-queue"
91-
routing_key = "routing-key"
89+
exchange_name = "oAuth2-test-exchange"
90+
queue_name = "oAuth2-example-queue"
91+
routing_key = "oAuth2-routing-key"
9292

9393
print("connection to amqp server")
9494
oaut_token = token(
@@ -144,14 +144,15 @@ def main() -> None:
144144

145145
# publish 10 messages
146146
for i in range(MESSAGES_TO_PUBLISH):
147-
print("publishing")
148-
status = publisher.publish(Message(body="test"))
147+
status = publisher.publish(
148+
Message(body=Converter.string_to_bytes("test_{}".format(i)))
149+
)
149150
if status.remote_state == OutcomeState.ACCEPTED:
150-
print("message accepted")
151+
print("message: test_{} accepted".format(i))
151152
elif status.remote_state == OutcomeState.RELEASED:
152-
print("message not routed")
153+
print("message: test_{} not routed".format(i))
153154
elif status.remote_state == OutcomeState.REJECTED:
154-
print("message not rejected")
155+
print("message: test_{} rejected".format(i))
155156

156157
publisher.close()
157158

examples/reconnection/reconnection_example.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
AMQPMessagingHandler,
55
Connection,
66
ConnectionClosed,
7+
Converter,
78
Environment,
89
Event,
910
ExchangeSpecification,
@@ -15,7 +16,6 @@
1516
# here we keep track of the objects we need to reconnect
1617
MESSAGES_TO_PUBLISH = 50000
1718

18-
1919
environment = Environment(
2020
uri="amqp://guest:guest@localhost:5672/",
2121
)
@@ -29,7 +29,9 @@ def __init__(self):
2929

3030
def on_message(self, event: Event):
3131
if self._count % 1000 == 0:
32-
print("received 100 message: " + str(event.message.body))
32+
print(
33+
"received 100 message: " + Converter.bytes_to_string(event.message.body)
34+
)
3335

3436
# accepting
3537
self.delivery_context.accept(event)
@@ -79,7 +81,6 @@ def create_connection() -> Connection:
7981

8082

8183
def main() -> None:
82-
8384
exchange_name = "test-exchange"
8485
queue_name = "example-queue"
8586
routing_key = "routing-key"
@@ -128,7 +129,7 @@ def main() -> None:
128129
print("published 1000 messages...")
129130
try:
130131
if publisher is not None:
131-
publisher.publish(Message(body="test"))
132+
publisher.publish(Message(body=Converter.string_to_bytes("test")))
132133
except ConnectionClosed:
133134
print("publisher closing exception, resubmitting")
134135
# publisher = connection.publisher(addr)

0 commit comments

Comments
 (0)