Skip to content

Commit 5a3a2d0

Browse files
authored
Merge pull request #1126 from chrbauer/feature/disconnect-msg-for-master
Add Support for Disconnect Message
2 parents 40895b7 + 0a1b70d commit 5a3a2d0

File tree

15 files changed

+303
-133
lines changed

15 files changed

+303
-133
lines changed

src/NetMQ.Tests/SocketOptionsTests.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,5 +147,65 @@ public void HelloMsgInproc()
147147

148148
Assert.Equal("H", msg);
149149
}
150+
151+
152+
[Fact]
153+
public void DisconnectMsgInProc()
154+
{
155+
// Create a router
156+
using var router = new RouterSocket();
157+
router.Options.DisconnectMessage = new byte[] {(byte)'D'};
158+
159+
// bind router
160+
router.Bind("inproc://inproc-hello-msg");
161+
162+
// create a dealer
163+
using var dealer = new DealerSocket();
164+
dealer.Options.HelloMessage = new byte[] {(byte)'H'};
165+
dealer.Connect("inproc://inproc-hello-msg");
166+
167+
var msg = router.ReceiveMultipartMessage();
168+
169+
Assert.Equal("H", msg.Last.ConvertToString());
170+
171+
dealer.Close();
172+
173+
var routerMsg = router.ReceiveMultipartMessage();
174+
175+
176+
Assert.Equal("D",
177+
routerMsg.Last.ConvertToString());
178+
179+
}
180+
181+
182+
[Fact]
183+
public void DisconnectMsgTcp()
184+
{
185+
// Create a router
186+
using var router = new RouterSocket();
187+
router.Options.DisconnectMessage = new byte[] {(byte)'D'};
188+
189+
// bind router
190+
int port = router.BindRandomPort("tcp://*");
191+
192+
// create a dealer
193+
using var dealer = new DealerSocket();
194+
dealer.Options.HelloMessage = new byte[] {(byte)'H'};
195+
dealer.Connect($"tcp://localhost:{port}");
196+
197+
var msg = router.ReceiveMultipartMessage();
198+
199+
Assert.Equal("H", msg.Last.ConvertToString());
200+
201+
dealer.Close();
202+
203+
var routerMsg = router.ReceiveMultipartMessage();
204+
205+
206+
Assert.Equal("D",
207+
routerMsg.Last.ConvertToString());
208+
209+
}
150210
}
151211
}

src/NetMQ/Core/Options.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public Options()
6666
HeartbeatTimeout = -1;
6767
HelloMsg = null;
6868
CanSendHelloMsg = false;
69+
DisconnectMsg = null;
70+
CanGenerateDisconnectMsg = false;
6971
Correlate = false;
7072
Relaxed = false;
7173
}
@@ -321,12 +323,23 @@ public byte IdentitySize {
321323
/// Hello msg to send to peer upon connecting
322324
/// </summary>
323325
public byte[]? HelloMsg { get; set; }
324-
326+
327+
328+
/// <summary>
329+
/// Disconnect msg to send to peer upon disconnecting
330+
/// </summary>
331+
public byte[]? DisconnectMsg { get; set; }
332+
325333
/// <summary>
326334
/// Indicate of socket can send an hello msg
327335
/// </summary>
328336
public bool CanSendHelloMsg { get; set; }
329337

338+
/// <summary>
339+
/// Indicate of socket can generate a disconnect msg
340+
/// </summary>
341+
public bool CanGenerateDisconnectMsg { get; set; }
342+
330343
public bool Correlate { get; set; }
331344
public bool Relaxed { get; set; }
332345

@@ -528,6 +541,27 @@ public void SetSocketOption(ZmqSocketOption option, object? optionValue)
528541
break;
529542
}
530543

544+
545+
case ZmqSocketOption.DisconnectMessage:
546+
{
547+
if (optionValue == null)
548+
{
549+
DisconnectMsg = null;
550+
}
551+
else if( CanGenerateDisconnectMsg )
552+
{
553+
var disconnectMsg = Get<byte[]>();
554+
DisconnectMsg = new byte[disconnectMsg.Length];
555+
556+
Buffer.BlockCopy(disconnectMsg, 0, DisconnectMsg, 0, disconnectMsg.Length);
557+
}
558+
else
559+
{
560+
throw new InvalidException("Socket doesn't support disconnect message");
561+
}
562+
break;
563+
}
564+
531565
case ZmqSocketOption.Relaxed:
532566
{
533567
Relaxed = Get<bool>();

src/NetMQ/Core/Patterns/Peer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public Peer(Ctx parent, int threadId, int socketId)
106106
m_nextPeerId = (uint) s_random.Next();
107107
m_options.SocketType = ZmqSocketType.Peer;
108108
m_options.CanSendHelloMsg = true;
109+
m_options.CanGenerateDisconnectMsg = true;
109110
m_fairQueueing = new FairQueueing();
110111
m_prefetchedMsg = new Msg();
111112
m_prefetchedMsg.InitEmpty();

src/NetMQ/Core/Patterns/Router.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public Router(Ctx parent, int threadId, int socketId)
144144
m_nextPeerId = s_random.Next();
145145
m_options.SocketType = ZmqSocketType.Router;
146146
m_options.CanSendHelloMsg = true;
147+
m_options.CanGenerateDisconnectMsg = true;
147148
m_fairQueueing = new FairQueueing();
148149
m_prefetchedId = new Msg();
149150
m_prefetchedId.InitEmpty();

src/NetMQ/Core/Patterns/Server.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public Server(Ctx parent, int threadId, int socketId)
5656
m_nextRoutingId = (uint) s_random.Next();
5757
m_options.SocketType = ZmqSocketType.Server;
5858
m_options.CanSendHelloMsg = true;
59+
m_options.CanGenerateDisconnectMsg = true;
5960
m_fairQueueing = new FairQueueing();
6061
m_outpipes = new Dictionary<uint, Outpipe>();
6162
}

src/NetMQ/Core/Pipe.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ private enum State
142142
/// </summary>
143143
private readonly ZObject m_parent;
144144

145+
/// <summary>
146+
/// Disconnect msg
147+
/// </summary>
148+
private Msg m_disconnectMsg;
149+
145150
/// <summary>
146151
/// Create a new Pipe object with the given parent, and inbound and outbound YPipes.
147152
/// </summary>
@@ -167,6 +172,7 @@ private Pipe(
167172
m_sink = null;
168173
m_state = State.Active;
169174
m_delay = true;
175+
m_disconnectMsg.InitEmpty();
170176
}
171177

172178
/// <summary>
@@ -229,6 +235,15 @@ public void SetEventSink(IPipeEvents sink)
229235
m_sink = sink;
230236
}
231237

238+
public void SetDisconnectMsg(byte[] disconnectMsg)
239+
{
240+
var msg = new Msg();
241+
242+
msg.InitPool(disconnectMsg.Length);
243+
msg.Put(disconnectMsg, 0, disconnectMsg.Length);
244+
m_disconnectMsg = msg;
245+
}
246+
232247
/// <summary>
233248
/// Get or set the byte-array that comprises the identity of this Pipe.
234249
/// </summary>
@@ -690,5 +705,16 @@ public override string ToString()
690705
{
691706
return base.ToString() + "[" + m_parent + "]";
692707
}
708+
709+
public void SendDisconnectMessage()
710+
{
711+
if (m_disconnectMsg.Size > 0)
712+
{
713+
Rollback();
714+
m_outboundPipe?.Write(ref m_disconnectMsg, false);
715+
Flush();
716+
m_disconnectMsg.InitEmpty();
717+
}
718+
}
693719
}
694720
}

src/NetMQ/Core/SessionBase.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,11 @@ protected override void ProcessAttach(IEngine engine)
399399
Debug.Assert(m_pipe == null);
400400
m_pipe = pipes[0];
401401

402+
if (m_options.CanGenerateDisconnectMsg && m_options.DisconnectMsg is not null)
403+
{
404+
pipes[1].SetDisconnectMsg(m_options.DisconnectMsg);
405+
}
406+
402407
// Ask socket to plug into the remote end of the pipe.
403408
SendBind(m_socket, pipes[1]);
404409
}
@@ -412,14 +417,21 @@ protected override void ProcessAttach(IEngine engine)
412417
/// <summary>
413418
/// Flush out any leftover messages and call Detached.
414419
/// </summary>
415-
public void Detach()
420+
public void Detach(bool handshaked)
416421
{
417422
// Engine is dead. Let's forget about it.
418423
m_engine = null;
419424

420425
// Remove any half-done messages from the pipes.
421426
CleanPipes();
422427

428+
// Only send disconnect message if socket was accepted and handshake was completed
429+
if (m_pipe is not null && m_pipe.Active && handshaked && m_options.CanGenerateDisconnectMsg && m_options.DisconnectMsg?.Length > 0)
430+
{
431+
m_pipe.SetDisconnectMsg(m_options.DisconnectMsg);
432+
m_pipe.SendDisconnectMessage();
433+
}
434+
423435
// Send the event to the derived class.
424436
Detached();
425437

0 commit comments

Comments
 (0)