1+ /* SPDX-License-Identifier: MPL-2.0 */
2+
3+ #define ZMQ_BUILD_DRAFT_API
4+
5+ #include " testutil.hpp"
6+ #include " testutil_unity.hpp"
7+
8+ #include < string.h>
9+
10+ #if defined ZMQ_HAVE_WINDOWS
11+ #include < winsock2.h>
12+ #else
13+ #include < arpa/inet.h>
14+ #endif
15+
16+ SETUP_TEARDOWN_TESTCONTEXT
17+
18+ // Helper to extract numeric host ID from the 5-byte ZMQ_STREAM frame [0x00][uint32]
19+ static uint32_t extract_id (zmq_msg_t *msg_)
20+ {
21+ TEST_ASSERT_EQUAL_INT (5 , zmq_msg_size (msg_));
22+ const unsigned char *id_ptr = (const unsigned char *) zmq_msg_data (msg_);
23+ uint32_t net_id;
24+ memcpy (&net_id, id_ptr + 1 , 4 );
25+ return ntohl (net_id);
26+ }
27+
28+ static void test_stream_disconnect_peer ()
29+ {
30+ char my_endpoint[MAX_SOCKET_STRING];
31+
32+ // We'll be using this socket to test the surgical disconnect API
33+ void *stream = test_context_socket (ZMQ_STREAM);
34+
35+ // Set timeouts to prevent the test from hanging indefinitely on failure
36+ int timeout = 500 ;
37+ TEST_ASSERT_SUCCESS_ERRNO (
38+ zmq_setsockopt (stream, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
39+ TEST_ASSERT_SUCCESS_ERRNO (
40+ zmq_setsockopt (stream, ZMQ_RCVTIMEO, &timeout, sizeof (timeout)));
41+
42+ bind_loopback_ipv4 (stream, my_endpoint, sizeof (my_endpoint));
43+
44+ // Connect two distinct clients to test isolation and state reset
45+ fd_t fd_a = connect_socket (my_endpoint);
46+ fd_t fd_b = connect_socket (my_endpoint);
47+
48+ zmq_msg_t msg;
49+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
50+
51+ // Peer A Setup: Receive connection notification
52+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 ));
53+ uint32_t id_a_numeric = extract_id (&msg);
54+ unsigned char id_a_raw[5 ];
55+ memcpy (id_a_raw, zmq_msg_data (&msg), 5 );
56+ TEST_ASSERT_EQUAL_INT (0 , TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 )));
57+
58+ // Peer B Setup: Receive connection notification
59+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 ));
60+ uint32_t id_b_numeric = extract_id (&msg);
61+ unsigned char id_b_raw[5 ];
62+ memcpy (id_b_raw, zmq_msg_data (&msg), 5 );
63+ TEST_ASSERT_EQUAL_INT (0 , TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 )));
64+
65+ // Verify Peer IDs are unique
66+ TEST_ASSERT_NOT_EQUAL (id_a_numeric, id_b_numeric);
67+
68+ // --- CASE 1: THE DIRTY RESET ---
69+ // Start a multi-part message to Peer A.
70+ // This locks the socket state machine (_more_out = true, _current_out = Pipe A).
71+ TEST_ASSERT_EQUAL_INT (5 , zmq_send (stream, id_a_raw, 5 , ZMQ_SNDMORE));
72+
73+ // Use the new API to surgically disconnect Peer A.
74+ // This must force-reset the internal 'more' state and NULL the current pipe.
75+ TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect_peer (stream, id_a_numeric));
76+ msleep (SETTLE_TIME);
77+
78+ // Attempt to talk to Peer B immediately.
79+ // If the reset failed, this would misroute the ID frame as data for Peer A.
80+ TEST_ASSERT_EQUAL_INT (5 , zmq_send (stream, id_b_raw, 5 , ZMQ_SNDMORE));
81+ TEST_ASSERT_EQUAL_INT (5 , zmq_send (stream, " HELLO" , 5 , 0 ));
82+
83+ // Verify Peer B actually received the data via raw TCP
84+ char recv_buf[5 ];
85+ int bytes = recv (fd_b, recv_buf, 5 , 0 );
86+ TEST_ASSERT_EQUAL_INT (5 , bytes);
87+ TEST_ASSERT_EQUAL_STRING_LEN (" HELLO" , recv_buf, 5 );
88+
89+ // --- CASE 2: SURGICAL ISOLATION ---
90+ // Verify Peer A is gone from the routing table; sending to it should fail.
91+ int rc = zmq_send (stream, id_a_raw, 5 , ZMQ_SNDMORE);
92+ TEST_ASSERT_EQUAL_INT (-1 , rc);
93+ TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno);
94+
95+ // --- CASE 3: INBOUND INTEGRITY ---
96+ // Ensure Peer B can still send data to the server (FQ remains intact).
97+ const char *ping = " PING" ;
98+ send (fd_b, ping, 4 , 0 );
99+ msleep (SETTLE_TIME);
100+
101+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 ));
102+ TEST_ASSERT_EQUAL_INT (id_b_numeric, extract_id (&msg));
103+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, stream, 0 ));
104+ TEST_ASSERT_EQUAL_STRING_LEN (ping, (char *) zmq_msg_data (&msg), 4 );
105+
106+ // --- CASE 4: ERROR HANDLING ---
107+ // Attempt to disconnect a non-existent ID
108+ rc = zmq_disconnect_peer (stream, 0x12345678 );
109+ TEST_ASSERT_EQUAL_INT (-1 , rc);
110+ TEST_ASSERT_EQUAL_INT (EHOSTUNREACH, errno);
111+
112+ // Cleanup
113+ TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
114+ close (fd_a);
115+ close (fd_b);
116+ test_context_socket_close (stream);
117+ }
118+
119+ int main (void )
120+ {
121+ setup_test_environment ();
122+
123+ UNITY_BEGIN ();
124+ RUN_TEST (test_stream_disconnect_peer);
125+ return UNITY_END ();
126+ }
0 commit comments