Skip to content

Commit 33ab11e

Browse files
authored
Update RESP3 output (#1170)
* Pass respProtocolVersion in GarnetObjectStoreOutput * Use resp output struct to simplify code and emit correct RESP3 null. * Make more places able to pass the Resp3 flag. * SortedSet example. TODO: convert to use input flags. * Convert to use RespInputFlag. * Convert Set and (nost of) List operations to output struct. Add resp3 flags to Scan operations. * Update comment. Emit the right null in more places. * Set and pass respProtocolVersion more consistently. * Allow passing respProtocolVersion to storageSession and use it in cases where the input isn't passed to the API. * In practice, the protocol version doesn't matter for string store - it doesn't have Operate(). We can use that to restore SetGet flag to old value and keep AOF format. * dotnet format * By using separate flags for RESP3 object and RESP3 string, we can keep object invariant while still marking RESP3 for RawStringInput. * More cases where Garnet should emit RESP3 NULL. * Info returns RESP_EMPTY, not nill on redis on equivalent case. Async GET should be able to return RESP3 null too. * Add RESP3 null checks into MainStore too. * Add test * These sortedset functions should emit double under RESP3 for score * Update LCS command output. * Use functionstate instead of input flags. * Use the functionstate constructor. * Update more commands to use RESP3 map. * More updates for RESP3 output. More tests. * Extract out RespMemoryWriter. This functionality is useful beyond the server. * Empty commit to try running the actions again. * Convert ListPosition. Fix return value when NOTFOUND and COUNT is used. Add test. Move all use of Reallocate to RespMemoryWriter Remove optimal line count ;-( * Fix test. * Fmt * Rename parameters. * Add methods to RespMemoryWriter to allow use in ToRespFormat(). * Convert COMMAND to RespMemoryWriter output, add RESP3 output. * This is simpler * Simplify by having the caller allocate * fmt * Also convert keyspecs. * Minor optimizations. * Move header to GarnetObjectStoreOutput and start untangling code. * Convert rest of object functions to use header and result1 directly. Then remove GarnetObjectStoreRespOutput and convert all uses to RespMemoryWriter. * More succinct code around functionsState. * Fmt * Remove unnecessary unsafes. Generalizes ObjectUtils.Scan() operation to allow this. * Pass extra parameter for HSCAN. * Avoid stackalloc and just write directly. * No need for these public members in RespMemoryWriter Convert GEOSEARCHSTORE implementation to use RespMemoryWriter. (All sites calling TryReadErrorAsString already checked for '-' character, it's fine to return false there) * No need for ObjectUtils, we can move remaining code into GarnetObjectBase. Allow long cursor for object SCAN commands. * Adjust custom path to supply cursor via parseState and not arg1, so we can have a 64bit argument there. * Ignore case in some *SCAN option comparisons. * Scoping the argument lets us undo the change the respreadutils. * Undo the comment change too. * A few more RESP3 updates * Minor push changes. * These null arrays should emit RESP3 nulls. Improve RespMemoryWriter. * This arrangement is better due to padding. * Use consistent naming (writer for RespMemoryWriter, output for GarnetObjectStoreOutput). * Switch to using instead of try/finally. No functional change. * Add constructors to simplify GarnetObjectStoreOutput initialization. * Revert "Add constructors to simplify GarnetObjectStoreOutput initialization." This reverts commit f60d08d. * Add constructors to simplify GarnetObjectStoreOutput initialization. * Try to make a little faster by skipping locals init. * Revert "Try to make a little faster by skipping locals init." This reverts commit 8139e67. * Try to make a little faster by skipping locals init. * No need for static reallocate anymore. * Address review comment
1 parent e117649 commit 33ab11e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+3306
-3769
lines changed

libs/client/ClientSession/GarnetClientSession.cs

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public sealed partial class GarnetClientSession : IServerHook, IMessageConsumer
6363
/// </summary>
6464
public bool IsConnected => socket != null && socket.Connected && !Disposed;
6565

66+
/// <summary>
67+
/// Get raw results from tcs completion
68+
/// </summary>
69+
public bool RawResult = false;
70+
6671
/// <summary>
6772
/// Username to authenticate the session on the server.
6873
/// </summary>
@@ -107,6 +112,7 @@ public GarnetClientSession(
107112
string authUsername = null,
108113
string authPassword = null,
109114
int networkSendThrottleMax = 8,
115+
bool rawResult = false,
110116
ILogger logger = null)
111117
{
112118
EndPoint = endpoint;
@@ -122,6 +128,7 @@ public GarnetClientSession(
122128
this.disposed = 0;
123129
this.authUsername = authUsername;
124130
this.authPassword = authPassword;
131+
this.RawResult = rawResult;
125132
}
126133

127134
/// <summary>
@@ -459,36 +466,51 @@ private unsafe int ProcessReplies(byte* recvBufferPtr, int bytesRead)
459466

460467
while (readHead < bytesRead)
461468
{
462-
switch (*ptr)
469+
if (RawResult)
463470
{
464-
case (byte)'+':
465-
if (!RespReadResponseUtils.TryReadSimpleString(out result, ref ptr, recvBufferPtr + bytesRead))
466-
success = false;
467-
break;
468-
case (byte)':':
469-
if (!RespReadResponseUtils.TryReadIntegerAsString(out result, ref ptr, recvBufferPtr + bytesRead))
470-
success = false;
471-
break;
472-
473-
case (byte)'-':
474-
error = true;
475-
if (!RespReadResponseUtils.TryReadErrorAsString(out result, ref ptr, recvBufferPtr + bytesRead))
476-
success = false;
477-
break;
478-
479-
case (byte)'$':
480-
if (!RespReadResponseUtils.TryReadStringWithLengthHeader(out result, ref ptr, recvBufferPtr + bytesRead))
481-
success = false;
482-
break;
483-
484-
case (byte)'*':
485-
isArray = true;
486-
if (!RespReadResponseUtils.TryReadStringArrayWithLengthHeader(out resultArray, ref ptr, recvBufferPtr + bytesRead))
487-
success = false;
488-
break;
489-
490-
default:
491-
throw new Exception("Unexpected response: " + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", "") + "]");
471+
if (RespReadUtils.TryReadString(out var tmp, ref ptr, recvBufferPtr + bytesRead))
472+
{
473+
result += tmp;
474+
result += "\r\n";
475+
}
476+
else
477+
{
478+
success = false;
479+
}
480+
}
481+
else
482+
{
483+
switch (*ptr)
484+
{
485+
case (byte)'+':
486+
if (!RespReadResponseUtils.TryReadSimpleString(out result, ref ptr, recvBufferPtr + bytesRead))
487+
success = false;
488+
break;
489+
case (byte)':':
490+
if (!RespReadResponseUtils.TryReadIntegerAsString(out result, ref ptr, recvBufferPtr + bytesRead))
491+
success = false;
492+
break;
493+
494+
case (byte)'-':
495+
error = true;
496+
if (!RespReadResponseUtils.TryReadErrorAsString(out result, ref ptr, recvBufferPtr + bytesRead))
497+
success = false;
498+
break;
499+
500+
case (byte)'$':
501+
if (!RespReadResponseUtils.TryReadStringWithLengthHeader(out result, ref ptr, recvBufferPtr + bytesRead))
502+
success = false;
503+
break;
504+
505+
case (byte)'*':
506+
isArray = true;
507+
if (!RespReadResponseUtils.TryReadStringArrayWithLengthHeader(out resultArray, ref ptr, recvBufferPtr + bytesRead))
508+
success = false;
509+
break;
510+
511+
default:
512+
throw new Exception("Unexpected response: " + Encoding.UTF8.GetString(new Span<byte>(recvBufferPtr, bytesRead)).Replace("\n", "|").Replace("\r", "") + "]");
513+
}
492514
}
493515

494516
if (!success) return readHead;
@@ -500,13 +522,21 @@ private unsafe int ProcessReplies(byte* recvBufferPtr, int bytesRead)
500522
var tcs = tcsArrayQueue.Dequeue();
501523
tcs?.SetResult(resultArray);
502524
}
503-
else
525+
else if (!RawResult)
504526
{
505527
var tcs = tcsQueue.Dequeue();
506528
if (error) tcs?.SetException(new Exception(result));
507529
else tcs?.SetResult(result);
508530
}
509531
}
532+
533+
if (RawResult)
534+
{
535+
var tcs = tcsQueue.Dequeue();
536+
if (error) tcs?.SetException(new Exception(result));
537+
else tcs?.SetResult(result);
538+
}
539+
510540
return readHead;
511541
}
512542

0 commit comments

Comments
 (0)