Skip to content

Commit 91b1885

Browse files
committed
Add more coverage for various pollers
1 parent 126f9e9 commit 91b1885

2 files changed

Lines changed: 56 additions & 25 deletions

File tree

amqpstorm/tests/functional/test_reliability.py

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import importlib
2+
import select
23
import sys
34
import threading
45
import time
@@ -21,8 +22,7 @@ class ReliabilityFunctionalTests(TestFunctionalFramework):
2122
@setup(new_connection=False, queue=True)
2223
def test_functional_open_new_connection_loop(self):
2324
for _ in range(25):
24-
self.connection = self.connection = Connection(HOST, USERNAME,
25-
PASSWORD)
25+
self.connection = self._make_connection()
2626
self.channel = self.connection.channel()
2727

2828
# Make sure that it's a new channel.
@@ -47,7 +47,7 @@ def test_functional_open_new_connection_loop(self):
4747

4848
@setup(new_connection=False, queue=True)
4949
def test_functional_open_close_connection_loop(self):
50-
self.connection = Connection(HOST, USERNAME, PASSWORD, lazy=True)
50+
self.connection = self._make_connection(lazy=True)
5151
for _ in range(25):
5252
self.connection.open()
5353
channel = self.connection.channel()
@@ -75,7 +75,7 @@ def test_functional_open_close_connection_loop(self):
7575

7676
@setup(new_connection=True, new_channel=False, queue=True)
7777
def test_functional_close_gracefully_after_publish_mandatory_fails(self):
78-
for index in range(3):
78+
for _ in range(3):
7979
channel = self.connection.channel()
8080

8181
# Try to publish 25 bad messages.
@@ -95,8 +95,7 @@ def test_functional_close_gracefully_after_publish_mandatory_fails(self):
9595

9696
@setup(new_connection=False, queue=True)
9797
def test_functional_open_close_channel_loop(self):
98-
self.connection = self.connection = Connection(HOST, USERNAME,
99-
PASSWORD)
98+
self.connection = self._make_connection()
10099
for _ in range(25):
101100
channel = self.connection.channel()
102101

@@ -109,19 +108,18 @@ def test_functional_open_close_channel_loop(self):
109108

110109
channel.close()
111110

112-
# Verify that theChannel has been closed properly.
111+
# Verify that the Channel has been closed properly.
113112
self.assertTrue(self.connection.is_open)
114113
self.assertTrue(channel.is_closed)
115114

116115
@setup(new_connection=False, queue=True)
117116
def test_functional_open_multiple_channels(self):
118-
self.connection = self.connection = Connection(HOST, USERNAME,
119-
PASSWORD, lazy=True)
117+
self.connection = self._make_connection(lazy=True)
120118

121119
for _ in range(5):
122120
channels = []
123121
self.connection.open()
124-
for index in range(10):
122+
for _ in range(10):
125123
channel = self.connection.channel()
126124
channels.append(channel)
127125

@@ -132,13 +130,12 @@ def test_functional_open_multiple_channels(self):
132130

133131
@setup(new_connection=False, queue=False)
134132
def test_functional_close_performance(self):
135-
"""Make sure closing a connection never takes longer than ~1 seconds.
133+
"""Make sure closing a connection never takes longer than ~1 second.
136134
137135
:return:
138136
"""
139137
for _ in range(10):
140-
self.connection = self.connection = Connection(HOST, USERNAME,
141-
PASSWORD)
138+
self.connection = self._make_connection()
142139
start_time = time.time()
143140
self.connection.close()
144141
self.assertLess(time.time() - start_time, 3)
@@ -151,9 +148,7 @@ def test_functional_close_after_channel_close_forced_by_server(self):
151148
:return:
152149
"""
153150
for _ in range(10):
154-
connection = Connection(
155-
HOST, USERNAME, PASSWORD
156-
)
151+
connection = self._make_connection()
157152
channel = connection.channel()
158153
channel.confirm_deliveries()
159154
try:
@@ -179,6 +174,7 @@ def test_functional_uri_connection(self):
179174
self.assertTrue(self.connection.is_open)
180175

181176
def test_functional_ssl_connection_without_ssl(self):
177+
# Poller-agnostic: validates the SSL-absence error path.
182178
restore_func = sys.modules['ssl']
183179
try:
184180
sys.modules['ssl'] = None
@@ -195,9 +191,7 @@ def test_functional_ssl_connection_without_ssl(self):
195191

196192
@setup(new_connection=False, queue=True)
197193
def test_functional_verify_passive_declare(self):
198-
199-
self.connection = self.connection = Connection(HOST, USERNAME,
200-
PASSWORD)
194+
self.connection = self._make_connection()
201195
self.channel = self.connection.channel()
202196
self.assertEqual(int(self.channel), 1)
203197

@@ -223,13 +217,22 @@ def test_functional_verify_passive_declare(self):
223217
self.connection.close()
224218

225219

220+
class ReliabilityFunctionalTestsSelect(ReliabilityFunctionalTests):
221+
poller = 'select'
222+
223+
224+
if hasattr(select, 'poll'):
225+
class ReliabilityFunctionalTestsPoll(ReliabilityFunctionalTests):
226+
poller = 'poll'
227+
228+
226229
class PublishAndConsume1kTest(TestFunctionalFramework):
227230
messages_to_send = 1000
228-
messages_consumed = 0
229-
lock = threading.Lock()
230231

231232
def configure(self):
232233
self.disable_logging_validation()
234+
self.messages_consumed = 0
235+
self.lock = threading.Lock()
233236

234237
def publish_messages(self):
235238
for _ in range(self.messages_to_send):
@@ -255,12 +258,12 @@ def increment_message_count(self):
255258
def test_functional_publish_and_consume_1k_messages(self):
256259
self.channel.queue.declare(self.queue_name, durable=True)
257260

258-
publish_thread = threading.Thread(target=self.publish_messages, )
261+
publish_thread = threading.Thread(target=self.publish_messages)
259262
publish_thread.daemon = True
260263
publish_thread.start()
261264

262265
for _ in range(4):
263-
consumer_thread = threading.Thread(target=self.consume_messages, )
266+
consumer_thread = threading.Thread(target=self.consume_messages)
264267
consumer_thread.daemon = True
265268
consumer_thread.start()
266269

@@ -278,6 +281,17 @@ def test_functional_publish_and_consume_1k_messages(self):
278281
'test took too long')
279282

280283

284+
class PublishAndConsume1kTestSelect(PublishAndConsume1kTest):
285+
"""1k publish/consume run against the ``select.select`` poller."""
286+
poller = 'select'
287+
288+
289+
if hasattr(select, 'poll'):
290+
class PublishAndConsume1kTestPoll(PublishAndConsume1kTest):
291+
"""1k publish/consume run against the ``select.poll`` poller."""
292+
poller = 'poll'
293+
294+
281295
class Consume1kUntilEmpty(TestFunctionalFramework):
282296
messages_to_send = 1000
283297

@@ -309,3 +323,14 @@ def test_functional_publish_and_consume_until_empty(self):
309323
'not all messages consumed')
310324

311325
channel.close()
326+
327+
328+
class Consume1kUntilEmptySelect(Consume1kUntilEmpty):
329+
"""Drain-the-queue test run against the ``select.select`` poller."""
330+
poller = 'select'
331+
332+
333+
if hasattr(select, 'poll'):
334+
class Consume1kUntilEmptyPoll(Consume1kUntilEmpty):
335+
"""Drain-the-queue test run against the ``select.poll`` poller."""
336+
poller = 'poll'

amqpstorm/tests/functional/utility.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,20 @@
1414
class TestFunctionalFramework(TestFramework):
1515
"""Extended Test Base for functional unit-tests."""
1616

17+
poller: str = 'select'
18+
1719
def __init__(self, *args, **kwargs):
1820
self.api = ManagementApi(HTTP_URL, USERNAME, PASSWORD, timeout=1)
1921
self.queue_name = None
2022
self.exchange_name = None
2123
self.virtual_host_name = None
2224
super(TestFunctionalFramework, self).__init__(*args, **kwargs)
2325

26+
def _make_connection(self, **kwargs):
27+
kwargs.setdefault('timeout', 1)
28+
kwargs.setdefault('poller', self.poller)
29+
return Connection(HOST, USERNAME, PASSWORD, **kwargs)
30+
2431

2532
def retry_function_wrapper(callable_function, retry_limit=10,
2633
sleep_interval=1):
@@ -63,8 +70,7 @@ def setup_wrapper(self, *args, **kwargs):
6370
self.exchange_name = name
6471
self.virtual_host_name = name
6572
if new_connection:
66-
self.connection = Connection(HOST, USERNAME, PASSWORD,
67-
timeout=1)
73+
self.connection = self._make_connection()
6874
if new_channel:
6975
self.channel = self.connection.channel()
7076
try:

0 commit comments

Comments
 (0)