Skip to content

Commit 0ec35bb

Browse files
committed
test
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4757a78 commit 0ec35bb

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

examples/direct_reply_queue/direct_reply_to.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def main() -> None:
6060
message_handler=MyMessageHandler(),
6161
consumer_options=DirectReplyToConsumerOptions(),
6262
)
63-
addr = consumer.get_queue_address()
63+
addr = consumer.address
6464
print("connecting to address: {}".format(addr))
6565
publisher = create_connection(environment).publisher(addr)
6666

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,57 @@
11
from rabbitmq_amqp_python_client import (
22
Connection,
3+
Converter,
34
DirectReplyToConsumerOptions,
5+
Environment,
6+
OutcomeState,
47
)
8+
from rabbitmq_amqp_python_client.qpid.proton import Message
59

610

711
def test_consumer_create_reply_name(connection: Connection) -> None:
812
consumer = connection.consumer(consumer_options=DirectReplyToConsumerOptions())
913
assert "/queues/amq.rabbitmq.reply-to." in consumer.address
14+
15+
16+
def create_connection(environment: Environment) -> Connection:
17+
connection = environment.connection()
18+
connection.dial()
19+
return connection
20+
21+
22+
def test_direct_reply_to_send_and_receive(environment: Environment) -> None:
23+
"""Test that messages can be published to and consumed from a direct reply-to queue."""
24+
messages_to_send = 10
25+
26+
# Create a consumer using DirectReplyToConsumerOptions
27+
consumer = create_connection(environment).consumer(
28+
consumer_options=DirectReplyToConsumerOptions()
29+
)
30+
31+
# Get the queue address from the consumer
32+
addr = consumer.address
33+
assert addr is not None
34+
assert "/queues/amq.rabbitmq.reply-to." in addr
35+
36+
# Create a new connection and publisher to publish to the reply-to address
37+
publisher = create_connection(environment).publisher(addr)
38+
39+
# Publish messages to the direct reply-to queue
40+
for i in range(messages_to_send):
41+
msg = Message(body=Converter.string_to_bytes("test message {}".format(i)))
42+
status = publisher.publish(msg)
43+
assert status.remote_state == OutcomeState.ACCEPTED
44+
45+
# Consume messages synchronously
46+
consumed = 0
47+
for i in range(messages_to_send):
48+
message = consumer.consume()
49+
if Converter.bytes_to_string(message.body) == "test message {}".format(i):
50+
consumed = consumed + 1
51+
52+
# Clean up
53+
publisher.close()
54+
consumer.close()
55+
56+
# Verify all messages were received
57+
assert consumed == messages_to_send

0 commit comments

Comments
 (0)