Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions examples/direct_reply_queue/direct_reply_to.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# type: ignore


from rabbitmq_amqp_python_client import (
AMQPMessagingHandler,
Connection,
Converter,
DirectReplyToConsumerOptions,
Environment,
Event,
Message,
OutcomeState,
)

MESSAGES_TO_PUBLISH = 200


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
print(
"received message: {} ".format(
Converter.bytes_to_string(event.message.body)
)
)

# accepting
self.delivery_context.accept(event)

self._count = self._count + 1
print("count " + str(self._count))

if self._count == MESSAGES_TO_PUBLISH:
print("received all messages")

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection(environment: Environment) -> Connection:
connection = environment.connection()
connection.dial()
return connection


def main() -> None:
print("connection_consumer to amqp server")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
connection_consumer = create_connection(environment)
consumer = connection_consumer.consumer(
message_handler=MyMessageHandler(),
consumer_options=DirectReplyToConsumerOptions(),
)
addr = consumer.address
print("connecting to address: {}".format(addr))
connection_publisher = create_connection(environment)
publisher = connection_publisher.publisher(addr)

for i in range(MESSAGES_TO_PUBLISH):
msg = Message(body=Converter.string_to_bytes("test message {} ".format(i)))
status = publisher.publish(msg)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
elif status.remote_state == OutcomeState.REJECTED:
print("message rejected")

try:
consumer.run()
except KeyboardInterrupt:
pass

consumer.close()
publisher.close()
connection_consumer.close()
connection_publisher.close()


if __name__ == "__main__":
main()
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ readme = "README.md"

[tool.poetry.dependencies]
python = "^3.9"
python-qpid-proton = "^0.39.0"
python-qpid-proton = "^0.40.0"
typing-extensions = "^4.13.0"
packaging = "^23.0"

Expand All @@ -21,7 +21,7 @@ isort = "^5.9.3"
mypy = "^0.910"
pytest = "^8.3.4"
black = "^24.3.0"
python-qpid-proton = "^0.39.0"
python-qpid-proton = "^0.40.0"
requests = "^2.31.0"
pytest-asyncio = "^1.2.0"

Expand Down
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .consumer import Consumer
from .entities import (
ConsumerOptions,
DirectReplyToConsumerOptions,
ExchangeCustomSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
Expand Down Expand Up @@ -72,6 +73,7 @@
"QuorumQueueSpecification",
"ClassicQueueSpecification",
"StreamSpecification",
"DirectReplyToConsumerOptions",
"ExchangeToQueueBindingSpecification",
"ExchangeToExchangeBindingSpecification",
"QueueType",
Expand Down
12 changes: 6 additions & 6 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def publisher(self, destination: str = "") -> Publisher:

def consumer(
self,
destination: str,
destination: Optional[str] = None,
message_handler: Optional[MessagingHandler] = None,
consumer_options: Optional[ConsumerOptions] = None,
credit: Optional[int] = None,
Expand All @@ -388,7 +388,7 @@ def consumer(
Create a new consumer instance.

Args:
destination: The address to consume from
destination: Optional The address to consume from
message_handler: Optional handler for processing messages
consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options.
credit: Optional credit value for flow control
Expand All @@ -397,9 +397,11 @@ def consumer(
Consumer: A new consumer instance

Raises:
ArgumentOutOfRangeException: If destination address format is invalid
ArgumentOutOfRangeException: If destination address format is invalid.
Only applies if not using Direct Reply-to.
The server will provide the queue name in that case.
"""
if not validate_address(destination):
if destination is not None and not validate_address(destination):
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
Expand Down Expand Up @@ -438,9 +440,7 @@ def _on_disconnection(self) -> None:
time.sleep(delay.total_seconds())

try:

self._open_connections(reconnect_handlers=True)

self._connections.append(self)

except ConnectionException as e:
Expand Down
85 changes: 59 additions & 26 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from typing import Literal, Optional, Union, cast

from .amqp_consumer_handler import AMQPMessagingHandler
from .entities import ConsumerOptions
from .entities import (
ConsumerOptions,
DirectReplyToConsumerOptions,
StreamConsumerOptions,
)
from .options import (
ReceiverOptionUnsettled,
ReceiverOptionUnsettledWithFilters,
Expand All @@ -29,16 +33,16 @@ class Consumer:
_conn (BlockingConnection): The underlying connection to RabbitMQ
_addr (str): The address to consume from
_handler (Optional[MessagingHandler]): Optional message handling callback
_stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
_consumer_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
_credit (Optional[int]): Flow control credit value
"""

def __init__(
self,
conn: BlockingConnection,
addr: str,
addr: Optional[str] = None,
handler: Optional[AMQPMessagingHandler] = None,
stream_options: Optional[ConsumerOptions] = None,
consumer_options: Optional[ConsumerOptions] = None,
credit: Optional[int] = None,
):
"""
Expand All @@ -48,14 +52,14 @@ def __init__(
conn: The blocking connection to use for consuming
addr: The address to consume from
handler: Optional message handler for processing received messages
stream_options: Optional configuration for stream-based consumption
consumer_options: Optional configuration for stream-based consumption
credit: Optional credit value for flow control
"""
self._receiver: Optional[BlockingReceiver] = None
self._conn = conn
self._addr = addr
self._handler = handler
self._stream_options = stream_options
self._consumer_options = consumer_options
self._credit = credit
self._consumers: list[Consumer] = []
self._open()
Expand All @@ -66,21 +70,25 @@ def _open(self) -> None:
self._receiver = self._create_receiver(self._addr)

def _update_connection(self, conn: BlockingConnection) -> None:
addr = ""
if self._addr is not None:
addr = self._addr

self._conn = conn
if self._stream_options is None:
if self._consumer_options is None:
logger.debug("creating new receiver without stream")
self._receiver = self._conn.create_receiver(
self._addr,
options=ReceiverOptionUnsettled(self._addr),
addr,
options=ReceiverOptionUnsettled(addr),
handler=self._handler,
)
else:
logger.debug("creating new stream receiver")
self._stream_options.offset(self._handler.offset - 1) # type: ignore
self._consumer_options.offset(self._handler.offset - 1) # type: ignore
self._receiver = self._conn.create_receiver(
self._addr,
addr,
options=ReceiverOptionUnsettledWithFilters(
self._addr, self._stream_options
addr, self._consumer_options
),
handler=self._handler,
)
Expand Down Expand Up @@ -142,29 +150,54 @@ def stop(self) -> None:
self._receiver.container.stop_events()
self._receiver.container.stop()

def _create_receiver(self, addr: str) -> BlockingReceiver:
logger.debug("Creating the receiver")
if self._stream_options is None:
receiver = self._conn.create_receiver(
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
)
def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver:
credit = 10
if self._credit is not None:
credit = self._credit

if self._consumer_options is not None:
logger.debug(
"Creating the receiver, with options: %s",
type(self._consumer_options).__name__,
)
else:
receiver = self._conn.create_receiver(
logger.debug("Creating the receiver, without options")

if self._consumer_options is None:
return self._conn.create_receiver(
addr,
options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options),
options=ReceiverOptionUnsettled(addr),
handler=self._handler,
credit=credit,
)

if self._credit is not None:
receiver.credit = self._credit
if isinstance(self._consumer_options, DirectReplyToConsumerOptions):
logger.debug("Creating dynamic receiver for direct reply-to")
dynamic_receiver = self._conn.create_dynamic_receiver(
credit, handler=self._handler
)
dynamic_receiver.credit = credit
return dynamic_receiver

return receiver
if isinstance(self._consumer_options, StreamConsumerOptions):
return self._conn.create_receiver(
addr,
options=ReceiverOptionUnsettledWithFilters(
addr, self._consumer_options
),
handler=self._handler,
)

raise Exception(
"Receiver is not initialized. No valid consumer options provided."
)

@property
def address(self) -> str:
"""Get the current publisher address."""
return self._addr
def address(self) -> Optional[str]:
if self._receiver is not None:
return cast(Optional[str], self._receiver.link.remote_source.address)
else:
raise Exception("Receiver is not initialized")

@property
def handler(self) -> Optional[AMQPMessagingHandler]:
Expand Down
18 changes: 18 additions & 0 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def validate(self, versions: Dict[str, bool]) -> None:
def filter_set(self) -> Dict[symbol, Described]:
raise NotImplementedError("Subclasses should implement this method")

def direct_reply_to(self) -> bool:
return False


@dataclass
class MessageProperties:
Expand Down Expand Up @@ -400,6 +403,21 @@ def validate(self, versions: Dict[str, bool]) -> None:
)


class DirectReplyToConsumerOptions(ConsumerOptions):

def validate(self, versions: Dict[str, bool]) -> None:
if not versions.get("4.2.0", False):
raise ValidationCodeException(
"Direct Reply-To requires RabbitMQ 4.2.0 or higher"
)

def filter_set(self) -> Dict[symbol, Described]:
return {}

def direct_reply_to(self) -> bool:
return True


@dataclass
class RecoveryConfiguration:
"""
Expand Down
Loading