Skip to content

Commit 37570f2

Browse files
author
DanielePalaia
committed
fixing queue arguments management
1 parent a031ce4 commit 37570f2

File tree

6 files changed

+139
-20
lines changed

6 files changed

+139
-20
lines changed

examples/getting_started/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def main() -> None:
2424
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
2525

2626
management.declare_queue(
27-
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
27+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
2828
)
2929

3030
print("binding queue to exchange")

rabbitmq_amqp_python_client/entities.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import Optional
2+
from typing import Any, Optional
33

44
from .common import ExchangeType, QueueType
55

@@ -17,20 +17,24 @@ class ExchangeSpecification:
1717
@dataclass
1818
class QueueSpecification:
1919
name: str
20-
arguments: dict[str, str]
2120
queue_type: QueueType = QueueType.quorum
22-
dead_letter_routing_key: str = ""
21+
dead_letter_routing_key: Optional[str] = None
2322
is_exclusive: Optional[bool] = None
23+
max_len: Optional[int] = None
2424
max_len_bytes: Optional[int] = None
25-
dead_letter_exchange: str = ""
25+
message_ttl: Optional[int] = None
26+
expires: Optional[int] = None
27+
dead_letter_exchange: Optional[str] = ""
2628
is_auto_delete: bool = False
2729
is_durable: bool = True
30+
overflow: Optional[str] = None
31+
single_active_consumer: Optional[bool] = None
2832

2933

3034
@dataclass
3135
class QueueInfo:
3236
name: str
33-
arguments: dict[str, str]
37+
arguments: dict[str, Any]
3438
queue_type: QueueType = QueueType.quorum
3539
is_exclusive: Optional[bool] = None
3640
is_auto_delete: bool = False

rabbitmq_amqp_python_client/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
class ValidationCodeException(Exception):
1+
class ValidationCodeException(BaseException):
22
# Constructor or Initializer
33
def __init__(self, msg: str):
44
self.msg = msg

rabbitmq_amqp_python_client/management.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def declare_exchange(
107107
body["durable"] = exchange_specification.is_durable
108108
body["type"] = exchange_specification.exchange_type.value # type: ignore
109109
body["internal"] = exchange_specification.is_internal
110-
body["arguments"] = {} # type: ignore
110+
body["arguments"] = exchange_specification.arguments # type: ignore
111111

112112
path = exchange_address(exchange_specification.name)
113113

@@ -129,14 +129,33 @@ def declare_queue(
129129
) -> QueueSpecification:
130130
logger.debug("declare_queue operation called")
131131
body = {}
132+
args: dict[str, Any] = {}
133+
132134
body["auto_delete"] = queue_specification.is_auto_delete
133135
body["durable"] = queue_specification.is_durable
134-
body["arguments"] = { # type: ignore
135-
"x-queue-type": queue_specification.queue_type.value,
136-
"x-dead-letter-exchange": queue_specification.dead_letter_exchange,
137-
"x-dead-letter-routing-key": queue_specification.dead_letter_routing_key,
138-
"max-length-bytes": queue_specification.max_len_bytes,
139-
}
136+
args["x-queue-type"] = queue_specification.queue_type.value
137+
if queue_specification.dead_letter_exchange is not None:
138+
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
139+
if queue_specification.dead_letter_routing_key is not None:
140+
args["x-dead-letter-routing-key"] = (
141+
queue_specification.dead_letter_routing_key
142+
)
143+
if queue_specification.overflow is not None:
144+
args["x-overflow"] = queue_specification.overflow
145+
if queue_specification.max_len is not None:
146+
args["x-max-length"] = queue_specification.max_len
147+
if queue_specification.max_len_bytes is not None:
148+
args["x-max-length-bytes"] = queue_specification.max_len_bytes
149+
if queue_specification.message_ttl is not None:
150+
args["x-message-ttl"] = queue_specification.message_ttl
151+
if queue_specification.expires is not None:
152+
args["x-expires"] = queue_specification.expires
153+
if queue_specification.single_active_consumer is not None:
154+
args["x-single-active-consumer"] = (
155+
queue_specification.single_active_consumer
156+
)
157+
158+
body["arguments"] = args # type: ignore
140159

141160
path = queue_address(queue_specification.name)
142161

tests/test_management.py

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
QueueSpecification,
66
QueueType,
77
)
8+
from rabbitmq_amqp_python_client.exceptions import (
9+
ValidationCodeException,
10+
)
811

912

1013
def test_declare_delete_exchange() -> None:
@@ -33,7 +36,7 @@ def test_declare_purge_delete_queue() -> None:
3336
management = connection.management()
3437

3538
queue_info = management.declare_queue(
36-
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
39+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
3740
)
3841

3942
assert queue_info.name == queue_name
@@ -57,7 +60,7 @@ def test_bind_exchange_to_queue() -> None:
5760
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
5861

5962
management.declare_queue(
60-
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
63+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
6164
)
6265

6366
binding_exchange_queue_path = management.bind(
@@ -88,20 +91,113 @@ def test_bind_exchange_to_queue() -> None:
8891
management.unbind(binding_exchange_queue_path)
8992

9093

91-
def test_queue_info() -> None:
94+
def test_queue_info_with_validations() -> None:
9295
connection = Connection("amqp://guest:guest@localhost:5672/")
9396
connection.dial()
9497

95-
queue_name = "test-bind-exchange-to-queue-queue"
98+
queue_name = "test_queue_info_with_validation"
9699
management = connection.management()
97100

98101
queue_specification = QueueSpecification(
99-
name=queue_name, queue_type=QueueType.quorum, arguments={}
102+
name=queue_name,
103+
queue_type=QueueType.quorum,
100104
)
101105
management.declare_queue(queue_specification)
102106

103107
queue_info = management.queue_info(queue_name=queue_name)
104108

109+
management.delete_queue(queue_name)
110+
105111
assert queue_info.name == queue_name
106112
assert queue_info.queue_type == queue_specification.queue_type
107113
assert queue_info.is_durable == queue_specification.is_durable
114+
assert queue_info.message_count == 0
115+
116+
117+
def test_queue_precondition_fail() -> None:
118+
connection = Connection("amqp://guest:guest@localhost:5672/")
119+
connection.dial()
120+
test_failure = True
121+
122+
queue_name = "test-queue_precondition_fail"
123+
management = connection.management()
124+
125+
queue_specification = QueueSpecification(
126+
name=queue_name, queue_type=QueueType.quorum, is_auto_delete=False
127+
)
128+
management.declare_queue(queue_specification)
129+
130+
management.declare_queue(queue_specification)
131+
132+
queue_specification = QueueSpecification(
133+
name=queue_name,
134+
queue_type=QueueType.quorum,
135+
is_auto_delete=True,
136+
)
137+
138+
management.delete_queue(queue_name)
139+
140+
try:
141+
management.declare_queue(queue_specification)
142+
except ValidationCodeException:
143+
test_failure = False
144+
145+
assert test_failure is False
146+
147+
148+
def test_declare_classic_queue() -> None:
149+
connection = Connection("amqp://guest:guest@localhost:5672/")
150+
connection.dial()
151+
152+
queue_name = "test-declare_classic_queue"
153+
management = connection.management()
154+
155+
queue_specification = QueueSpecification(
156+
name=queue_name,
157+
queue_type=QueueType.classic,
158+
is_auto_delete=False,
159+
)
160+
queue_info = management.declare_queue(queue_specification)
161+
162+
assert queue_info.name == queue_specification.name
163+
assert queue_info.queue_type == queue_specification.queue_type
164+
165+
management.delete_queue(queue_name)
166+
167+
168+
def test_declare_queue_with_args() -> None:
169+
connection = Connection("amqp://guest:guest@localhost:5672/")
170+
connection.dial()
171+
172+
queue_name = "test-queue_with_args"
173+
management = connection.management()
174+
175+
queue_specification = QueueSpecification(
176+
name=queue_name,
177+
queue_type=QueueType.classic,
178+
is_auto_delete=False,
179+
dead_letter_exchange="my_exchange",
180+
dead_letter_routing_key="my_key",
181+
max_len=50000000,
182+
max_len_bytes=1000000000,
183+
expires=2000,
184+
single_active_consumer=True,
185+
)
186+
187+
queue_info = management.declare_queue(queue_specification)
188+
189+
assert queue_specification.name == queue_info.name
190+
assert queue_specification.is_auto_delete == queue_info.is_auto_delete
191+
assert queue_specification.dead_letter_exchange == queue_info.dead_letter_exchange
192+
assert (
193+
queue_specification.dead_letter_routing_key
194+
== queue_info.dead_letter_routing_key
195+
)
196+
assert queue_specification.max_len == queue_info.max_len
197+
assert queue_specification.max_len_bytes == queue_info.max_len_bytes
198+
assert queue_specification.expires == queue_info.expires
199+
assert (
200+
queue_specification.single_active_consumer == queue_info.single_active_consumer
201+
)
202+
203+
management.delete_queue(queue_name)

tests/test_publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def test_bind_exchange_to_queue() -> None:
1414
management = connection.management()
1515

1616
management.declare_queue(
17-
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
17+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
1818
)
1919

2020
raised = False

0 commit comments

Comments
 (0)