Skip to content

Commit b01834d

Browse files
author
DanielePalaia
committed
implementing purge operation
1 parent 078ca17 commit b01834d

File tree

6 files changed

+35
-13
lines changed

6 files changed

+35
-13
lines changed

examples/getting_started/main.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,19 @@ def main() -> None:
1515
routing_key = "routing-key"
1616
connection = Connection("amqp://guest:guest@localhost:5672/")
1717

18+
print("connection to amqp server")
1819
connection.dial()
1920

2021
management = connection.management()
2122

23+
print("declaring exchange and queue")
2224
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
2325

2426
management.declare_queue(
2527
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
2628
)
2729

30+
print("binding queue to exchange")
2831
bind_name = management.bind(
2932
BindingSpecification(
3033
source_exchange=exchange_name,
@@ -35,22 +38,27 @@ def main() -> None:
3538

3639
addr = exchange_address(exchange_name, routing_key)
3740

41+
print("create a publisher and publish a test message")
3842
publisher = connection.publisher(addr)
3943

4044
publisher.publish(Message(body="test"))
4145

4246
publisher.close()
4347

48+
print("unbind")
4449
management.unbind(bind_name)
4550

46-
# management.purge_queue(queue_info.name)
51+
print("purging queue")
52+
management.purge_queue(queue_name)
4753

54+
print("delete queue")
4855
management.delete_queue(queue_name)
4956

57+
print("delete exchange")
5058
management.delete_exchange(exchange_name)
5159

60+
print("closing connections")
5261
management.close()
53-
5462
connection.close()
5563

5664

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ def queue_address(name: str) -> str:
1616
return path
1717

1818

19+
def purge_queue_address(name: str) -> str:
20+
path = "/queues/" + name + "/messages"
21+
22+
return path
23+
24+
1925
def path_address() -> str:
2026
path = "/bindings"
2127

rabbitmq_amqp_python_client/management.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
binding_path_with_exchange_queue,
77
exchange_address,
88
path_address,
9+
purge_queue_address,
910
queue_address,
1011
)
1112
from .common import CommonValues
@@ -157,7 +158,6 @@ def delete_exchange(self, exchange_name: str) -> None:
157158
path,
158159
CommonValues.command_delete.value,
159160
[
160-
CommonValues.response_code_200.value,
161161
CommonValues.response_code_204.value,
162162
],
163163
)
@@ -172,7 +172,6 @@ def delete_queue(self, queue_name: str) -> None:
172172
CommonValues.command_delete.value,
173173
[
174174
CommonValues.response_code_200.value,
175-
CommonValues.response_code_204.value,
176175
],
177176
)
178177

@@ -223,7 +222,6 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
223222
binding_exchange_queue_path,
224223
CommonValues.command_delete.value,
225224
[
226-
CommonValues.response_code_200.value,
227225
CommonValues.response_code_204.value,
228226
],
229227
)
@@ -232,4 +230,17 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
232230
# def queue_info(self, queue_name:str):
233231

234232
# TODO
235-
# def purge_queue(self, queue_name:str):
233+
def purge_queue(self, queue_name: str):
234+
logger.debug("purge_queue operation called")
235+
path = purge_queue_address(queue_name)
236+
237+
print("path: " + path)
238+
239+
self.request(
240+
None,
241+
path,
242+
CommonValues.command_delete.value,
243+
[
244+
CommonValues.response_code_200.value,
245+
],
246+
)

rabbitmq_amqp_python_client/qpid/proton/_endpoints.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1317,7 +1317,6 @@ def send(
13171317
Where the object is a :class:`Message`, this will send the message over
13181318
this link, creating a new delivery for the purpose.
13191319
"""
1320-
print("XXXXXXXXXXXX")
13211320
if hasattr(obj, "send"):
13221321
return obj.send(self, tag=tag)
13231322
else:

tests/test_management.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def test_declare_delete_exchange() -> None:
2626
connection.close()
2727

2828

29-
def test_declare_delete_queue() -> None:
29+
def test_declare_purge_delete_queue() -> None:
3030
connection = Connection("amqp://guest:guest@localhost:5672/")
3131
connection.dial()
3232

@@ -39,7 +39,8 @@ def test_declare_delete_queue() -> None:
3939

4040
assert queue_info.name == queue_name
4141

42-
# Still not working
42+
management.purge_queue(queue_name)
43+
4344
management.delete_queue(queue_name)
4445

4546
connection.close()
@@ -81,11 +82,8 @@ def test_bind_exchange_to_queue() -> None:
8182
+ ";args="
8283
)
8384

84-
# Still not working
8585
management.delete_exchange(exchange_name)
8686

87-
# Still not working
8887
management.delete_queue(queue_name)
8988

90-
# Still not working
9189
management.unbind(binding_exchange_queue_path)

tests/test_publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ def test_bind_exchange_to_queue() -> None:
2828
assert raised is False
2929

3030
publisher.close()
31-
# Still not working
31+
3232
management.delete_queue(queue_name)

0 commit comments

Comments
 (0)