Skip to content

Commit 175e9d2

Browse files
Test both ZMQ and MSGQ modes on all platforms (#670)
1 parent 0f26072 commit 175e9d2

File tree

8 files changed

+52
-8
lines changed

8 files changed

+52
-8
lines changed

msgq/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# must be built with scons
22
from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \
3-
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event
3+
set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event, \
4+
context_is_zmq
45
from msgq.ipc_pyx import MultiplePublishersError, IpcError
56

67
from typing import Optional, List, Union
@@ -12,6 +13,7 @@
1213
assert get_fake_prefix
1314
assert delete_fake_prefix
1415
assert wait_for_one_event
16+
assert context_is_zmq
1517

1618
NO_TRAVERSAL_LIMIT = 2**64-1
1719

msgq/conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import os
2+
import pytest
3+
import msgq
4+
5+
@pytest.fixture(params=[False, True], ids=["msgq", "zmq"], autouse=True)
6+
def zmq_mode(request):
7+
if request.param:
8+
os.environ["ZMQ"] = "1"
9+
else:
10+
os.environ.pop("ZMQ", None)
11+
msgq.context = msgq.Context()
12+
assert msgq.context_is_zmq() == request.param
13+
yield request.param
14+
os.environ.pop("ZMQ", None)

msgq/ipc.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ cdef extern from "msgq/impl_fake.h":
3535

3636

3737
cdef extern from "msgq/ipc.h":
38+
bool messaging_use_zmq()
39+
3840
cdef cppclass Context:
3941
@staticmethod
4042
Context * create()

msgq/ipc_pyx.pyx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ from libc.string cimport strerror
1010
from cython.operator import dereference
1111

1212

13+
from .ipc cimport messaging_use_zmq
1314
from .ipc cimport Context as cppContext
1415
from .ipc cimport SubSocket as cppSubSocket
1516
from .ipc cimport PubSocket as cppPubSocket
@@ -18,6 +19,10 @@ from .ipc cimport Message as cppMessage
1819
from .ipc cimport Event as cppEvent, SocketEventHandle as cppSocketEventHandle
1920

2021

22+
def context_is_zmq():
23+
return messaging_use_zmq()
24+
25+
2126
class IpcError(Exception):
2227
def __init__(self, endpoint=None):
2328
suffix = f"with {endpoint.decode('utf-8')}" if endpoint else ""

msgq/msgq.cc

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,9 +446,18 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
446446
}
447447

448448
int ms = (timeout == -1) ? 100 : timeout;
449+
450+
#ifdef __APPLE__
451+
// On macOS, signals can't interrupt nanosleep, so poll more frequently
452+
int poll_ms = std::min(ms, 10);
453+
int remaining_ms = ms;
454+
#else
455+
int poll_ms = ms;
456+
#endif
457+
449458
struct timespec ts;
450-
ts.tv_sec = ms / 1000;
451-
ts.tv_nsec = (ms % 1000) * 1000 * 1000;
459+
ts.tv_sec = poll_ms / 1000;
460+
ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000;
452461

453462

454463
while (num == 0) {
@@ -464,10 +473,23 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
464473
}
465474
}
466475

476+
#ifdef __APPLE__
477+
// exit if we had a timeout and we've exhausted it
478+
if (timeout != -1 && ret == 0){
479+
remaining_ms -= poll_ms;
480+
if (remaining_ms <= 0){
481+
break;
482+
}
483+
poll_ms = std::min(remaining_ms, 10);
484+
ts.tv_sec = poll_ms / 1000;
485+
ts.tv_nsec = (poll_ms % 1000) * 1000 * 1000;
486+
}
487+
#else
467488
// exit if we had a timeout and the sleep finished
468489
if (timeout != -1 && ret == 0){
469490
break;
470491
}
492+
#endif
471493
}
472494

473495
return num;

msgq/tests/test_fake.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,13 @@ def test_wait_zero_timeout(self):
6464

6565

6666
@pytest.mark.skipif(condition=platform.system() == "Darwin", reason="FakeSockets not supported on macOS")
67-
@pytest.mark.skipif(condition="ZMQ" in os.environ, reason="FakeSockets not supported on ZMQ")
6867
@parameterized_class([{"prefix": None}, {"prefix": "test"}])
6968
class TestFakeSockets:
7069
prefix: Optional[str] = None
7170

7271
def setup_method(self):
72+
if "ZMQ" in os.environ:
73+
pytest.skip("FakeSockets not supported on ZMQ")
7374
msgq.toggle_fake_events(True)
7475
if self.prefix is not None:
7576
msgq.set_fake_prefix(self.prefix)

msgq/tests/test_messaging.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ def test_conflate(self):
6161
@pytest.mark.flaky(retries=3, delay=1)
6262
def test_receive_timeout(self):
6363
sock = random_sock()
64-
for _ in range(10):
64+
for _ in range(5):
6565
timeout = random.randrange(200)
6666
sub_sock = msgq.sub_sock(sock, timeout=timeout)
6767
zmq_sleep()
6868

6969
start_time = time.monotonic()
7070
recvd = sub_sock.receive()
71-
assert (time.monotonic() - start_time) < 0.2
71+
assert (time.monotonic() - start_time) < (timeout + 0.1)
7272
assert recvd is None

setup.sh

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ PLATFORM=$(uname -s)
88

99
echo "installing dependencies"
1010
if [[ $PLATFORM == "Darwin" ]]; then
11-
export ZMQ=1
12-
1311
export HOMEBREW_NO_AUTO_UPDATE=1
1412
brew install python3 zeromq
1513
elif [[ $PLATFORM == "Linux" ]]; then

0 commit comments

Comments
 (0)