Skip to content

Commit 636df7e

Browse files
authored
Use Data Section by Default instead of AMQPValue for the message creation (#63)
- Update RabbitMQ to 4.1 for ci - Breaking change: Change the default section body for message creation. It uses Data Section (0x75) instead of AmqpValue (0x77). - The Data Section is needed for cross-protocol applications. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 3dd6f51 commit 636df7e

24 files changed

+303
-72
lines changed

.ci/ubuntu/gha-setup.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine
10+
readonly rabbitmq_image=rabbitmq:4.1.0-management
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-python-client'

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ __pycache__/
1515
local*
1616
.githooks/
1717
.venv/
18+
.ci/ubuntu/log/*

Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,13 @@ rabbitmq-server:
77
rabbitmq-server-stop:
88
./.ci/ubuntu/gha-setup.sh stop
99

10+
format:
11+
poetry run isort --skip rabbitmq_amqp_python_client/qpid .
12+
poetry run black rabbitmq_amqp_python_client/
13+
poetry run black tests/
14+
poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
16+
test: format
17+
poetry run pytest .
1018
help:
1119
cat Makefile

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from rabbitmq_amqp_python_client import Converter
2+
13
# RabbitMQ AMQP 1.0 Python Client
24

35
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
@@ -83,7 +85,7 @@ For example:
8385

8486
# publish messages
8587
for i in range(messages_to_publish):
86-
publisher.publish(Message(body="test"))
88+
publisher.publish(Message(body=Converter.string_to_bytes("test")))
8789

8890
publisher.close()
8991
```
@@ -149,7 +151,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e
149151

150152
The client supports oauth2 authentication.
151153

152-
You can check the [`oauth2 example`](./examples/oauth/oaut.py) to see how to establish and refresh a connection using an oauth2 token
154+
You can check the [`oauth2 example`](examples/oauth/oAuth2.py) to see how to establish and refresh a connection using an oauth2 token
153155

154156
### Managing disconnections
155157

examples/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ Client examples
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
66
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7-
- [Oauth](./oauth/oauth.py) - Connection through Oauth token
7+
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token

examples/getting_started/getting_started.py

+11-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
AddressHelper,
66
AMQPMessagingHandler,
77
Connection,
8+
Converter,
89
Environment,
910
Event,
1011
ExchangeSpecification,
@@ -24,7 +25,11 @@ def __init__(self):
2425
self._count = 0
2526

2627
def on_amqp_message(self, event: Event):
27-
print("received message: " + str(event.message.body))
28+
print(
29+
"received message: {} ".format(
30+
Converter.bytes_to_string(event.message.body)
31+
)
32+
)
2833

2934
# accepting
3035
self.delivery_context.accept(event)
@@ -43,13 +48,11 @@ def on_amqp_message(self, event: Event):
4348
# in case of rejection with annotations added
4449
# self.delivery_context.discard_with_annotations(event)
4550

46-
print("count " + str(self._count))
47-
4851
self._count = self._count + 1
52+
print("count " + str(self._count))
4953

5054
if self._count == MESSAGES_TO_PUBLISH:
51-
print("closing receiver")
52-
# if you want you can add cleanup operations here
55+
print("received all messages")
5356

5457
def on_connection_closed(self, event: Event):
5558
# if you want you can add cleanup operations here
@@ -79,7 +82,6 @@ def create_connection(environment: Environment) -> Connection:
7982

8083

8184
def main() -> None:
82-
8385
exchange_name = "test-exchange"
8486
queue_name = "example-queue"
8587
routing_key = "routing-key"
@@ -122,8 +124,9 @@ def main() -> None:
122124

123125
# publish 10 messages
124126
for i in range(MESSAGES_TO_PUBLISH):
125-
print("publishing")
126-
status = publisher.publish(Message(body="test"))
127+
status = publisher.publish(
128+
Message(body=Converter.string_to_bytes("test message {} ".format(i)))
129+
)
127130
if status.remote_state == OutcomeState.ACCEPTED:
128131
print("message accepted")
129132
elif status.remote_state == OutcomeState.RELEASED:

examples/oauth/oaut.py renamed to examples/oauth/oAuth2.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
AddressHelper,
1111
AMQPMessagingHandler,
1212
Connection,
13+
Converter,
1314
Environment,
1415
Event,
1516
ExchangeSpecification,
@@ -30,7 +31,7 @@ def __init__(self):
3031
self._count = 0
3132

3233
def on_amqp_message(self, event: Event):
33-
print("received message: " + str(event.message.body))
34+
print("received message: " + Converter.bytes_to_string(event.message.body))
3435

3536
# accepting
3637
self.delivery_context.accept(event)
@@ -85,10 +86,9 @@ def create_connection(environment: Environment) -> Connection:
8586

8687

8788
def main() -> None:
88-
89-
exchange_name = "test-exchange"
90-
queue_name = "example-queue"
91-
routing_key = "routing-key"
89+
exchange_name = "oAuth2-test-exchange"
90+
queue_name = "oAuth2-example-queue"
91+
routing_key = "oAuth2-routing-key"
9292

9393
print("connection to amqp server")
9494
oaut_token = token(
@@ -144,14 +144,15 @@ def main() -> None:
144144

145145
# publish 10 messages
146146
for i in range(MESSAGES_TO_PUBLISH):
147-
print("publishing")
148-
status = publisher.publish(Message(body="test"))
147+
status = publisher.publish(
148+
Message(body=Converter.string_to_bytes("test_{}".format(i)))
149+
)
149150
if status.remote_state == OutcomeState.ACCEPTED:
150-
print("message accepted")
151+
print("message: test_{} accepted".format(i))
151152
elif status.remote_state == OutcomeState.RELEASED:
152-
print("message not routed")
153+
print("message: test_{} not routed".format(i))
153154
elif status.remote_state == OutcomeState.REJECTED:
154-
print("message not rejected")
155+
print("message: test_{} rejected".format(i))
155156

156157
publisher.close()
157158

examples/reconnection/reconnection_example.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
AMQPMessagingHandler,
55
Connection,
66
ConnectionClosed,
7+
Converter,
78
Environment,
89
Event,
910
ExchangeSpecification,
@@ -15,7 +16,6 @@
1516
# here we keep track of the objects we need to reconnect
1617
MESSAGES_TO_PUBLISH = 50000
1718

18-
1919
environment = Environment(
2020
uri="amqp://guest:guest@localhost:5672/",
2121
)
@@ -29,7 +29,9 @@ def __init__(self):
2929

3030
def on_message(self, event: Event):
3131
if self._count % 1000 == 0:
32-
print("received 100 message: " + str(event.message.body))
32+
print(
33+
"received 100 message: " + Converter.bytes_to_string(event.message.body)
34+
)
3335

3436
# accepting
3537
self.delivery_context.accept(event)
@@ -79,7 +81,6 @@ def create_connection() -> Connection:
7981

8082

8183
def main() -> None:
82-
8384
exchange_name = "test-exchange"
8485
queue_name = "example-queue"
8586
routing_key = "routing-key"
@@ -128,7 +129,7 @@ def main() -> None:
128129
print("published 1000 messages...")
129130
try:
130131
if publisher is not None:
131-
publisher.publish(Message(body="test"))
132+
publisher.publish(Message(body=Converter.string_to_bytes("test")))
132133
except ConnectionClosed:
133134
print("publisher closing exception, resubmitting")
134135
# publisher = connection.publisher(addr)

examples/streams/example_with_streams.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
AMQPMessagingHandler,
66
Connection,
77
ConnectionClosed,
8+
Converter,
89
Environment,
910
Event,
1011
Message,
@@ -26,7 +27,7 @@ def on_amqp_message(self, event: Event):
2627
# just messages with banana filters get received
2728
print(
2829
"received message from stream: "
29-
+ str(event.message.body)
30+
+ Converter.bytes_to_string(event.message.body)
3031
+ " with offset: "
3132
+ str(event.message.annotations["x-stream-offset"])
3233
)
@@ -84,7 +85,7 @@ def create_connection(environment: Environment) -> Connection:
8485

8586

8687
def main() -> None:
87-
queue_name = "example-queue"
88+
queue_name = "stream-example-queue"
8889

8990
print("connection to amqp server")
9091
environment = Environment("amqp://guest:guest@localhost:5672/")
@@ -118,15 +119,16 @@ def main() -> None:
118119
for i in range(MESSAGES_TO_PUBLISH):
119120
publisher.publish(
120121
Message(
121-
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
122+
Converter.string_to_bytes(body="apple: " + str(i)),
123+
annotations={"x-stream-filter-value": "apple"},
122124
)
123125
)
124126

125127
# publish with a filter of banana
126128
for i in range(MESSAGES_TO_PUBLISH):
127129
publisher.publish(
128130
Message(
129-
body="banana: " + str(i),
131+
body=Converter.string_to_bytes("banana: " + str(i)),
130132
annotations={"x-stream-filter-value": "banana"},
131133
)
132134
)

examples/tls/tls_example.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# type: ignore
2+
import os
23
import sys
34
from traceback import print_exception
45

56
from rabbitmq_amqp_python_client import (
67
AddressHelper,
78
AMQPMessagingHandler,
89
Connection,
10+
Converter,
911
CurrentUserStore,
1012
Environment,
1113
Event,
@@ -34,7 +36,7 @@ def __init__(self):
3436
self._count = 0
3537

3638
def on_message(self, event: Event):
37-
print("received message: " + str(event.message.body))
39+
print("received message: " + Converter.bytes_to_string(event.message.body))
3840

3941
# accepting
4042
self.delivery_context.accept(event)
@@ -79,15 +81,14 @@ def create_connection(environment: Environment) -> Connection:
7981

8082

8183
def main() -> None:
82-
83-
exchange_name = "test-exchange"
84-
queue_name = "example-queue"
85-
routing_key = "routing-key"
84+
exchange_name = "tls-test-exchange"
85+
queue_name = "tls-example-queue"
86+
routing_key = "tls-routing-key"
8687
ca_p12_store = ".ci/certs/ca.p12"
8788
ca_cert_file = ".ci/certs/ca_certificate.pem"
88-
client_cert = ".ci/certs/client_certificate.pem"
89-
client_key = ".ci/certs/client_key.pem"
90-
client_p12_store = ".ci/certs/client.p12"
89+
client_cert = ".ci/certs/client_localhost_certificate.pem"
90+
client_key = ".ci/certs/client_localhost_key.pem"
91+
client_p12_store = ".ci/certs/client_localhost.p12"
9192
uri = "amqps://guest:guest@localhost:5671/"
9293

9394
if sys.platform == "win32":
@@ -138,6 +139,9 @@ def main() -> None:
138139
"connection failed. working directory should be project root"
139140
)
140141
else:
142+
print(" ca_cert_file exists: {}".format(os.path.isfile(ca_cert_file)))
143+
print(" client_cert exists: {}".format(os.path.isfile(client_cert)))
144+
print(" client_key exists: {}".format(os.path.isfile(client_key)))
141145
environment = Environment(
142146
uri,
143147
ssl_context=PosixSslConfigurationContext(
@@ -187,7 +191,7 @@ def main() -> None:
187191

188192
# publish 10 messages
189193
for i in range(messages_to_publish):
190-
status = publisher.publish(Message(body="test"))
194+
status = publisher.publish(Message(body=Converter.string_to_bytes("test")))
191195
if status.ACCEPTED:
192196
print("message accepted")
193197
elif status.RELEASED:

0 commit comments

Comments
 (0)