Skip to content

Commit 10a263b

Browse files
authored
Merge pull request #963 from somdoron/sockets
add a way to send and receive routing keys
2 parents 528e3aa + 1812271 commit 10a263b

5 files changed

+198
-17
lines changed

src/NetMQ.Tests/RouterTests.cs

+21
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,26 @@ public void Handover()
130130
}
131131
}
132132
}
133+
134+
[Fact]
135+
public void RoutingKeys()
136+
{
137+
using var router = new RouterSocket("inproc://routing-keys");
138+
using var dealer = new DealerSocket("inproc://routing-keys");
139+
140+
dealer.SendRoutingKeys(new RoutingKey(1)).SendFrame("Hello");
141+
142+
var keys = router.ReceiveRoutingKeys();
143+
var message = router.ReceiveFrameString();
144+
145+
Assert.Equal("Hello", message);
146+
147+
router.SendRoutingKeys(keys).SendFrame("World");
148+
149+
dealer.ReceiveRoutingKeys();
150+
var reply = dealer.ReceiveFrameString();
151+
152+
Assert.Equal("World", reply);
153+
}
133154
}
134155
}

src/NetMQ/NetMQ-unix.csproj

-15
This file was deleted.

src/NetMQ/OutgoingSocketExtensions.cs

+83
Original file line numberDiff line numberDiff line change
@@ -685,5 +685,88 @@ public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, R
685685
}
686686

687687
#endregion
688+
689+
#region Sending Routing Keys
690+
691+
/// <summary>
692+
/// Send empty list of routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
693+
/// </summary>
694+
/// <param name="socket">the IOutgoingSocket to transmit on</param>
695+
public static IOutgoingSocket SendEmptyRoutingKeys(this IOutgoingSocket socket)
696+
{
697+
return socket.SendMoreFrameEmpty();
698+
}
699+
700+
/// <summary>
701+
/// Send a single routing key over <paramref name="socket"/>, append an empty message afterwards.
702+
/// </summary>
703+
/// <param name="socket">the IOutgoingSocket to transmit on</param>
704+
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, params RoutingKey[] routingKeys)
705+
{
706+
foreach(var routingKey in routingKeys)
707+
socket.SendMoreFrame(routingKey);
708+
709+
socket.SendMoreFrameEmpty();
710+
711+
return socket;
712+
}
713+
714+
/// <summary>
715+
/// Send routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
716+
/// </summary>
717+
/// <param name="socket">the IOutgoingSocket to transmit on</param>
718+
/// <param name="routingKeys">the routing keys to send</param>
719+
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
720+
{
721+
foreach(var routingKey in routingKeys)
722+
socket.SendMoreFrame(routingKey);
723+
724+
socket.SendMoreFrameEmpty();
725+
726+
return socket;
727+
}
728+
729+
/// <summary>
730+
/// Attempt to transmit routing keys over <paramref name="socket"/>.
731+
/// If message cannot be sent immediately, return <c>false</c>.
732+
/// Routing is always sent as more frame.
733+
/// </summary>
734+
/// <param name="socket">the IOutgoingSocket to transmit on</param>
735+
/// <param name="routingKeys">the routing keys to send</param>
736+
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
737+
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
738+
{
739+
return socket.TrySendRoutingKeys(TimeSpan.Zero, routingKeys);
740+
}
741+
742+
/// <summary>
743+
/// Attempt to transmit routing key over <paramref name="socket"/>.
744+
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
745+
/// Routing is always sent as more frame.
746+
/// </summary>
747+
/// <param name="socket">the IOutgoingSocket to transmit on</param>
748+
/// <param name="timeout">The maximum period of time to try to send a message.</param>
749+
/// <param name="routingKeys">the routing keys to send</param>
750+
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
751+
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<RoutingKey> routingKeys)
752+
{
753+
var enumerator = routingKeys.GetEnumerator();
754+
755+
// Empty collection, just trying to send the empty message
756+
if (!enumerator.MoveNext())
757+
return socket.TrySendFrameEmpty(timeout, true);
758+
759+
if (!socket.TrySendFrame(enumerator.Current))
760+
return false;
761+
762+
while (enumerator.MoveNext())
763+
socket.SendMoreFrame(enumerator.Current);
764+
765+
socket.SendMoreFrameEmpty();
766+
767+
return true;
768+
}
769+
770+
#endregion
688771
}
689772
}

src/NetMQ/ReceivingSocketExtensions.cs

+85-2
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public static string ReceiveFrameString(this IReceivingSocket socket, Encoding e
316316
}
317317
finally
318318
{
319-
msg.Close();
319+
msg.Close();
320320
}
321321
}
322322

@@ -449,7 +449,7 @@ public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan
449449
}
450450
finally
451451
{
452-
msg.Close();
452+
msg.Close();
453453
}
454454
}
455455

@@ -1111,5 +1111,88 @@ public static bool TryReceiveRoutingKey(this IReceivingSocket socket, TimeSpan t
11111111
}
11121112

11131113
#endregion
1114+
1115+
#region Receiving a routing keys
1116+
1117+
/// <summary>
1118+
/// Receive routing keys from <paramref name="socket"/> until a bottom message arrives (empty message), blocking until one arrives.
1119+
/// </summary>
1120+
/// <param name="socket">The socket to receive from.</param>
1121+
/// <returns>The routing keys.</returns>
1122+
public static IEnumerable<RoutingKey> ReceiveRoutingKeys(this IReceivingSocket socket)
1123+
{
1124+
List<RoutingKey> keys = new List<RoutingKey>();
1125+
1126+
while (true)
1127+
{
1128+
var routingKey = socket.ReceiveRoutingKey(out bool more);
1129+
if (!more)
1130+
throw new InvalidException("Malformed multipart message, empty message expected");
1131+
1132+
if (routingKey.Bytes.Length == 0)
1133+
break;
1134+
1135+
keys.Add(routingKey);
1136+
}
1137+
1138+
return keys;
1139+
}
1140+
1141+
/// <summary>
1142+
/// Attempt to receive routing-keys from <paramref name="socket"/>, an empty message expected at the end of routing keys.
1143+
/// If no message is immediately available, return <c>false</c>.
1144+
/// </summary>
1145+
/// <param name="socket">The socket to receive from.</param>
1146+
/// <param name="routingKeys">The routing-keys of the received message.</param>
1147+
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
1148+
public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
1149+
{
1150+
return TryReceiveRoutingKeys(socket, TimeSpan.Zero, out routingKeys);
1151+
}
1152+
1153+
/// <summary>
1154+
/// Attempt to receive a routing-keys from <paramref name="socket"/>.
1155+
/// If no message is available within <paramref name="timeout"/>, return <c>false</c>.
1156+
/// </summary>
1157+
/// <param name="socket">The socket to receive from.</param>
1158+
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
1159+
/// <param name="routingKeys">The routing-keys of the received message.</param>
1160+
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
1161+
public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
1162+
{
1163+
RoutingKey first = new RoutingKey();
1164+
1165+
if (socket.TryReceiveRoutingKey(timeout, ref first, out bool more))
1166+
{
1167+
if (!more)
1168+
throw new InvalidException("Malformed multipart message, empty message expected");
1169+
1170+
List<RoutingKey> keys = new List<RoutingKey>();
1171+
routingKeys = keys;
1172+
1173+
if (first.Bytes.Length == 0)
1174+
return true;
1175+
1176+
keys.Add(first);
1177+
while (true)
1178+
{
1179+
var routingKey = socket.ReceiveRoutingKey(out more);
1180+
if (!more)
1181+
throw new InvalidException("Malformed multipart message, empty message expected");
1182+
1183+
if (routingKey.Bytes.Length == 0)
1184+
break;
1185+
1186+
keys.Add(routingKey);
1187+
}
1188+
1189+
return true;
1190+
}
1191+
1192+
routingKeys = null;
1193+
return false;
1194+
}
1195+
1196+
#endregion
11141197
}
11151198
}

src/NetMQ/RoutingKey.cs

+9
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ public RoutingKey(string b64)
3131
bytes = Convert.FromBase64String(b64);
3232
}
3333

34+
/// <summary>
35+
/// Create a new routing key out of a Int64
36+
/// </summary>
37+
/// <param name="value"></param>
38+
public RoutingKey(long value)
39+
{
40+
bytes = NetworkOrderBitsConverter.GetBytes(value);
41+
}
42+
3443
internal byte[] Bytes
3544
{
3645
get { return bytes; }

0 commit comments

Comments
 (0)