1
1
from rabbitmq_amqp_python_client import (
2
- BindingSpecification ,
3
2
Connection ,
4
- ExchangeSpecification ,
5
- ExchangeType ,
6
- Message ,
7
3
QuorumQueueSpecification ,
8
4
queue_address ,
9
5
)
17
13
MyMessageHandlerRequeue ,
18
14
MyMessageHandlerRequeueWithAnnotations ,
19
15
)
20
- from .utils import create_connection
16
+ from .utils import (
17
+ cleanup_dead_lettering ,
18
+ create_connection ,
19
+ publish_messages ,
20
+ setup_dead_lettering ,
21
+ )
21
22
22
23
23
24
def test_consumer_sync_queue_accept (connection : Connection ) -> None :
@@ -29,17 +30,12 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
29
30
management .declare_queue (QuorumQueueSpecification (name = queue_name ))
30
31
31
32
addr_queue = queue_address (queue_name )
32
-
33
- publisher = connection .publisher ("/queues/" + queue_name )
34
33
consumer = connection .consumer (addr_queue )
35
34
36
35
consumed = 0
37
36
38
37
# publish messages_to_send messages
39
- for i in range (messages_to_send ):
40
- publisher .publish (Message (body = "test" + str (i )))
41
-
42
- publisher .close ()
38
+ publish_messages (connection , messages_to_send , queue_name )
43
39
44
40
# consumer synchronously without handler
45
41
for i in range (messages_to_send ):
@@ -49,30 +45,25 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
49
45
50
46
consumer .close ()
51
47
52
- assert consumed > 0
53
-
54
48
management .delete_queue (queue_name )
55
49
management .close ()
56
50
51
+ assert consumed > 0
52
+
57
53
58
54
def test_consumer_async_queue_accept (connection : Connection ) -> None :
59
55
60
56
messages_to_send = 1000
61
57
62
- queue_name = "test-queue_async_accept "
58
+ queue_name = "test-queue-async-accept "
63
59
64
60
management = connection .management ()
65
61
66
62
management .declare_queue (QuorumQueueSpecification (name = queue_name ))
67
63
68
64
addr_queue = queue_address (queue_name )
69
65
70
- publisher = connection .publisher ("/queues/" + queue_name )
71
-
72
- # publish messages_to_send messages
73
- for i in range (messages_to_send ):
74
- publisher .publish (Message (body = "test" + str (i )))
75
- publisher .close ()
66
+ publish_messages (connection , messages_to_send , queue_name )
76
67
77
68
# workaround: it looks like when the consumer finish to consume invalidate the connection
78
69
# so for the moment we need to use one dedicated
@@ -91,6 +82,8 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
91
82
92
83
message_count = management .purge_queue (queue_name )
93
84
85
+ management .delete_queue (queue_name )
86
+
94
87
management .close ()
95
88
96
89
assert message_count == 0
@@ -100,20 +93,15 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
100
93
101
94
messages_to_send = 1000
102
95
103
- queue_name = "test-queue_async_no_ack "
96
+ queue_name = "test-queue-async-no-ack "
104
97
105
98
management = connection .management ()
106
99
107
100
management .declare_queue (QuorumQueueSpecification (name = queue_name ))
108
101
109
102
addr_queue = queue_address (queue_name )
110
103
111
- publisher = connection .publisher ("/queues/" + queue_name )
112
-
113
- # publish messages_to_send messages
114
- for i in range (messages_to_send ):
115
- publisher .publish (Message (body = "test" + str (i )))
116
- publisher .close ()
104
+ publish_messages (connection , messages_to_send , queue_name )
117
105
118
106
# workaround: it looks like when the consumer finish to consume invalidate the connection
119
107
# so for the moment we need to use one dedicated
@@ -141,29 +129,16 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
141
129
def test_consumer_async_queue_with_discard (connection : Connection ) -> None :
142
130
messages_to_send = 1000
143
131
144
- exchange_dead_lettering = "exchange-dead-letter"
145
132
queue_dead_lettering = "queue-dead-letter"
146
- queue_name = "test-queue_async_discard"
147
- binding_key = "key_dead_letter"
133
+ queue_name = "test-queue-async-discard"
134
+ exchange_dead_lettering = "exchange-dead-letter"
135
+ binding_key = "key-dead-letter"
148
136
149
137
management = connection .management ()
150
138
151
139
# configuring dead lettering
152
- management .declare_exchange (
153
- ExchangeSpecification (
154
- name = exchange_dead_lettering ,
155
- exchange_type = ExchangeType .fanout ,
156
- arguments = {},
157
- )
158
- )
159
- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
160
- bind_path = management .bind (
161
- BindingSpecification (
162
- source_exchange = exchange_dead_lettering ,
163
- destination_queue = queue_dead_lettering ,
164
- binding_key = binding_key ,
165
- )
166
- )
140
+ bind_path = setup_dead_lettering (management )
141
+ addr_queue = queue_address (queue_name )
167
142
168
143
management .declare_queue (
169
144
QuorumQueueSpecification (
@@ -173,14 +148,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
173
148
)
174
149
)
175
150
176
- addr_queue = queue_address (queue_name )
177
-
178
- publisher = connection .publisher ("/queues/" + queue_name )
179
-
180
- # publish messages_to_send messages
181
- for i in range (messages_to_send ):
182
- publisher .publish (Message (body = "test" + str (i )))
183
- publisher .close ()
151
+ publish_messages (connection , messages_to_send , queue_name )
184
152
185
153
# workaround: it looks like when the consumer finish to consume invalidate the connection
186
154
# so for the moment we need to use one dedicated
@@ -204,9 +172,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
204
172
205
173
message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
206
174
207
- management .unbind (bind_path )
208
- management .delete_exchange (exchange_dead_lettering )
209
- management .delete_queue (queue_dead_lettering )
175
+ cleanup_dead_lettering (management , bind_path )
210
176
211
177
management .close ()
212
178
@@ -220,30 +186,13 @@ def test_consumer_async_queue_with_discard_with_annotations(
220
186
) -> None :
221
187
messages_to_send = 1000
222
188
223
- exchange_dead_lettering = "exchange-dead-letter"
224
189
queue_dead_lettering = "queue-dead-letter"
225
- queue_name = "test-queue_async_discard"
226
- binding_key = "key_dead_letter"
190
+ queue_name = "test-queue-async-discard"
191
+ exchange_dead_lettering = "exchange-dead-letter"
192
+ binding_key = "key-dead-letter"
227
193
228
194
management = connection .management ()
229
195
230
- # configuring dead lettering
231
- management .declare_exchange (
232
- ExchangeSpecification (
233
- name = exchange_dead_lettering ,
234
- exchange_type = ExchangeType .fanout ,
235
- arguments = {},
236
- )
237
- )
238
- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
239
- bind_path = management .bind (
240
- BindingSpecification (
241
- source_exchange = exchange_dead_lettering ,
242
- destination_queue = queue_dead_lettering ,
243
- binding_key = binding_key ,
244
- )
245
- )
246
-
247
196
management .declare_queue (
248
197
QuorumQueueSpecification (
249
198
name = queue_name ,
@@ -252,16 +201,12 @@ def test_consumer_async_queue_with_discard_with_annotations(
252
201
)
253
202
)
254
203
204
+ publish_messages (connection , messages_to_send , queue_name )
205
+
206
+ bind_path = setup_dead_lettering (management )
255
207
addr_queue = queue_address (queue_name )
256
208
addr_queue_dl = queue_address (queue_dead_lettering )
257
209
258
- publisher = connection .publisher ("/queues/" + queue_name )
259
-
260
- # publish messages_to_send messages
261
- for i in range (messages_to_send ):
262
- publisher .publish (Message (body = "test" + str (i )))
263
- publisher .close ()
264
-
265
210
# workaround: it looks like when the consumer finish to consume invalidate the connection
266
211
# so for the moment we need to use one dedicated
267
212
connection_consumer = create_connection ()
@@ -283,20 +228,18 @@ def test_consumer_async_queue_with_discard_with_annotations(
283
228
message = new_consumer .consume ()
284
229
new_consumer .close ()
285
230
286
- assert "x-opt-string" in message .annotations
287
-
288
231
message_count = management .purge_queue (queue_name )
289
232
290
233
management .delete_queue (queue_name )
291
234
292
235
message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
293
236
294
- management .unbind (bind_path )
295
- management .delete_exchange (exchange_dead_lettering )
296
- management .delete_queue (queue_dead_lettering )
237
+ cleanup_dead_lettering (management , bind_path )
297
238
298
239
management .close ()
299
240
241
+ assert "x-opt-string" in message .annotations
242
+
300
243
assert message_count == 0
301
244
# check dead letter queue
302
245
assert message_count_dead_lettering == messages_to_send
@@ -305,20 +248,15 @@ def test_consumer_async_queue_with_discard_with_annotations(
305
248
def test_consumer_async_queue_with_requeue (connection : Connection ) -> None :
306
249
messages_to_send = 1000
307
250
308
- queue_name = "test-queue_async_requeue "
251
+ queue_name = "test-queue-async-requeue "
309
252
310
253
management = connection .management ()
311
254
312
255
management .declare_queue (QuorumQueueSpecification (name = queue_name ))
313
256
314
257
addr_queue = queue_address (queue_name )
315
258
316
- publisher = connection .publisher ("/queues/" + queue_name )
317
-
318
- # publish messages_to_send messages
319
- for i in range (messages_to_send ):
320
- publisher .publish (Message (body = "test" + str (i )))
321
- publisher .close ()
259
+ publish_messages (connection , messages_to_send , queue_name )
322
260
323
261
# workaround: it looks like when the consumer finish to consume invalidate the connection
324
262
# so for the moment we need to use one dedicated
@@ -349,20 +287,15 @@ def test_consumer_async_queue_with_requeue_with_annotations(
349
287
) -> None :
350
288
messages_to_send = 1000
351
289
352
- queue_name = "test-queue_async_requeue "
290
+ queue_name = "test-queue-async-requeue "
353
291
354
292
management = connection .management ()
355
293
356
294
management .declare_queue (QuorumQueueSpecification (name = queue_name ))
357
295
358
296
addr_queue = queue_address (queue_name )
359
297
360
- publisher = connection .publisher ("/queues/" + queue_name )
361
-
362
- # publish messages_to_send messages
363
- for i in range (messages_to_send ):
364
- publisher .publish (Message (body = "test" + str (i )))
365
- publisher .close ()
298
+ publish_messages (connection , messages_to_send , queue_name )
366
299
367
300
# workaround: it looks like when the consumer finish to consume invalidate the connection
368
301
# so for the moment we need to use one dedicated
@@ -385,11 +318,11 @@ def test_consumer_async_queue_with_requeue_with_annotations(
385
318
message = new_consumer .consume ()
386
319
new_consumer .close ()
387
320
388
- assert "x-opt-string" in message .annotations
389
-
390
321
message_count = management .purge_queue (queue_name )
391
322
392
323
management .delete_queue (queue_name )
393
324
management .close ()
394
325
326
+ assert "x-opt-string" in message .annotations
327
+
395
328
assert message_count > 0
0 commit comments