1
1
from rabbitmq_amqp_python_client import (
2
2
BindingSpecification ,
3
3
ClassicQueueSpecification ,
4
- Connection ,
5
4
ExchangeSpecification ,
5
+ Management ,
6
6
QueueType ,
7
7
QuorumQueueSpecification ,
8
8
StreamSpecification ,
12
12
)
13
13
14
14
15
- def test_declare_delete_exchange () -> None :
16
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
17
- connection .dial ()
15
+ def test_declare_delete_exchange (management : Management ) -> None :
18
16
19
17
exchange_name = "test-exchange"
20
- management = connection .management ()
21
18
22
19
exchange_info = management .declare_exchange (
23
20
ExchangeSpecification (name = exchange_name , arguments = {})
@@ -27,15 +24,9 @@ def test_declare_delete_exchange() -> None:
27
24
28
25
management .delete_exchange (exchange_name )
29
26
30
- connection .close ()
31
-
32
-
33
- def test_declare_purge_delete_queue () -> None :
34
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
35
- connection .dial ()
36
27
28
+ def test_declare_purge_delete_queue (management : Management ) -> None :
37
29
queue_name = "my_queue"
38
- management = connection .management ()
39
30
40
31
queue_info = management .declare_queue (QuorumQueueSpecification (name = queue_name ))
41
32
@@ -45,17 +36,12 @@ def test_declare_purge_delete_queue() -> None:
45
36
46
37
management .delete_queue (queue_name )
47
38
48
- connection .close ()
49
-
50
39
51
- def test_bind_exchange_to_queue () -> None :
52
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
53
- connection .dial ()
40
+ def test_bind_exchange_to_queue (management : Management ) -> None :
54
41
55
42
exchange_name = "test-bind-exchange-to-queue-exchange"
56
43
queue_name = "test-bind-exchange-to-queue-queue"
57
44
routing_key = "routing-key"
58
- management = connection .management ()
59
45
60
46
management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
61
47
@@ -89,12 +75,9 @@ def test_bind_exchange_to_queue() -> None:
89
75
management .unbind (binding_exchange_queue_path )
90
76
91
77
92
- def test_queue_info_with_validations () -> None :
93
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
94
- connection .dial ()
78
+ def test_queue_info_with_validations (management : Management ) -> None :
95
79
96
80
queue_name = "test_queue_info_with_validation"
97
- management = connection .management ()
98
81
99
82
queue_specification = QuorumQueueSpecification (
100
83
name = queue_name ,
@@ -111,12 +94,9 @@ def test_queue_info_with_validations() -> None:
111
94
assert queue_info .message_count == 0
112
95
113
96
114
- def test_queue_info_for_stream_with_validations () -> None :
115
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
116
- connection .dial ()
97
+ def test_queue_info_for_stream_with_validations (management : Management ) -> None :
117
98
118
99
stream_name = "test_stream_info_with_validation"
119
- management = connection .management ()
120
100
121
101
queue_specification = StreamSpecification (
122
102
name = stream_name ,
@@ -132,13 +112,10 @@ def test_queue_info_for_stream_with_validations() -> None:
132
112
assert stream_info .message_count == 0
133
113
134
114
135
- def test_queue_precondition_fail () -> None :
136
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
137
- connection .dial ()
115
+ def test_queue_precondition_fail (management : Management ) -> None :
138
116
test_failure = True
139
117
140
118
queue_name = "test-queue_precondition_fail"
141
- management = connection .management ()
142
119
143
120
queue_specification = QuorumQueueSpecification (
144
121
name = queue_name , is_auto_delete = False
@@ -162,12 +139,9 @@ def test_queue_precondition_fail() -> None:
162
139
assert test_failure is False
163
140
164
141
165
- def test_declare_classic_queue () -> None :
166
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
167
- connection .dial ()
142
+ def test_declare_classic_queue (management : Management ) -> None :
168
143
169
144
queue_name = "test-declare_classic_queue"
170
- management = connection .management ()
171
145
172
146
queue_specification = QuorumQueueSpecification (
173
147
name = queue_name ,
@@ -182,12 +156,9 @@ def test_declare_classic_queue() -> None:
182
156
management .delete_queue (queue_name )
183
157
184
158
185
- def test_declare_classic_queue_with_args () -> None :
186
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
187
- connection .dial ()
159
+ def test_declare_classic_queue_with_args (management : Management ) -> None :
188
160
189
161
queue_name = "test-queue_with_args"
190
- management = connection .management ()
191
162
192
163
queue_specification = ClassicQueueSpecification (
193
164
name = queue_name ,
@@ -220,12 +191,8 @@ def test_declare_classic_queue_with_args() -> None:
220
191
management .delete_queue (queue_name )
221
192
222
193
223
- def test_declare_classic_queue_with_invalid_args () -> None :
224
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
225
- connection .dial ()
226
-
194
+ def test_declare_classic_queue_with_invalid_args (management : Management ) -> None :
227
195
queue_name = "test-queue_with_args"
228
- management = connection .management ()
229
196
test_failure = True
230
197
231
198
queue_specification = ClassicQueueSpecification (
@@ -244,12 +211,8 @@ def test_declare_classic_queue_with_invalid_args() -> None:
244
211
assert test_failure is False
245
212
246
213
247
- def test_declare_stream_with_args () -> None :
248
- connection = Connection ("amqp://guest:guest@localhost:5672/" )
249
- connection .dial ()
250
-
214
+ def test_declare_stream_with_args (management : Management ) -> None :
251
215
stream_name = "test-stream_with_args"
252
- management = connection .management ()
253
216
254
217
stream_specification = StreamSpecification (
255
218
name = stream_name ,
0 commit comments