Skip to content

Commit d84ad73

Browse files
committed
add amqp 091 test
1 parent 11ca5d9 commit d84ad73

File tree

4 files changed

+133
-5
lines changed

4 files changed

+133
-5
lines changed

Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,13 @@ rabbitmq-server:
77
rabbitmq-server-stop:
88
./.ci/ubuntu/gha-setup.sh stop
99

10+
format:
11+
poetry run isort --skip rabbitmq_amqp_python_client/qpid .
12+
poetry run black rabbitmq_amqp_python_client/
13+
poetry run black tests/
14+
poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
16+
test: format
17+
poetry run pytest .
1018
help:
1119
cat Makefile

poetry.lock

+53-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ readme = "README.md"
99
[tool.poetry.dependencies]
1010
python = "^3.9"
1111
python-qpid-proton = "^0.39.0"
12-
pyjwt = "^2.10.1"
1312
typing-extensions = "^4.13.0"
1413

14+
[tool.poetry.group.test.dependencies]
15+
pyjwt = "^2.10.1"
16+
pika = "^1.3.2"
17+
1518
[tool.poetry.group.dev.dependencies]
1619
flake8 = "^7.1.1"
1720
isort = "^5.9.3"

tests/test_amqp_091.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import functools
2+
3+
import pika
4+
5+
from rabbitmq_amqp_python_client import (
6+
AddressHelper,
7+
Connection,
8+
Converter,
9+
OutcomeState,
10+
QuorumQueueSpecification,
11+
)
12+
from rabbitmq_amqp_python_client.qpid.proton import Message
13+
14+
15+
def test_publish_queue(connection: Connection) -> None:
16+
queue_name = "amqp091-queue"
17+
management = connection.management()
18+
19+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
20+
21+
raised = False
22+
23+
publisher = None
24+
accepted = False
25+
26+
try:
27+
publisher = connection.publisher(
28+
destination=AddressHelper.queue_address(queue_name)
29+
)
30+
status = publisher.publish(
31+
Message(body=Converter.string_to_bytes("my_test_string_for_amqp"))
32+
)
33+
if status.remote_state == OutcomeState.ACCEPTED:
34+
accepted = True
35+
except Exception:
36+
raised = True
37+
38+
if publisher is not None:
39+
publisher.close()
40+
41+
assert accepted is True
42+
assert raised is False
43+
44+
credentials = pika.PlainCredentials("guest", "guest")
45+
parameters = pika.ConnectionParameters("localhost", credentials=credentials)
46+
connection = pika.BlockingConnection(parameters)
47+
channel = connection.channel()
48+
49+
def on_message(chan, method_frame, header_frame, body, userdata=None):
50+
"""Called when a message is received. Log message and ack it."""
51+
chan.basic_ack(delivery_tag=method_frame.delivery_tag)
52+
assert body is not None
53+
body_text = Converter.bytes_to_string(body)
54+
assert body_text is not None
55+
assert body_text == "my_test_string_for_amqp"
56+
channel.stop_consuming()
57+
58+
on_message_callback = functools.partial(on_message, userdata="on_message_userdata")
59+
channel.basic_qos(
60+
prefetch_count=1,
61+
)
62+
channel.basic_consume(queue_name, on_message_callback)
63+
64+
channel.start_consuming()
65+
connection.close()
66+
67+
management.delete_queue(queue_name)
68+
management.close()

0 commit comments

Comments
 (0)