Skip to content

Commit 77677cd

Browse files
authored
Pub/Sub: Add Blocking Subscribe/Unsubscribe Commands, Parametrized Tests (#211)
Signed-off-by: currantw <taylor.curran@improving.com>
1 parent 5c3d2f8 commit 77677cd

30 files changed

Lines changed: 1688 additions & 1464 deletions

rust/src/ffi.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ unsafe fn convert_pubsub_config(
153153
);
154154
}
155155

156-
// Convert shard channels
156+
// Convert sharded channels
157157
if config.sharded_channel_count > 0 {
158158
let sharded = unsafe {
159159
convert_string_array(config.sharded_channels_ptr, config.sharded_channel_count)

sources/Valkey.Glide/Abstract/Subscriber.cs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ namespace Valkey.Glide;
1111
/// </summary>
1212
internal sealed class Subscriber : ISubscriber
1313
{
14+
// Default async timeout for StackExchange.Redis compabitility.
15+
private static readonly int DefaultTimeoutMs = 5000;
16+
1417
private readonly ConnectionMultiplexer _multiplexer;
1518
private readonly Database _client;
1619

@@ -140,13 +143,14 @@ public async Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
140143
_multiplexer.RemoveAllSubscriptions();
141144

142145
// Send unsubscribe commands for all channel modes.
143-
await _client.UnsubscribeLazyAsync();
144-
await _client.PUnsubscribeLazyAsync();
146+
await _client.UnsubscribeAsync();
147+
await _client.PUnsubscribeAsync();
145148

146149
if (_client.IsCluster)
147150
{
151+
148152
// TODO #205: Refactor to use GlideClusterClient instead of custom command.
149-
await _client.Command(Request.CustomCommand(["SUNSUBSCRIBE"]), Route.Random);
153+
await _client.Command(Request.CustomCommand(["SUNSUBSCRIBE_BLOCKING", GetTimeoutMs().ToString()]), Route.Random);
150154
}
151155
}
152156

@@ -198,19 +202,22 @@ private async Task<long> SendPublishCommand(ValkeyChannel channel, ValkeyValue m
198202
private async Task SendSubscribeCommand(ValkeyChannel channel)
199203
{
200204
var channelStr = channel.ToString();
205+
var timeout = GetTimeout();
201206

202207
if (channel.IsSharded)
203208
{
204209
ThrowIfNotClusterMode();
205-
await _client.Command(Request.CustomCommand(["SSUBSCRIBE", channelStr]), Route.Random);
210+
211+
// TODO #205: Refactor to use GlideClusterClient instead of custom command.
212+
await _client.Command(Request.CustomCommand(["SSUBSCRIBE_BLOCKING", channelStr, GetTimeoutMs().ToString()]), Route.Random);
206213
}
207214
else if (channel.IsPattern)
208215
{
209-
await _client.PSubscribeLazyAsync(channelStr);
216+
await _client.PSubscribeAsync(channelStr, timeout);
210217
}
211218
else
212219
{
213-
await _client.SubscribeLazyAsync(channelStr);
220+
await _client.SubscribeAsync(channelStr, timeout);
214221
}
215222
}
216223

@@ -221,24 +228,35 @@ private async Task SendSubscribeCommand(ValkeyChannel channel)
221228
private async Task SendUnsubscribeCommand(ValkeyChannel channel)
222229
{
223230
var channelStr = channel.ToString();
231+
var timeout = GetTimeout();
224232

225233
if (channel.IsSharded)
226234
{
227235
ThrowIfNotClusterMode();
228236

229237
// TODO #205: Refactor to use GlideClusterClient instead of custom command.
230-
await _client.Command(Request.CustomCommand(["SUNSUBSCRIBE", channelStr]), Route.Random);
238+
await _client.Command(Request.CustomCommand(["SUNSUBSCRIBE_BLOCKING", channelStr, GetTimeoutMs().ToString()]), Route.Random);
231239
}
232240
else if (channel.IsPattern)
233241
{
234-
await _client.PUnsubscribeLazyAsync(channelStr);
242+
await _client.PUnsubscribeAsync(channelStr, timeout);
235243
}
236244
else
237245
{
238-
await _client.UnsubscribeLazyAsync(channelStr);
246+
await _client.UnsubscribeAsync(channelStr, timeout);
239247
}
240248
}
241249

250+
/// <summary>
251+
/// Returns the timeout for subscribe/unsubscribe operations.
252+
/// </summary>
253+
private TimeSpan GetTimeout() => TimeSpan.FromMilliseconds(GetTimeoutMs());
254+
255+
/// <summary>
256+
/// Returns the timeout in milliseconds for subscribe/unsubscribe operations.
257+
/// </summary>
258+
private int GetTimeoutMs() => _multiplexer.RawConfig.AsyncTimeout ?? DefaultTimeoutMs;
259+
242260
/// <summary>
243261
/// Throws if the client is not in cluster mode.
244262
/// </summary>

sources/Valkey.Glide/BaseClient.PubSubCommands.cs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,89 @@ public async Task<long> PublishAsync(string channel, string message)
2323
#endregion
2424
#region SubscribeCommands
2525

26+
public async Task SubscribeAsync(string channel, TimeSpan timeout = default)
27+
{
28+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
29+
await Command(Request.SubscribeBlocking([channel], (uint)timeout.TotalMilliseconds));
30+
}
31+
32+
public async Task SubscribeAsync(IEnumerable<string> channels, TimeSpan timeout = default)
33+
{
34+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
35+
await Command(Request.SubscribeBlocking(channels.ToGlideStrings(), (uint)timeout.TotalMilliseconds));
36+
}
37+
2638
public async Task SubscribeLazyAsync(string channel)
2739
=> await Command(Request.Subscribe([channel]));
2840

2941
public async Task SubscribeLazyAsync(IEnumerable<string> channels)
30-
=> await Command(Request.Subscribe(channels.ToHashSet().ToGlideStrings()));
42+
=> await Command(Request.Subscribe(channels.ToGlideStrings()));
43+
44+
public async Task PSubscribeAsync(string pattern, TimeSpan timeout = default)
45+
{
46+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
47+
await Command(Request.PSubscribeBlocking([pattern], (uint)timeout.TotalMilliseconds));
48+
}
49+
50+
public async Task PSubscribeAsync(IEnumerable<string> patterns, TimeSpan timeout = default)
51+
{
52+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
53+
await Command(Request.PSubscribeBlocking(patterns.ToGlideStrings(), (uint)timeout.TotalMilliseconds));
54+
}
3155

3256
public async Task PSubscribeLazyAsync(string pattern)
3357
=> await Command(Request.PSubscribe([pattern]));
3458

3559
public async Task PSubscribeLazyAsync(IEnumerable<string> patterns)
36-
=> await Command(Request.PSubscribe(patterns.ToHashSet().ToGlideStrings()));
60+
=> await Command(Request.PSubscribe(patterns.ToGlideStrings()));
3761

3862
#endregion
3963
#region UnsubscribeCommands
4064

65+
public async Task UnsubscribeAsync(TimeSpan timeout = default)
66+
{
67+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
68+
await Command(Request.UnsubscribeBlocking([], (uint)timeout.TotalMilliseconds));
69+
}
70+
71+
public async Task UnsubscribeAsync(string channel, TimeSpan timeout = default)
72+
{
73+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
74+
await Command(Request.UnsubscribeBlocking([channel], (uint)timeout.TotalMilliseconds));
75+
}
76+
77+
public async Task UnsubscribeAsync(IEnumerable<string> channels, TimeSpan timeout = default)
78+
{
79+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
80+
await Command(Request.UnsubscribeBlocking(channels.ToGlideStrings(), (uint)timeout.TotalMilliseconds));
81+
}
82+
4183
public async Task UnsubscribeLazyAsync()
4284
=> await Command(Request.Unsubscribe([]));
4385

4486
public async Task UnsubscribeLazyAsync(string channel)
4587
=> await Command(Request.Unsubscribe([channel]));
4688

4789
public async Task UnsubscribeLazyAsync(IEnumerable<string> channels)
48-
=> await Command(Request.Unsubscribe(channels.ToHashSet().ToGlideStrings()));
90+
=> await Command(Request.Unsubscribe(channels.ToGlideStrings()));
91+
92+
public async Task PUnsubscribeAsync(TimeSpan timeout = default)
93+
{
94+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
95+
await Command(Request.PUnsubscribeBlocking([], (uint)timeout.TotalMilliseconds));
96+
}
97+
98+
public async Task PUnsubscribeAsync(string pattern, TimeSpan timeout = default)
99+
{
100+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
101+
await Command(Request.PUnsubscribeBlocking([pattern], (uint)timeout.TotalMilliseconds));
102+
}
103+
104+
public async Task PUnsubscribeAsync(IEnumerable<string> patterns, TimeSpan timeout = default)
105+
{
106+
GuardClauses.ThrowIfTimeSpanNegative(timeout);
107+
await Command(Request.PUnsubscribeBlocking(patterns.ToGlideStrings(), (uint)timeout.TotalMilliseconds));
108+
}
49109

50110
public async Task PUnsubscribeLazyAsync()
51111
=> await Command(Request.PUnsubscribe([]));
@@ -54,7 +114,7 @@ public async Task PUnsubscribeLazyAsync(string pattern)
54114
=> await Command(Request.PUnsubscribe([pattern]));
55115

56116
public async Task PUnsubscribeLazyAsync(IEnumerable<string> patterns)
57-
=> await Command(Request.PUnsubscribe(patterns.ToHashSet().ToGlideStrings()));
117+
=> await Command(Request.PUnsubscribe(patterns.ToGlideStrings()));
58118

59119
#endregion
60120
#region IntrospectionCommands
@@ -66,7 +126,7 @@ public async Task<ISet<string>> PubSubChannelsAsync(string pattern)
66126
=> await Command(Request.PubSubChannels(pattern));
67127

68128
public async Task<Dictionary<string, long>> PubSubNumSubAsync(IEnumerable<string> channels)
69-
=> await Command(Request.PubSubNumSub(channels.ToHashSet().ToGlideStrings()));
129+
=> await Command(Request.PubSubNumSub(channels.ToGlideStrings()));
70130

71131
public async Task<long> PubSubNumPatAsync()
72132
=> await Command(Request.PubSubNumPat());

sources/Valkey.Glide/BaseClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ private static PubSubMessage MarshalPubSubMessage(
398398
else if (pushKind == PushKind.PushPMessage)
399399
return PubSubMessage.FromPattern(message, channel, pattern!);
400400
else if (pushKind == PushKind.PushSMessage)
401-
return PubSubMessage.FromShardChannel(message, channel);
401+
return PubSubMessage.FromShardedChannel(message, channel);
402402
else
403403
throw new ArgumentOutOfRangeException(nameof(pushKind), $"Unsupported PushKind: {pushKind}");
404404
}

0 commit comments

Comments
 (0)