Skip to content

Commit c5f0e8c

Browse files
committed
Problem: ZMQ_STREAM sockets can get stuck trying to disconnect when hwm is at limit
Solution: Implement zmq_disconnect_peer support for ZMQ_STREAM
1 parent 13e5c52 commit c5f0e8c

File tree

7 files changed

+166
-108
lines changed

7 files changed

+166
-108
lines changed

Makefile.am

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,8 @@ test_apps = \
460460
tests/test_probe_router \
461461
tests/test_stream \
462462
tests/test_stream_empty \
463+
tests/test_stream_disconnect_peer \
463464
tests/test_stream_disconnect \
464-
tests/test_stream_hwm_disconnect \
465465
tests/test_stream_timeout \
466466
tests/test_disconnect_inproc \
467467
tests/test_unbind_wildcard \
@@ -636,14 +636,14 @@ tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp
636636
tests_test_stream_timeout_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
637637
tests_test_stream_timeout_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
638638

639+
tests_test_stream_disconnect_peer_SOURCES = tests/test_stream_disconnect_peer.cpp
640+
tests_test_stream_disconnect_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
641+
tests_test_stream_disconnect_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
642+
639643
tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp
640644
tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
641645
tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
642646

643-
tests_test_stream_hwm_disconnect_SOURCES = tests/test_stream_hwm_disconnect.cpp
644-
tests_test_stream_hwm_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
645-
tests_test_stream_hwm_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
646-
647647
tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp
648648
tests_test_disconnect_inproc_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
649649
tests_test_disconnect_inproc_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

doc/zmq_disconnect_peer.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ to send messages addressed with that 'routing_id' will fail with 'EHOSTUNREACH'
1616
until a new connection with a different 'routing_id' is established.
1717

1818
This function is supported on socket types that manage per-peer routing ids:
19-
'ZMQ_SERVER' and 'ZMQ_PEER'. Calling it on other socket types will fail with
19+
'ZMQ_SERVER', 'ZMQ_PEER' and 'ZMQ_STREAM'. Calling it on other socket types will fail with
2020
'ENOTSUP'.
2121

2222

src/stream.cpp

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ int zmq::stream_t::xsend (msg_t *msg_)
7979
_current_out = out_pipe->pipe;
8080
if (!_current_out->check_write ()) {
8181
out_pipe->active = false;
82+
_current_out = NULL;
83+
errno = EAGAIN;
84+
return -1;
8285
}
8386
} else {
8487
errno = EHOSTUNREACH;
@@ -116,11 +119,6 @@ int zmq::stream_t::xsend (msg_t *msg_)
116119
_current_out = NULL;
117120
return 0;
118121
}
119-
if (!_current_out->check_write ()) {
120-
_more_out = true;
121-
errno = EAGAIN;
122-
return -1;
123-
}
124122
const bool ok = _current_out->write (msg_);
125123
if (likely (ok))
126124
_current_out->flush ();
@@ -241,6 +239,32 @@ bool zmq::stream_t::xhas_out ()
241239
return true;
242240
}
243241

242+
// Disconnects a single peer of this ZMQ_STREAM socket, identified by its
// numeric routing id.
//
// The numeric id is converted into the 5-byte routing id used as the lookup
// key: one zero byte followed by the 32-bit value written by put_uint32.
//
// Returns 0 on success. Returns -1 with errno set to EHOSTUNREACH when no
// outbound pipe is registered under that routing id.
int zmq::stream_t::xdisconnect_peer (uint32_t routing_id_)
243+
{
244+
unsigned char buffer[5];
245+
// Leading zero byte, then the 32-bit id.
// NOTE(review): put_uint32 presumably writes network byte order - confirm.
buffer[0] = 0;
246+
put_uint32 (buffer + 1, routing_id_);
247+
248+
blob_t routing_id;
249+
routing_id.set (buffer, sizeof buffer);
250+
251+
out_pipe_t *out_pipe = lookup_out_pipe (routing_id);
252+
if (!out_pipe) {
253+
errno = EHOSTUNREACH;
254+
return -1;
255+
}
256+
257+
// Only termination is requested here; the lookup entry is not removed in
// this function.
// NOTE(review): the meaning of the 'false' argument and the point at which
// the entry is dropped (presumably xpipe_terminated) are not visible here -
// confirm against pipe_t::terminate.
out_pipe->pipe->terminate (false);
258+
259+
// if currently writing to this pipe at same time, reset _current_out and _more_out
260+
if (out_pipe->pipe == _current_out) {
261+
_current_out = NULL;
262+
_more_out = false;
263+
}
264+
265+
return 0;
266+
}
267+
244268
void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
245269
{
246270
// Always assign routing id for raw-socket
@@ -263,4 +287,4 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
263287
}
264288
pipe_->set_router_socket_routing_id (routing_id);
265289
add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
266-
}
290+
}

src/stream.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class stream_t ZMQ_FINAL : public routing_socket_base_t
2929
void xread_activated (zmq::pipe_t *pipe_);
3030
void xpipe_terminated (zmq::pipe_t *pipe_);
3131
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
32+
int xdisconnect_peer (uint32_t routing_id_) ZMQ_OVERRIDE;
3233

3334
private:
3435
// Generate peer's id and update lookup map

tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ set(tests
2626
test_probe_router
2727
test_stream
2828
test_stream_empty
29+
test_stream_disconnect_peer
2930
test_stream_disconnect
30-
test_stream_hwm_disconnect
3131
test_disconnect_inproc
3232
test_unbind_wildcard
3333
test_ctx_options
tests/test_stream_disconnect_peer.cpp

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/* SPDX-License-Identifier: MPL-2.0 */
2+
3+
#define ZMQ_BUILD_DRAFT_API
4+
5+
#include "testutil.hpp"
6+
#include "testutil_unity.hpp"
7+
8+
#include <string.h>
9+
10+
#if defined ZMQ_HAVE_WINDOWS
11+
#include <winsock2.h>
12+
#else
13+
#include <arpa/inet.h>
14+
#endif
15+
16+
SETUP_TEARDOWN_TESTCONTEXT
17+
18+
// Helper to extract numeric host ID from the 5-byte ZMQ_STREAM frame [0x00][uint32]
// Asserts that the frame is exactly 5 bytes long, skips the leading byte and
// converts the remaining 4 bytes to host byte order with ntohl. The returned
// value is what the test passes to zmq_disconnect_peer.
19+
static uint32_t extract_id (zmq_msg_t *msg_)
20+
{
21+
TEST_ASSERT_EQUAL_INT (5, zmq_msg_size (msg_));
22+
const unsigned char *id_ptr = (const unsigned char *) zmq_msg_data (msg_);
23+
uint32_t net_id;
24+
// memcpy instead of a pointer cast: id_ptr + 1 is not guaranteed to be
// suitably aligned for a uint32_t load.
memcpy (&net_id, id_ptr + 1, 4);
25+
return ntohl (net_id);
26+
}
27+
28+
// End-to-end test of zmq_disconnect_peer on a ZMQ_STREAM socket.
//
// Binds a STREAM socket on loopback, connects two raw TCP clients (A and B),
// and then checks four things in order:
//   1. disconnecting A in the middle of a multi-part send does not prevent
//      the next message from reaching B;
//   2. sending to A afterwards fails with EHOSTUNREACH;
//   3. B can still deliver data to the STREAM socket;
//   4. disconnecting an unknown id fails with EHOSTUNREACH.
static void test_stream_disconnect_peer ()
29+
{
30+
char my_endpoint[MAX_SOCKET_STRING];
31+
32+
// We'll be using this socket to test the surgical disconnect API
33+
void *stream = test_context_socket (ZMQ_STREAM);
34+
35+
// Set timeouts to prevent the test from hanging indefinitely on failure
36+
int timeout = 500;
37+
TEST_ASSERT_SUCCESS_ERRNO (
38+
zmq_setsockopt (stream, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
39+
TEST_ASSERT_SUCCESS_ERRNO (
40+
zmq_setsockopt (stream, ZMQ_RCVTIMEO, &timeout, sizeof (timeout)));
41+
42+
bind_loopback_ipv4 (stream, my_endpoint, sizeof (my_endpoint));
43+
44+
// Connect two distinct clients to test isolation and state reset
45+
fd_t fd_a = connect_socket (my_endpoint);
46+
fd_t fd_b = connect_socket (my_endpoint);
47+
48+
zmq_msg_t msg;
49+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
50+
51+
// Peer A Setup: Receive connection notification
// NOTE(review): this assumes the first notification received belongs to
// fd_a, i.e. that connections are reported in connect order - confirm.
52+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0));
53+
uint32_t id_a_numeric = extract_id (&msg);
54+
unsigned char id_a_raw[5];
55+
memcpy (id_a_raw, zmq_msg_data (&msg), 5);
56+
// The second frame of the notification is asserted to be empty (0 bytes).
TEST_ASSERT_EQUAL_INT (
57+
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)));
58+
59+
// Peer B Setup: Receive connection notification
60+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0));
61+
uint32_t id_b_numeric = extract_id (&msg);
62+
unsigned char id_b_raw[5];
63+
memcpy (id_b_raw, zmq_msg_data (&msg), 5);
64+
// The second frame of the notification is asserted to be empty (0 bytes).
TEST_ASSERT_EQUAL_INT (
65+
0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0)));
66+
67+
// Verify Peer IDs are unique
68+
TEST_ASSERT_NOT_EQUAL (id_a_numeric, id_b_numeric);
69+
70+
// --- CASE 1: THE DIRTY RESET ---
71+
// Start a multi-part message to Peer A.
72+
// This locks the socket state machine (_more_out = true, _current_out = Pipe A).
73+
TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE));
74+
75+
// Use the new API to surgically disconnect Peer A.
76+
// This must force-reset the internal 'more' state and NULL the current pipe.
77+
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect_peer (stream, id_a_numeric));
78+
// Give the asynchronous pipe termination time to complete before going on.
msleep (SETTLE_TIME);
79+
80+
// Attempt to talk to Peer B immediately.
81+
// If the reset failed, this would misroute the ID frame as data for Peer A.
82+
TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, id_b_raw, 5, ZMQ_SNDMORE));
83+
TEST_ASSERT_EQUAL_INT (5, zmq_send (stream, "HELLO", 5, 0));
84+
85+
// Verify Peer B actually received the data via raw TCP
// NOTE(review): a single recv on a TCP socket may return fewer than 5
// bytes; the assertion below assumes the payload arrives in one read.
86+
char recv_buf[5];
87+
int bytes = recv (fd_b, recv_buf, 5, 0);
88+
TEST_ASSERT_EQUAL_INT (5, bytes);
89+
TEST_ASSERT_EQUAL_STRING_LEN ("HELLO", recv_buf, 5);
90+
91+
// --- CASE 2: SURGICAL ISOLATION ---
92+
// Verify Peer A is gone from the routing table; sending to it should fail.
93+
int rc = zmq_send (stream, id_a_raw, 5, ZMQ_SNDMORE);
94+
TEST_ASSERT_EQUAL_INT (-1, rc);
95+
TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno);
96+
97+
// --- CASE 3: INBOUND INTEGRITY ---
98+
// Ensure Peer B can still send data to the server (FQ remains intact).
99+
const char *ping = "PING";
100+
// NOTE(review): the return value of send is not checked; a failed or
// partial send would only surface through the receive assertions below.
send (fd_b, ping, 4, 0);
101+
msleep (SETTLE_TIME);
102+
103+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0));
104+
TEST_ASSERT_EQUAL_INT (id_b_numeric, extract_id (&msg));
105+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0));
106+
TEST_ASSERT_EQUAL_STRING_LEN (ping, (char *) zmq_msg_data (&msg), 4);
107+
108+
// --- CASE 4: ERROR HANDLING ---
109+
// Attempt to disconnect a non-existent ID
110+
rc = zmq_disconnect_peer (stream, 0x12345678);
111+
TEST_ASSERT_EQUAL_INT (-1, rc);
112+
TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno);
113+
114+
// Cleanup
115+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
116+
close (fd_a);
117+
close (fd_b);
118+
test_context_socket_close (stream);
119+
}
120+
121+
// Test entry point: prepares the test environment, runs the single
// disconnect-peer test under Unity, and returns Unity's result as the
// process exit status.
int main (void)
122+
{
123+
setup_test_environment ();
124+
125+
UNITY_BEGIN ();
126+
RUN_TEST (test_stream_disconnect_peer);
127+
return UNITY_END ();
128+
}

tests/test_stream_hwm_disconnect.cpp

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)