Skip to content

Commit 4757a78

Browse files
committed
Address optional
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 2237f51 commit 4757a78

File tree

4 files changed

+28
-34
lines changed

4 files changed

+28
-34
lines changed

rabbitmq_amqp_python_client/connection.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from .consumer import Consumer
1818
from .entities import (
1919
ConsumerOptions,
20-
DirectReplyToConsumerOptions,
2120
OAuth2Options,
2221
RecoveryConfiguration,
2322
)
@@ -380,7 +379,7 @@ def publisher(self, destination: str = "") -> Publisher:
380379

381380
def consumer(
382381
self,
383-
destination: str,
382+
destination: Optional[str] = None,
384383
message_handler: Optional[MessagingHandler] = None,
385384
consumer_options: Optional[ConsumerOptions] = None,
386385
credit: Optional[int] = None,
@@ -389,7 +388,7 @@ def consumer(
389388
Create a new consumer instance.
390389
391390
Args:
392-
destination: The address to consume from
391+
destination: Optional The address to consume from
393392
message_handler: Optional handler for processing messages
394393
consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options.
395394
credit: Optional credit value for flow control
@@ -402,9 +401,7 @@ def consumer(
402401
Only applies if not using Direct Reply-to.
403402
The server will provide the queue name in that case.
404403
"""
405-
if not validate_address(destination) and not isinstance(
406-
consumer_options, DirectReplyToConsumerOptions
407-
):
404+
if destination is not None and not validate_address(destination):
408405
raise ArgumentOutOfRangeException(
409406
"destination address must start with /queues or /exchanges"
410407
)

rabbitmq_amqp_python_client/consumer.py

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
from typing import Literal, Optional, Union, cast
33

4-
54
from .amqp_consumer_handler import AMQPMessagingHandler
65
from .entities import (
76
ConsumerOptions,
@@ -41,7 +40,7 @@ class Consumer:
4140
def __init__(
4241
self,
4342
conn: BlockingConnection,
44-
addr: str,
43+
addr: Optional[str] = None,
4544
handler: Optional[AMQPMessagingHandler] = None,
4645
consumer_options: Optional[ConsumerOptions] = None,
4746
credit: Optional[int] = None,
@@ -70,34 +69,26 @@ def _open(self) -> None:
7069
logger.debug("Creating Receiver")
7170
self._receiver = self._create_receiver(self._addr)
7271

73-
def get_queue_address(self) -> Optional[str]:
74-
"""
75-
Get the name of the queue from the address.
76-
77-
Returns:
78-
str: The name of the queue.
79-
"""
80-
if self._receiver is not None:
81-
return cast(Optional[str], self._receiver.link.remote_source.address)
82-
else:
83-
raise Exception("Receiver is not initialized")
84-
8572
def _update_connection(self, conn: BlockingConnection) -> None:
73+
addr = ""
74+
if self._addr is not None:
75+
addr = self._addr
76+
8677
self._conn = conn
8778
if self._consumer_options is None:
8879
logger.debug("creating new receiver without stream")
8980
self._receiver = self._conn.create_receiver(
90-
self._addr,
91-
options=ReceiverOptionUnsettled(self._addr),
81+
addr,
82+
options=ReceiverOptionUnsettled(addr),
9283
handler=self._handler,
9384
)
9485
else:
9586
logger.debug("creating new stream receiver")
9687
self._consumer_options.offset(self._handler.offset - 1) # type: ignore
9788
self._receiver = self._conn.create_receiver(
98-
self._addr,
89+
addr,
9990
options=ReceiverOptionUnsettledWithFilters(
100-
self._addr, self._consumer_options
91+
addr, self._consumer_options
10192
),
10293
handler=self._handler,
10394
)
@@ -159,7 +150,7 @@ def stop(self) -> None:
159150
self._receiver.container.stop_events()
160151
self._receiver.container.stop()
161152

162-
def _create_receiver(self, addr: str) -> BlockingReceiver:
153+
def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver:
163154
credit = 100
164155
if self._credit is not None:
165156
credit = self._credit
@@ -182,7 +173,9 @@ def _create_receiver(self, addr: str) -> BlockingReceiver:
182173

183174
if isinstance(self._consumer_options, DirectReplyToConsumerOptions):
184175
logger.debug("Creating dynamic receiver for direct reply-to")
185-
dynamic_receiver = self._conn.create_dynamic_receiver(100, handler=self._handler)
176+
dynamic_receiver = self._conn.create_dynamic_receiver(
177+
100, handler=self._handler
178+
)
186179
dynamic_receiver.credit = credit
187180
return dynamic_receiver
188181

@@ -195,14 +188,16 @@ def _create_receiver(self, addr: str) -> BlockingReceiver:
195188
handler=self._handler,
196189
)
197190

198-
raise AMQPError(
191+
raise Exception(
199192
"Receiver is not initialized. No valid consumer options provided."
200193
)
201194

202195
@property
203-
def address(self) -> str:
204-
"""Get the current publisher address."""
205-
return self._addr
196+
def address(self) -> Optional[str]:
197+
if self._receiver is not None:
198+
return cast(Optional[str], self._receiver.link.remote_source.address)
199+
else:
200+
raise Exception("Receiver is not initialized")
206201

207202
@property
208203
def handler(self) -> Optional[AMQPMessagingHandler]:

rabbitmq_amqp_python_client/options.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Optional
2+
13
from .entities import ConsumerOptions
24
from .qpid.proton._data import ( # noqa: E402
35
Data,
@@ -72,7 +74,7 @@ def apply(self, link: Link) -> None:
7274

7375

7476
class ReceiverOptionUnsettled(LinkOption): # type: ignore
75-
def __init__(self, addr: str):
77+
def __init__(self, addr: Optional[str]):
7678
self._addr = addr
7779

7880
def apply(self, link: Link) -> None:
@@ -87,7 +89,7 @@ def test(self, link: Link) -> bool:
8789

8890

8991
class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
90-
def __init__(self, addr: str, consumer_options: ConsumerOptions):
92+
def __init__(self, addr: Optional[str], consumer_options: ConsumerOptions):
9193
super().__init__(consumer_options.filter_set())
9294
self._addr = addr
9395

tests/direct_reply_to/test_direct_reply.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66

77
def test_consumer_create_reply_name(connection: Connection) -> None:
8-
consumer = connection.consumer("", consumer_options=DirectReplyToConsumerOptions())
9-
assert "/queues/amq.rabbitmq.reply-to." in consumer.get_queue_address()
8+
consumer = connection.consumer(consumer_options=DirectReplyToConsumerOptions())
9+
assert "/queues/amq.rabbitmq.reply-to." in consumer.address

0 commit comments

Comments
 (0)