33import asyncio
44from functools import lru_cache
55from itertools import cycle
6+ import os
67import random
78import string
89import time
910import typing as _t
11+ from unittest .mock import patch
1012
1113import pytest
1214import pytest_cases
1517 AsyncioConnector ,
1618 Channel ,
1719 Connector ,
20+ ZMQConnector ,
1821)
1922from plugboard .exceptions import ChannelClosedError
2023from plugboard .schemas .connector import ConnectorMode , ConnectorSpec
21- from tests .conftest import zmq_connector_cls
24+ from plugboard .utils .di import DI
25+ from plugboard .utils .settings import Settings
26+
27+
28+ @pytest_cases .fixture
29+ @pytest_cases .parametrize (zmq_pubsub_proxy = [False ])
30+ def zmq_connector_cls (zmq_pubsub_proxy : bool ) -> _t .Iterator [_t .Type [ZMQConnector ]]:
31+ """Returns the ZMQConnector class with the specified proxy setting.
32+
33+ Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
34+ """
35+ with patch .dict (
36+ os .environ ,
37+ {"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY" : str (zmq_pubsub_proxy )},
38+ ):
39+ testing_settings = Settings ()
40+ DI .settings .override (testing_settings )
41+ yield ZMQConnector
42+ DI .settings .reset_override ()
43+
44+
45+ @pytest_cases .fixture
46+ @pytest_cases .parametrize (_connector_cls = [AsyncioConnector , zmq_connector_cls ])
47+ def connector_cls (_connector_cls : type [Connector ]) -> type [Connector ]:
48+ """Fixture for `Connector` of various types."""
49+ return _connector_cls
2250
2351
2452TEST_ITEMS = string .ascii_lowercase
@@ -115,10 +143,8 @@ async def recv_messages_unordered(channels: list[Channel]) -> list[int]:
115143@pytest_cases .parametrize (
116144 "connector_cls, num_subscribers, num_messages" ,
117145 [
118- (AsyncioConnector , 1 , 1000 ),
119- (AsyncioConnector , 10 , 1000 ),
120- (zmq_connector_cls , 1 , 1000 ),
121- (zmq_connector_cls , 100 , 1000 ),
146+ (connector_cls , 1 , 100 ),
147+ (connector_cls , 10 , 100 ),
122148 ],
123149)
124150async def test_pubsub_channel_single_publisher (
@@ -129,6 +155,12 @@ async def test_pubsub_channel_single_publisher(
129155 In this test there is a single publisher. Messages are expected to be received by all
130156 subscribers exactly once and in order.
131157 """
158+ await _test_pubsub_channel_single_publisher (connector_cls , num_subscribers , num_messages )
159+
160+
161+ async def _test_pubsub_channel_single_publisher (
162+ connector_cls : type [Connector ], num_subscribers : int , num_messages : int
163+ ) -> None :
132164 num_publishers : int = 1
133165 connector_spec = ConnectorSpec (
134166 source = "pubsub-test-0.publishers" ,
@@ -166,20 +198,26 @@ async def test_pubsub_channel_single_publisher(
166198@pytest_cases .parametrize (
167199 "connector_cls, num_publishers, num_subscribers, num_messages" ,
168200 [
169- (AsyncioConnector , 10 , 1 , 1000 ),
170- (AsyncioConnector , 10 , 10 , 1000 ),
171- (zmq_connector_cls , 10 , 1 , 1000 ),
172- (zmq_connector_cls , 10 , 100 , 1000 ),
201+ (connector_cls , 10 , 1 , 100 ),
202+ (connector_cls , 10 , 10 , 100 ),
173203 ],
174204)
175- async def test_pubsub_channel_multiple_publshers (
205+ async def test_pubsub_channel_multiple_publishers (
176206 connector_cls : type [Connector ], num_publishers : int , num_subscribers : int , num_messages : int
177207) -> None :
178208 """Tests the various pubsub `Channel` classes in pubsub mode.
179209
180210 In this test there are multiple publishers. Messages are expected to be received by all
181211 subscribers exactly once but they are not expected to be in order.
182212 """
213+ await _test_pubsub_channel_multiple_publishers (
214+ connector_cls , num_publishers , num_subscribers , num_messages
215+ )
216+
217+
218+ async def _test_pubsub_channel_multiple_publishers (
219+ connector_cls : type [Connector ], num_publishers : int , num_subscribers : int , num_messages : int
220+ ) -> None :
183221 connector_spec = ConnectorSpec (
184222 source = "pubsub-test-1.publishers" ,
185223 target = "pubsub-test-1.subscribers" ,
@@ -216,13 +254,11 @@ async def test_pubsub_channel_multiple_publshers(
216254@pytest_cases .parametrize (
217255 "connector_cls, num_topics, num_publishers, num_subscribers, num_messages" ,
218256 [
219- (AsyncioConnector , 3 , 10 , 1 , 1000 ),
220- (AsyncioConnector , 3 , 10 , 10 , 1000 ),
221- (zmq_connector_cls , 3 , 10 , 1 , 1000 ),
222- (zmq_connector_cls , 3 , 10 , 100 , 1000 ),
257+ (connector_cls , 3 , 10 , 1 , 100 ),
258+ (connector_cls , 3 , 10 , 10 , 100 ),
223259 ],
224260)
225- async def test_pubsub_channel_multiple_topics_and_publishers (
261+ async def test_pubsub_channel_multiple_topics (
226262 connector_cls : type [Connector ],
227263 num_topics : int ,
228264 num_publishers : int ,
@@ -234,6 +270,18 @@ async def test_pubsub_channel_multiple_topics_and_publishers(
234270 In this test there are multiple topics and publishers. Messages are expected to be received by
235271 all subscribers exactly once but they are not expected to be in order.
236272 """
273+ await _test_pubsub_channel_multiple_topics_and_publishers (
274+ connector_cls , num_topics , num_publishers , num_subscribers , num_messages
275+ )
276+
277+
278+ async def _test_pubsub_channel_multiple_topics_and_publishers (
279+ connector_cls : type [Connector ],
280+ num_topics : int ,
281+ num_publishers : int ,
282+ num_subscribers : int ,
283+ num_messages : int ,
284+ ) -> None :
237285 all_publishers = []
238286 all_subscribers = []
239287
0 commit comments