Skip to content

Commit 7222510

Browse files
author
DanielePalaia
committed
implementing queue_info
1 parent f8fd4c2 commit 7222510

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

rabbitmq_amqp_python_client/entities.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class QueueSpecification:
2525
dead_letter_exchange: str = ""
2626
is_auto_delete: bool = False
2727
is_durable: bool = True
28+
leader: str = ""
29+
members: str = ""
2830

2931

3032
@dataclass

rabbitmq_amqp_python_client/management.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
purge_queue_address,
1010
queue_address,
1111
)
12-
from .common import CommonValues
12+
from .common import CommonValues, QueueType
1313
from .entities import (
1414
BindingSpecification,
1515
ExchangeSpecification,
@@ -65,8 +65,10 @@ def request(
6565
path: str,
6666
method: str,
6767
expected_response_codes: list[int],
68-
) -> None:
69-
self._request(str(uuid.uuid4()), body, path, method, expected_response_codes)
68+
) -> Message:
69+
return self._request(
70+
str(uuid.uuid4()), body, path, method, expected_response_codes
71+
)
7072

7173
def _request(
7274
self,
@@ -75,7 +77,7 @@ def _request(
7577
path: str,
7678
method: str,
7779
expected_response_codes: list[int],
78-
) -> None:
80+
) -> Message:
7981
amq_message = Message(
8082
id=id,
8183
body=body,
@@ -93,6 +95,7 @@ def _request(
9395
logger.debug("Received message: " + str(msg))
9496

9597
self._validate_reponse_code(int(msg.subject), expected_response_codes)
98+
return msg
9699

97100
def declare_exchange(
98101
self, exchange_specification: ExchangeSpecification
@@ -236,3 +239,36 @@ def purge_queue(self, queue_name: str) -> None:
236239
CommonValues.response_code_200.value,
237240
],
238241
)
242+
243+
def queue_info(self, queue_name: str) -> QueueSpecification:
244+
logger.debug("queue_info operation called")
245+
path = queue_address(queue_name)
246+
247+
message = self.request(
248+
None,
249+
path,
250+
CommonValues.command_get.value,
251+
[
252+
CommonValues.response_code_200.value,
253+
],
254+
)
255+
256+
queue_info: dict[str, Any] = message.body
257+
258+
if queue_info["type"] == "quorum":
259+
queue_type = QueueType.quorum
260+
elif queue_info["type"] == "stream":
261+
queue_type = QueueType.stream
262+
else:
263+
queue_type = QueueType.classic
264+
265+
return QueueSpecification(
266+
name=queue_info["name"],
267+
is_durable=queue_info["durable"],
268+
is_auto_delete=queue_info["auto_delete"],
269+
is_exclusive=queue_info["exclusive"],
270+
queue_type=queue_type,
271+
leader=queue_info["leader"],
272+
members=queue_info["replicas"],
273+
arguments=queue_info["arguments"],
274+
)

tests/test_management.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,22 @@ def test_bind_exchange_to_queue() -> None:
8686
management.delete_queue(queue_name)
8787

8888
management.unbind(binding_exchange_queue_path)
89+
90+
91+
def test_queue_info() -> None:
92+
connection = Connection("amqp://guest:guest@localhost:5672/")
93+
connection.dial()
94+
95+
queue_name = "test-bind-exchange-to-queue-queue"
96+
management = connection.management()
97+
98+
queue_specification = QueueSpecification(
99+
name=queue_name, queue_type=QueueType.quorum, arguments={}
100+
)
101+
management.declare_queue(queue_specification)
102+
103+
queue_info = management.queue_info(queue_name=queue_name)
104+
105+
assert queue_info.name == queue_name
106+
assert queue_info.queue_type == queue_specification.queue_type
107+
assert queue_info.is_durable == queue_specification.is_durable

0 commit comments

Comments
 (0)