Skip to content

Commit f043e4d

Browse files
badrishcCopilot
andcommitted
Fix pub/sub: restrict commands in RESP2 subscription mode and fix reentrant lock crash
Fixes #1615. Two related bugs: 1. RESP2 subscription mode allowed arbitrary commands (GET, SET, PUBLISH, etc.) instead of restricting to only (P|S)SUBSCRIBE/(P|S)UNSUBSCRIBE/PING/QUIT per the Redis protocol. Added IsAllowedInSubscriptionMode() check in ProcessMessages that rejects disallowed commands with a Redis-compatible error message. 2. PUBLISH from a subscriber session caused SynchronizationLockException because the Publish() callback re-entered the spinlock already held by TryConsumeMessages. Fixed by tracking the command-processing thread ID and detecting reentrant calls in Publish()/PatternPublish() to skip lock acquire/release when on the same thread. RESP3 sessions are not restricted since push message types are distinguishable from regular responses. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e8760eb commit f043e4d

5 files changed

Lines changed: 288 additions & 10 deletions

File tree

libs/server/Resp/CmdStrings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ static partial class CmdStrings
328328
public const string GenericUnknownClientType = "ERR Unknown client type '{0}'";
329329
public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times";
330330
public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option.";
331+
public const string GenericPubSubCommandNotAllowed = "ERR Can't execute '{0}': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context";
331332
public const string GenericErrLonLat = "ERR invalid longitude,latitude pair {0:F6},{1:F6}";
332333
public const string GenericErrStoreCommand = "ERR STORE option in {0} is not compatible with WITHDIST, WITHHASH and WITHCOORD options";
333334
public const string GenericErrCommandDisallowedWithOption =

libs/server/Resp/Parser/RespCommand.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,23 @@ public static bool IsClusterSubCommand(this RespCommand cmd)
627627
bool inRange = test <= (RespCommand.CLUSTER_SYNC - RespCommand.CLUSTER_ADDSLOTS);
628628
return inRange;
629629
}
630+
631+
/// <summary>
632+
/// Returns true if <paramref name="cmd"/> is allowed while a session is in
633+
/// pub/sub subscription mode (RESP2). Per the Redis protocol, only
634+
/// (P|S)SUBSCRIBE, (P|S)UNSUBSCRIBE, PING, and QUIT are valid in this state.
635+
/// </summary>
636+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
637+
public static bool IsAllowedInSubscriptionMode(this RespCommand cmd)
638+
{
639+
return cmd is RespCommand.SUBSCRIBE
640+
or RespCommand.UNSUBSCRIBE
641+
or RespCommand.PSUBSCRIBE
642+
or RespCommand.PUNSUBSCRIBE
643+
or RespCommand.SSUBSCRIBE
644+
or RespCommand.PING
645+
or RespCommand.QUIT;
646+
}
630647
}
631648

632649
/// <summary>

libs/server/Resp/PubSubCommands.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.Diagnostics;
76
using Garnet.common;
87
using Tsavorite.core;
98

@@ -21,9 +20,15 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
2120
/// <inheritdoc />
2221
public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
2322
{
23+
// When a session publishes to a channel it is itself subscribed to, the Broadcast
24+
// callback is invoked on the same thread that already holds the network sender lock
25+
// via TryConsumeMessages. Detect this reentrant case and write directly to the
26+
// existing output buffer instead of re-entering the lock.
27+
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
2428
try
2529
{
26-
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
30+
if (!reentrant)
31+
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
2732

2833
WritePushLength(3);
2934

@@ -35,7 +40,7 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
3540
WriteDirectLargeRespString(value.ReadOnlySpan);
3641

3742
// Flush the publish message for this subscriber
38-
if (dcurr > networkSender.GetResponseObjectHead())
43+
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
3944
Send(networkSender.GetResponseObjectHead());
4045
}
4146
catch
@@ -44,16 +49,19 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
4449
}
4550
finally
4651
{
47-
networkSender.ExitAndReturnResponseObject();
52+
if (!reentrant)
53+
networkSender.ExitAndReturnResponseObject();
4854
}
4955
}
5056

5157
/// <inheritdoc />
5258
public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByte key, PinnedSpanByte value)
5359
{
60+
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
5461
try
5562
{
56-
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
63+
if (!reentrant)
64+
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
5765

5866
WritePushLength(4);
5967

@@ -65,7 +73,7 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
6573
WriteDirectLargeRespString(key.ReadOnlySpan);
6674
WriteDirectLargeRespString(value.ReadOnlySpan);
6775

68-
if (dcurr > networkSender.GetResponseObjectHead())
76+
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
6977
Send(networkSender.GetResponseObjectHead());
7078
}
7179
catch
@@ -74,7 +82,8 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
7482
}
7583
finally
7684
{
77-
networkSender.ExitAndReturnResponseObject();
85+
if (!reentrant)
86+
networkSender.ExitAndReturnResponseObject();
7887
}
7988
}
8089

@@ -100,7 +109,6 @@ private bool NetworkPUBLISH(RespCommand cmd)
100109
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED);
101110
}
102111

103-
Debug.Assert(isSubscriptionSession == false);
104112
// PUBLISH channel message => [*3\r\n$7\r\nPUBLISH\r\n$]7\r\nchannel\r\n$7\r\message\r\n
105113

106114
var key = parseState.GetArgSliceByRef(0);

libs/server/Resp/RespServerSession.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
8080

8181
int opCount;
8282

83+
// Thread ID of the thread currently processing commands (used by Publish/PatternPublish
84+
// callbacks to detect reentrant calls when the same session publishes to a self-subscribed channel)
85+
int commandProcessingThreadId;
86+
8387
/// <summary>
8488
/// Current database session items
8589
/// </summary>
@@ -441,8 +445,8 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
441445
clusterSession?.AcquireCurrentEpoch();
442446
recvBufferPtr = reqBuffer;
443447
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
448+
commandProcessingThreadId = Environment.CurrentManagedThreadId;
444449
ProcessMessages();
445-
recvBufferPtr = null;
446450
}
447451
catch (RespParsingException ex)
448452
{
@@ -497,6 +501,7 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
497501
}
498502
finally
499503
{
504+
commandProcessingThreadId = 0;
500505
networkSender.ExitAndReturnResponseObject();
501506
clusterSession?.ReleaseCurrentEpoch();
502507
scratchBufferBuilder.Reset();
@@ -575,7 +580,14 @@ private void ProcessMessages()
575580

576581
if (CheckACLPermissions(cmd) && (noScriptPassed = CheckScriptPermissions(cmd)))
577582
{
578-
if (txnManager.state != TxnState.None)
583+
// In RESP2, only a small set of commands are allowed while in subscription mode.
584+
// RESP3 uses distinct push types for subscription messages, so all commands are valid.
585+
if (isSubscriptionSession && respProtocolVersion == 2 && !cmd.IsAllowedInSubscriptionMode())
586+
{
587+
while (!RespWriteUtils.TryWriteError(string.Format(CmdStrings.GenericPubSubCommandNotAllowed, cmd.ToString()), ref dcurr, dend))
588+
SendAndReset();
589+
}
590+
else if (txnManager.state != TxnState.None)
579591
{
580592
if (txnManager.state == TxnState.Running)
581593
{

0 commit comments

Comments
 (0)