Skip to content

Commit 2ab1c3a

Browse files
committed
direct reply to
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 33479c2 commit 2ab1c3a

File tree

10 files changed

+199
-45
lines changed

10 files changed

+199
-45
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# type: ignore
2+
3+
4+
from rabbitmq_amqp_python_client import ( # PosixSSlConfigurationContext,; PosixClientCert,
5+
AMQPMessagingHandler,
6+
Connection,
7+
Converter,
8+
DirectReplyToConsumerOptions,
9+
Environment,
10+
Event,
11+
Message,
12+
OutcomeState,
13+
)
14+
15+
MESSAGES_TO_PUBLISH = 200
16+
17+
18+
class MyMessageHandler(AMQPMessagingHandler):
19+
20+
def __init__(self):
21+
super().__init__()
22+
self._count = 0
23+
24+
def on_amqp_message(self, event: Event):
25+
print(
26+
"received message: {} ".format(
27+
Converter.bytes_to_string(event.message.body)
28+
)
29+
)
30+
31+
# accepting
32+
self.delivery_context.accept(event)
33+
34+
self._count = self._count + 1
35+
print("count " + str(self._count))
36+
37+
if self._count == MESSAGES_TO_PUBLISH:
38+
print("received all messages")
39+
40+
def on_connection_closed(self, event: Event):
41+
# if you want you can add cleanup operations here
42+
print("connection closed")
43+
44+
def on_link_closed(self, event: Event) -> None:
45+
# if you want you can add cleanup operations here
46+
print("link closed")
47+
48+
49+
def create_connection(environment: Environment) -> Connection:
50+
connection = environment.connection()
51+
connection.dial()
52+
return connection
53+
54+
55+
def main() -> None:
56+
print("connection to amqp server")
57+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
58+
connection = create_connection(environment)
59+
consumer = connection.consumer(message_handler=MyMessageHandler(),
60+
consumer_options=DirectReplyToConsumerOptions())
61+
addr = consumer.get_queue_address()
62+
print("connecting to address: {}".format(addr))
63+
publisher = create_connection(environment).publisher(addr)
64+
65+
for i in range(MESSAGES_TO_PUBLISH):
66+
msg = Message(
67+
body=Converter.string_to_bytes("test message {} ".format(i)))
68+
status = publisher.publish(msg)
69+
if status.remote_state == OutcomeState.ACCEPTED:
70+
print("message accepted")
71+
elif status.remote_state == OutcomeState.RELEASED:
72+
print("message not routed")
73+
elif status.remote_state == OutcomeState.REJECTED:
74+
print("message not rejected")
75+
76+
try:
77+
consumer.run()
78+
except KeyboardInterrupt:
79+
pass
80+
81+
consumer.close()
82+
83+
connection.close()
84+
print("after connection closing")
85+
86+
87+
if __name__ == "__main__":
88+
main()

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from .consumer import Consumer
1515
from .entities import (
1616
ConsumerOptions,
17+
DirectReplyToConsumerOptions,
1718
ExchangeCustomSpecification,
1819
ExchangeSpecification,
1920
ExchangeToExchangeBindingSpecification,
@@ -72,6 +73,7 @@
7273
"QuorumQueueSpecification",
7374
"ClassicQueueSpecification",
7475
"StreamSpecification",
76+
"DirectReplyToConsumerOptions",
7577
"ExchangeToQueueBindingSpecification",
7678
"ExchangeToExchangeBindingSpecification",
7779
"QueueType",

rabbitmq_amqp_python_client/connection.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .consumer import Consumer
1818
from .entities import (
1919
ConsumerOptions,
20+
DirectReplyToConsumerOptions,
2021
OAuth2Options,
2122
RecoveryConfiguration,
2223
)
@@ -397,9 +398,13 @@ def consumer(
397398
Consumer: A new consumer instance
398399
399400
Raises:
400-
ArgumentOutOfRangeException: If destination address format is invalid
401+
ArgumentOutOfRangeException: If destination address format is invalid.
402+
Only applies if not using Direct Reply-to.
403+
The server will provide the queue name in that case.
401404
"""
402-
if not validate_address(destination):
405+
if not validate_address(destination) and not isinstance(
406+
consumer_options, DirectReplyToConsumerOptions
407+
):
403408
raise ArgumentOutOfRangeException(
404409
"destination address must start with /queues or /exchanges"
405410
)
@@ -438,9 +443,7 @@ def _on_disconnection(self) -> None:
438443
time.sleep(delay.total_seconds())
439444

440445
try:
441-
442446
self._open_connections(reconnect_handlers=True)
443-
444447
self._connections.append(self)
445448

446449
except ConnectionException as e:

rabbitmq_amqp_python_client/consumer.py

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import logging
22
from typing import Literal, Optional, Union, cast
33

4+
from pika.exceptions import AMQPError
5+
46
from .amqp_consumer_handler import AMQPMessagingHandler
5-
from .entities import ConsumerOptions
7+
from .entities import (
8+
ConsumerOptions,
9+
DirectReplyToConsumerOptions,
10+
StreamConsumerOptions,
11+
)
612
from .options import (
713
ReceiverOptionUnsettled,
814
ReceiverOptionUnsettledWithFilters,
@@ -29,18 +35,17 @@ class Consumer:
2935
_conn (BlockingConnection): The underlying connection to RabbitMQ
3036
_addr (str): The address to consume from
3137
_handler (Optional[MessagingHandler]): Optional message handling callback
32-
_stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
38+
_consumer_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
3339
_credit (Optional[int]): Flow control credit value
3440
"""
3541

3642
def __init__(
37-
self,
38-
conn: BlockingConnection,
39-
addr: str,
40-
handler: Optional[AMQPMessagingHandler] = None,
41-
stream_options: Optional[ConsumerOptions] = None,
42-
credit: Optional[int] = None,
43-
direct_reply_to: Optional[bool] = None,
43+
self,
44+
conn: BlockingConnection,
45+
addr: str,
46+
handler: Optional[AMQPMessagingHandler] = None,
47+
consumer_options: Optional[ConsumerOptions] = None,
48+
credit: Optional[int] = None,
4449
):
4550
"""
4651
Initialize a new Consumer instance.
@@ -49,27 +54,38 @@ def __init__(
4954
conn: The blocking connection to use for consuming
5055
addr: The address to consume from
5156
handler: Optional message handler for processing received messages
52-
stream_options: Optional configuration for stream-based consumption
57+
consumer_options: Optional configuration for stream-based consumption
5358
credit: Optional credit value for flow control
5459
"""
5560
self._receiver: Optional[BlockingReceiver] = None
5661
self._conn = conn
5762
self._addr = addr
5863
self._handler = handler
59-
self._stream_options = stream_options
64+
self._consumer_options = consumer_options
6065
self._credit = credit
6166
self._consumers: list[Consumer] = []
62-
self._direct_reply_to = direct_reply_to
6367
self._open()
6468

6569
def _open(self) -> None:
6670
if self._receiver is None:
6771
logger.debug("Creating Receiver")
6872
self._receiver = self._create_receiver(self._addr)
6973

74+
def get_queue_address(self) -> Optional[str]:
75+
"""
76+
Get the name of the queue from the address.
77+
78+
Returns:
79+
str: The name of the queue.
80+
"""
81+
if self._receiver is not None:
82+
return cast(Optional[str], self._receiver.link.remote_source.address)
83+
else:
84+
raise AMQPError("Receiver is not initialized")
85+
7086
def _update_connection(self, conn: BlockingConnection) -> None:
7187
self._conn = conn
72-
if self._stream_options is None:
88+
if self._consumer_options is None:
7389
logger.debug("creating new receiver without stream")
7490
self._receiver = self._conn.create_receiver(
7591
self._addr,
@@ -78,11 +94,11 @@ def _update_connection(self, conn: BlockingConnection) -> None:
7894
)
7995
else:
8096
logger.debug("creating new stream receiver")
81-
self._stream_options.offset(self._handler.offset - 1) # type: ignore
97+
self._consumer_options.offset(self._handler.offset - 1) # type: ignore
8298
self._receiver = self._conn.create_receiver(
8399
self._addr,
84100
options=ReceiverOptionUnsettledWithFilters(
85-
self._addr, self._stream_options
101+
self._addr, self._consumer_options
86102
),
87103
handler=self._handler,
88104
)
@@ -145,34 +161,44 @@ def stop(self) -> None:
145161
self._receiver.container.stop()
146162

147163
def _create_receiver(self, addr: str) -> BlockingReceiver:
148-
logger.debug("Creating the receiver")
149-
if self._direct_reply_to is None:
150-
self._direct_reply_to = True
164+
credit = 100
165+
if self._credit is not None:
166+
credit = self._credit
151167

152-
if self._direct_reply_to:
153-
x = self._conn.create_dynamic_receiver()
154-
# print(x.link.remote_source.address)
155-
return x
168+
if self._consumer_options is not None:
169+
logger.debug(
170+
"Creating the receiver, with options: %s",
171+
type(self._consumer_options).__name__,
172+
)
173+
else:
174+
logger.debug("Creating the receiver, without options")
156175

157-
if self._stream_options is None:
158-
receiver = self._conn.create_receiver(
176+
if self._consumer_options is None:
177+
return self._conn.create_receiver(
159178
addr,
160179
options=ReceiverOptionUnsettled(addr),
161180
handler=self._handler,
162-
dynamic=self._direct_reply_to,
181+
credit=credit,
163182
)
164-
else:
165-
receiver = self._conn.create_receiver(
183+
184+
if isinstance(self._consumer_options, DirectReplyToConsumerOptions):
185+
print("Creating dynamic receiver for direct reply-to")
186+
x = self._conn.create_dynamic_receiver(100, handler=self._handler)
187+
x.credit = credit
188+
return x
189+
190+
if isinstance(self._consumer_options, StreamConsumerOptions):
191+
return self._conn.create_receiver(
166192
addr,
167-
options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options),
193+
options=ReceiverOptionUnsettledWithFilters(
194+
addr, self._consumer_options
195+
),
168196
handler=self._handler,
169-
dynamic=self._direct_reply_to,
170197
)
171198

172-
if self._credit is not None:
173-
receiver.credit = self._credit
174-
175-
return receiver
199+
raise AMQPError(
200+
"Receiver is not initialized. No valid consumer options provided."
201+
)
176202

177203
@property
178204
def address(self) -> str:

rabbitmq_amqp_python_client/entities.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ def validate(self, versions: Dict[str, bool]) -> None:
160160
def filter_set(self) -> Dict[symbol, Described]:
161161
raise NotImplementedError("Subclasses should implement this method")
162162

163+
def direct_reply_to(self) -> bool:
164+
return False
165+
163166

164167
@dataclass
165168
class MessageProperties:
@@ -400,6 +403,21 @@ def validate(self, versions: Dict[str, bool]) -> None:
400403
)
401404

402405

406+
class DirectReplyToConsumerOptions(ConsumerOptions):
407+
408+
def validate(self, versions: Dict[str, bool]) -> None:
409+
if not versions.get("4.2.0", False):
410+
raise ValidationCodeException(
411+
"Direct Reply-To requires RabbitMQ 4.2.0 or higher"
412+
)
413+
414+
def filter_set(self) -> Dict[symbol, Described]:
415+
return {}
416+
417+
def direct_reply_to(self) -> bool:
418+
return True
419+
420+
403421
@dataclass
404422
class RecoveryConfiguration:
405423
"""

rabbitmq_amqp_python_client/options.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from .entities import ConsumerOptions
22
from .qpid.proton._data import ( # noqa: E402
3+
Data,
34
PropertyDict,
4-
symbol, Data,
5+
symbol,
6+
)
7+
from .qpid.proton._endpoints import ( # noqa: E402
8+
Link,
9+
Terminus,
510
)
6-
from .qpid.proton._endpoints import Link, Terminus # noqa: E402
711
from .qpid.proton.reactor import ( # noqa: E402
812
Filter,
913
LinkOption,
@@ -53,12 +57,10 @@ def apply(self, link: Link) -> None:
5357

5458

5559
class DynamicReceiverOption(LinkOption): # type: ignore
56-
def __init__(self):
57-
pass
5860

5961
def apply(self, link: Link) -> None:
6062
link.snd_settle_mode = Link.SND_SETTLED
61-
link.rcv_settle_mode = Link.RCV_FIRST
63+
# link.rcv_settle_mode = Link.RCV_FIRST
6264
link.source.expiry_policy = Terminus.EXPIRE_WITH_LINK
6365
link.properties = PropertyDict({symbol("paired"): True})
6466
link.source.dynamic = True

rabbitmq_amqp_python_client/publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def publish(self, message: Message) -> Delivery:
9494
return self._sender.send(message)
9595
else:
9696
if message.address != "":
97-
if validate_address(message.address) is False:
97+
if not validate_address(message.address):
9898
raise ArgumentOutOfRangeException(
9999
"destination address must start with /queues or /exchanges"
100100
)

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,9 +506,15 @@ def create_sender(
506506
),
507507
)
508508

509-
def create_dynamic_receiver(self, credit: Optional[int] = None):
509+
def create_dynamic_receiver(
510+
self, credit: Optional[int] = None, handler: Optional[Handler] = None
511+
):
510512
return self.create_receiver(
511-
credit=credit, dynamic=True, options=DynamicReceiverOption(), name="dynamic-receiver"
513+
credit=credit,
514+
dynamic=True,
515+
options=DynamicReceiverOption(),
516+
handler=handler,
517+
name="dynamic-receiver_" + str(id(self)),
512518
)
513519

514520
def create_receiver(

tests/direct_reply_to/__init__.py

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from rabbitmq_amqp_python_client import (
2+
Connection,
3+
DirectReplyToConsumerOptions,
4+
)
5+
6+
7+
def test_consumer_create_reply_name(connection: Connection) -> None:
8+
consumer = connection.consumer("", consumer_options=DirectReplyToConsumerOptions())
9+
assert "/queues/amq.rabbitmq.reply-to." in consumer.get_queue_address()

0 commit comments

Comments
 (0)