Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,43 @@
]
}
]
},
{
"Command": "CLIENT_REPLY",
"Name": "CLIENT|REPLY",
"Summary": "Instructs the server whether to reply to commands.",
"Group": "Connection",
"Complexity": "O(1)",
"Arguments": [
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "ON-OFF-SKIP",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "ON",
"DisplayText": "on",
"Type": "PureToken",
"Token": "ON"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "OFF",
"DisplayText": "off",
"Type": "PureToken",
"Token": "OFF"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "SKIP",
"DisplayText": "skip",
"Type": "PureToken",
"Token": "SKIP"
}
]
}
]
}
]
},
Expand Down
7 changes: 7 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,13 @@
"Arity": -3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Connection, Dangerous, Slow"
},
{
"Command": "CLIENT_REPLY",
"Name": "CLIENT|REPLY",
"Arity": 3,
"Flags": "Loading, NoScript, Stale",
"AclCategories": "Connection, Slow"
}
]
},
Expand Down
57 changes: 57 additions & 0 deletions libs/server/Resp/ClientCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@

namespace Garnet.server
{
/// <summary>
/// Reply suppression mode controlled by the <c>CLIENT REPLY</c> subcommand.
/// </summary>
internal enum ClientReplyMode : byte
{
/// <summary>Normal replies are sent (default).</summary>
On = 0,
/// <summary>All replies are suppressed until a <c>CLIENT REPLY ON</c> is received.</summary>
Off,
/// <summary>The reply for the next command is suppressed; mode returns to <see cref="On"/> after.</summary>
Skip,
}

/// <summary>
/// Server session for RESP protocol - client commands are in this file
/// </summary>
Expand Down Expand Up @@ -640,5 +653,49 @@ private bool NetworkCLIENTUNBLOCK()

return true;
}

/// <summary>
/// CLIENT REPLY ON|OFF|SKIP — controls per-connection reply suppression.
/// OFF suppresses all replies until ON; SKIP suppresses only the next command's reply.
/// The OFF and SKIP commands themselves produce no reply; ON replies with +OK.
/// </summary>
private bool NetworkCLIENTREPLY()
{
if (parseState.Count != 1)
{
return AbortWithWrongNumberOfArguments("client|reply");
}

var modeSpan = parseState.GetArgSliceByRef(0).ReadOnlySpan;

if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ON))
{
clientReplyMode = ClientReplyMode.On;
// The ON command itself must reply +OK even if we just transitioned out of OFF/SKIP.
// Clear the suppression flag set at command-start so the +OK actually flushes.
suppressCurrentReply = false;
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.OFF))
{
clientReplyMode = ClientReplyMode.Off;
// No reply.
}
else if (modeSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.SKIP))
{
// SKIP only arms when we're currently On. If already Off it stays Off; if already Skip it stays Skip
// (a second SKIP just re-arms — it does not stack).
if (clientReplyMode == ClientReplyMode.On)
clientReplyMode = ClientReplyMode.Skip;
// No reply.
}
else
{
return AbortWithErrorMessage(CmdStrings.RESP_SYNTAX_ERROR);
}

return true;
}
}
}
2 changes: 2 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> GETNAME => "GETNAME"u8;
public static ReadOnlySpan<byte> SETINFO => "SETINFO"u8;
public static ReadOnlySpan<byte> UNBLOCK => "UNBLOCK"u8;
public static ReadOnlySpan<byte> REPLY => "REPLY"u8;
public static ReadOnlySpan<byte> SKIP => "SKIP"u8;
public static ReadOnlySpan<byte> USER => "USER"u8;
public static ReadOnlySpan<byte> ADDR => "ADDR"u8;
public static ReadOnlySpan<byte> LADDR => "LADDR"u8;
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public enum RespCommand : ushort
CLIENT_SETNAME,
CLIENT_SETINFO,
CLIENT_UNBLOCK,
CLIENT_REPLY,

MONITOR,
MODULE,
Expand Down Expand Up @@ -465,6 +466,7 @@ public static class RespCommandExtensions
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
RespCommand.CLIENT_REPLY,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down Expand Up @@ -2125,6 +2127,10 @@ private RespCommand SlowParseCommand(ReadOnlySpan<byte> command, ref int count,
{
return RespCommand.CLIENT_UNBLOCK;
}
else if (subCommand.SequenceEqual(CmdStrings.REPLY))
{
return RespCommand.CLIENT_REPLY;
}

string errMsg = string.Format(CmdStrings.GenericErrUnknownSubCommandNoHelp,
Encoding.UTF8.GetString(subCommand),
Expand Down
56 changes: 56 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ public IGarnetServer Server
/// </summary>
string clientLibVersion = null;

/// <summary>
/// Current CLIENT REPLY mode for this connection (controls reply suppression).
/// </summary>
internal ClientReplyMode clientReplyMode = ClientReplyMode.On;

/// <summary>
/// True while the current command's output should be discarded (i.e. CLIENT REPLY OFF/SKIP is active for this command).
/// Set at the start of each command in ProcessMessages and may be cleared by NetworkCLIENTREPLY when it processes ON
/// (so the +OK reply for `CLIENT REPLY ON` is allowed through).
/// </summary>
bool suppressCurrentReply = false;

/// <summary>
/// Flag indicating whether any of the commands in one message
/// requires us to block on AOF before sending response over the network
Expand Down Expand Up @@ -623,6 +635,18 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx

while (bytesRead - readHead >= 4)
{
// CLIENT REPLY: snapshot reply-suppression state for this command BEFORE parsing.
// - modeAtStart drives whether the buffer position we capture now (cmdStartPtr) gets
// rewound after the command runs (discarding the reply bytes).
// - suppressCurrentReply also gates SendAndReset() so any mid-command flush is
// discarded instead of being sent to the network. We set it before ParseCommand so
// parse-time error replies (e.g. "unknown command") are also gated.
// CLIENT REPLY ON clears suppressCurrentReply inside its handler so the +OK reply
// for the ON command itself is allowed through.
var modeAtStart = clientReplyMode;
var cmdStartPtr = dcurr;
suppressCurrentReply = modeAtStart != ClientReplyMode.On;

// First, parse the command, making sure we have the entire command available
// We use endReadHead to track the end of the current command
// On success, readHead is left at the start of the command payload for legacy operators
Expand All @@ -632,6 +656,7 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx
if (!commandReceived)
{
endReadHead = readHead = _origReadHead;
suppressCurrentReply = false;
break;
}

Expand Down Expand Up @@ -695,6 +720,27 @@ private void ProcessMessages<TBasicApi, TTxnApi>(ref TBasicApi basicApi, ref TTx
containsSlowCommand = true;
}

// CLIENT REPLY: end-of-command suppression handling. Applies to INVALID/parse-error
// commands as well so things like "GET" with no args don't leak "-ERR unknown command".
// suppressCurrentReply may have been cleared by NetworkCLIENTREPLY when it
// executed `CLIENT REPLY ON` (so the +OK reply is preserved).
if (suppressCurrentReply)
{
// Discard anything this command wrote (mid-command SendAndReset has already
// been gated, so the bytes still live within the current buffer between
// cmdStartPtr and dcurr).
dcurr = cmdStartPtr;
}
suppressCurrentReply = false;

// Burn off a one-shot SKIP. CLIENT REPLY commands themselves never burn the skip —
// a SKIP issued while already in Skip mode just re-arms (does not stack), and a
// following SKIP/ON/OFF transitions the mode directly inside the handler.
if (modeAtStart == ClientReplyMode.Skip && cmd != RespCommand.CLIENT_REPLY && cmd != RespCommand.INVALID)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — fixed in 3a034b8. Dropped the cmd != INVALID guard; SKIP now burns for any fully-received command except CLIENT_REPLY itself. Partial input is still gated by the !commandReceived early-break above. Added ClientReplySkipBurnedByUnknownCommand to lock in the behavior.

{
clientReplyMode = ClientReplyMode.On;
}

// Advance read head variables to process the next command
_origReadHead = readHead = endReadHead;

Expand Down Expand Up @@ -1050,6 +1096,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
RespCommand.CLIENT_SETNAME => NetworkCLIENTSETNAME(),
RespCommand.CLIENT_SETINFO => NetworkCLIENTSETINFO(),
RespCommand.CLIENT_UNBLOCK => NetworkCLIENTUNBLOCK(),
RespCommand.CLIENT_REPLY => NetworkCLIENTREPLY(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_DOCS => NetworkCOMMAND_DOCS(),
Expand Down Expand Up @@ -1318,6 +1365,15 @@ private unsafe bool Write(int seqNo, ref byte* dst, int length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void SendAndReset()
{
// CLIENT REPLY OFF/SKIP: discard whatever was written into the current buffer
// without sending it. We deliberately do NOT rotate to a fresh buffer or call Send —
// we just rewind dcurr to the head so the suppressed bytes are dropped.
if (suppressCurrentReply)
{
dcurr = networkSender.GetResponseObjectHead();
return;
}

byte* d = networkSender.GetResponseObjectHead();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 3a034b8. Promoted cmdStartPtr from a local to a per-session field cmdReplyFloor so SendAndReset() can use it as the rewind floor:

  • If the suppressed write made progress (dcurr > cmdReplyFloor): rewind to the floor — bytes in [head, cmdReplyFloor) from prior non-suppressed commands stay intact.
  • Else if prior replies are queued (cmdReplyFloor > head): Send([head, cmdReplyFloor)), rotate buffer, set cmdReplyFloor = new head, continue suppressing in the fresh buffer.
  • Else (no progress and nothing prior): throw — same fatal condition as the non-suppressed path. This also addresses the suppressed comment about infinite loops on oversized suppressed writes.

Also fixed SendAndReset(IMemoryOwner<byte>, int) which had the inverse problem — it would Send() the buffer contents mid-write, flushing this command's earlier bytes before the end-of-command rewind could discard them. Under suppression it now just disposes the payload; the prior bytes remain in the buffer for end-of-batch flush.

Added ClientReplyPipelinedSkipBetweenReplies and ClientReplyPipelinedOffOnOrdering to cover the pipeline ordering case.

if ((int)(dcurr - d) > 0)
{
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SupportedCommand
new("CLIENT|SETNAME", RespCommand.CLIENT_SETNAME),
new("CLIENT|SETINFO", RespCommand.CLIENT_SETINFO),
new("CLIENT|UNBLOCK", RespCommand.CLIENT_UNBLOCK),
new("CLIENT|REPLY", RespCommand.CLIENT_REPLY),
]),
new("CLUSTER", RespCommand.CLUSTER, StoreType.None,
[
Expand Down
Loading
Loading