Skip to content

Commit 08adb16

Browse files
author
DanielePalaia
committed
adding purge test
1 parent 43efb26 commit 08adb16

File tree

3 files changed

+59
-4
lines changed

3 files changed

+59
-4
lines changed

rabbitmq_amqp_python_client/management.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def _request(
105105
def declare_exchange(
106106
self, exchange_specification: ExchangeSpecification
107107
) -> ExchangeSpecification:
108-
logger.debug("delete_exchange operation called")
108+
logger.debug("declare_exchange operation called")
109109
body = {}
110110
body["auto_delete"] = exchange_specification.is_auto_delete
111111
body["durable"] = exchange_specification.is_durable
@@ -326,11 +326,13 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
326326
],
327327
)
328328

329-
def purge_queue(self, queue_name: str) -> None:
329+
def purge_queue(self, queue_name: str) -> int:
330330
logger.debug("purge_queue operation called")
331331
path = purge_queue_address(queue_name)
332332

333-
self.request(
333+
print("path: " + path)
334+
335+
response = self.request(
334336
None,
335337
path,
336338
CommonValues.command_delete.value,
@@ -339,6 +341,8 @@ def purge_queue(self, queue_name: str) -> None:
339341
],
340342
)
341343

344+
return int(response.body["message_count"])
345+
342346
def queue_info(self, queue_name: str) -> QueueInfo:
343347
logger.debug("queue_info operation called")
344348
path = queue_address(queue_name)

tests/test_management.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,30 @@ def test_declare_classic_queue_with_args() -> None:
199199
management.delete_queue(queue_name)
200200

201201

202+
def test_declare_classic_queue_with_invalid_args() -> None:
203+
connection = Connection("amqp://guest:guest@localhost:5672/")
204+
connection.dial()
205+
206+
queue_name = "test-queue_with_args"
207+
management = connection.management()
208+
test_failure = True
209+
210+
queue_specification = ClassicQueueSpecification(
211+
name=queue_name,
212+
queue_type=QueueType.classic,
213+
max_len=-5,
214+
)
215+
216+
try:
217+
management.declare_queue(queue_specification)
218+
except ValidationCodeException:
219+
test_failure = False
220+
221+
management.delete_queue(queue_name)
222+
223+
assert test_failure is False
224+
225+
202226
def test_declare_stream_with_args() -> None:
203227
connection = Connection("amqp://guest:guest@localhost:5672/")
204228
connection.dial()

tests/test_publisher.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
)
66

77

8-
def test_bind_exchange_to_queue() -> None:
8+
def test_publish_exchange() -> None:
99
connection = Connection("amqp://guest:guest@localhost:5672/")
1010
connection.dial()
1111

@@ -27,3 +27,30 @@ def test_bind_exchange_to_queue() -> None:
2727
publisher.close()
2828

2929
management.delete_queue(queue_name)
30+
31+
32+
def test_publish_purge() -> None:
33+
connection = Connection("amqp://guest:guest@localhost:5672/")
34+
connection.dial()
35+
36+
queue_name = "test-queue"
37+
management = connection.management()
38+
39+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
40+
41+
raised = False
42+
43+
try:
44+
publisher = connection.publisher("/queues/" + queue_name)
45+
publisher.publish(Message(body="test"))
46+
except Exception:
47+
raised = True
48+
49+
message_purged = management.purge_queue(queue_name)
50+
51+
assert raised is False
52+
assert message_purged == 1
53+
54+
publisher.close()
55+
56+
management.delete_queue(queue_name)

0 commit comments

Comments
 (0)