3
3
4
4
using System ;
5
5
using System . Buffers ;
6
- using System . Buffers . Binary ;
7
6
using System . Diagnostics ;
8
7
using System . Runtime . CompilerServices ;
9
8
using System . Runtime . InteropServices ;
@@ -115,8 +114,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
115
114
/// </summary>
116
115
public IGarnetServer Server { get ; set ; }
117
116
118
- // Track whether the incoming network batch had some admin command
119
- bool hasAdminCommand ;
117
+ // Track whether the incoming network batch contains slow commands that should not be counter in NET_RS histogram
118
+ bool containsSlowCommand ;
120
119
121
120
readonly CustomCommandManagerSession customCommandManagerSession ;
122
121
@@ -369,10 +368,10 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
369
368
{
370
369
if ( latencyMetrics != null )
371
370
{
372
- if ( hasAdminCommand )
371
+ if ( containsSlowCommand )
373
372
{
374
373
latencyMetrics . StopAndSwitch ( LatencyMetricsType . NET_RS_LAT , LatencyMetricsType . NET_RS_LAT_ADMIN ) ;
375
- hasAdminCommand = false ;
374
+ containsSlowCommand = false ;
376
375
}
377
376
else
378
377
latencyMetrics . Stop ( LatencyMetricsType . NET_RS_LAT ) ;
@@ -443,6 +442,10 @@ private void ProcessMessages()
443
442
SendAndReset ( ) ;
444
443
}
445
444
}
445
+ else
446
+ {
447
+ containsSlowCommand = true ;
448
+ }
446
449
447
450
// Advance read head variables to process the next command
448
451
_origReadHead = readHead = endReadHead ;
@@ -496,6 +499,10 @@ private bool MakeUpperCase(byte* ptr)
496
499
private bool ProcessBasicCommands < TGarnetApi > ( RespCommand cmd , ref TGarnetApi storageApi )
497
500
where TGarnetApi : IGarnetApi
498
501
{
502
+ /*
503
+ * WARNING: Do not add any command here classified as @slow!
504
+ * Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
505
+ */
499
506
_ = cmd switch
500
507
{
501
508
RespCommand . GET => NetworkGET ( ref storageApi ) ,
@@ -515,6 +522,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
515
522
RespCommand . SETRANGE => NetworkSetRange ( ref storageApi ) ,
516
523
RespCommand . GETDEL => NetworkGETDEL ( ref storageApi ) ,
517
524
RespCommand . APPEND => NetworkAppend ( ref storageApi ) ,
525
+ RespCommand . STRLEN => NetworkSTRLEN ( ref storageApi ) ,
518
526
RespCommand . INCR => NetworkIncrement ( RespCommand . INCR , ref storageApi ) ,
519
527
RespCommand . INCRBY => NetworkIncrement ( RespCommand . INCRBY , ref storageApi ) ,
520
528
RespCommand . DECR => NetworkIncrement ( RespCommand . DECR , ref storageApi ) ,
@@ -524,7 +532,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
524
532
RespCommand . BITCOUNT => NetworkStringBitCount ( ref storageApi ) ,
525
533
RespCommand . BITPOS => NetworkStringBitPosition ( ref storageApi ) ,
526
534
RespCommand . PUBLISH => NetworkPUBLISH ( ) ,
527
- RespCommand . PING => parseState . Count == 0 ? NetworkPING ( ) : ProcessArrayCommands ( cmd , ref storageApi ) ,
535
+ RespCommand . PING => parseState . Count == 0 ? NetworkPING ( ) : NetworkArrayPING ( ) ,
528
536
RespCommand . ASKING => NetworkASKING ( ) ,
529
537
RespCommand . MULTI => NetworkMULTI ( ) ,
530
538
RespCommand . EXEC => NetworkEXEC ( ) ,
@@ -534,22 +542,6 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
534
542
RespCommand . RUNTXP => NetworkRUNTXP ( ) ,
535
543
RespCommand . READONLY => NetworkREADONLY ( ) ,
536
544
RespCommand . READWRITE => NetworkREADWRITE ( ) ,
537
- RespCommand . COMMAND => NetworkCOMMAND ( ) ,
538
- RespCommand . COMMAND_COUNT => NetworkCOMMAND_COUNT ( ) ,
539
- RespCommand . COMMAND_INFO => NetworkCOMMAND_INFO ( ) ,
540
- RespCommand . ECHO => NetworkECHO ( ) ,
541
- RespCommand . INFO => NetworkINFO ( ) ,
542
- RespCommand . HELLO => NetworkHELLO ( ) ,
543
- RespCommand . TIME => NetworkTIME ( ) ,
544
- RespCommand . FLUSHALL => NetworkFLUSHALL ( ) ,
545
- RespCommand . FLUSHDB => NetworkFLUSHDB ( ) ,
546
- RespCommand . AUTH => NetworkAUTH ( ) ,
547
- RespCommand . MEMORY_USAGE => NetworkMemoryUsage ( ref storageApi ) ,
548
- RespCommand . ACL_CAT => NetworkAclCat ( ) ,
549
- RespCommand . ACL_WHOAMI => NetworkAclWhoAmI ( ) ,
550
- RespCommand . ASYNC => NetworkASYNC ( ) ,
551
- RespCommand . MIGRATE => NetworkProcessClusterCommand ( cmd ) ,
552
-
553
545
_ => ProcessArrayCommands ( cmd , ref storageApi )
554
546
} ;
555
547
@@ -559,6 +551,10 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
559
551
private bool ProcessArrayCommands < TGarnetApi > ( RespCommand cmd , ref TGarnetApi storageApi )
560
552
where TGarnetApi : IGarnetApi
561
553
{
554
+ /*
555
+ * WARNING: Do not add any command here classified as @slow!
556
+ * Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
557
+ */
562
558
var success = cmd switch
563
559
{
564
560
RespCommand . MGET => NetworkMGET ( ref storageApi ) ,
@@ -569,13 +565,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
569
565
RespCommand . WATCH => NetworkWATCH ( ) ,
570
566
RespCommand . WATCH_MS => NetworkWATCH_MS ( ) ,
571
567
RespCommand . WATCH_OS => NetworkWATCH_OS ( ) ,
572
- RespCommand . STRLEN => NetworkSTRLEN ( ref storageApi ) ,
573
- RespCommand . PING => NetworkArrayPING ( ) ,
574
- //General key commands
575
- RespCommand . DBSIZE => NetworkDBSIZE ( ref storageApi ) ,
576
- RespCommand . KEYS => NetworkKEYS ( ref storageApi ) ,
577
- RespCommand . SCAN => NetworkSCAN ( ref storageApi ) ,
578
- RespCommand . TYPE => NetworkTYPE ( ref storageApi ) ,
579
568
// Pub/sub commands
580
569
RespCommand . SUBSCRIBE => NetworkSUBSCRIBE ( ) ,
581
570
RespCommand . PSUBSCRIBE => NetworkPSUBSCRIBE ( ) ,
@@ -676,10 +665,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
676
665
RespCommand . SUNIONSTORE => SetUnionStore ( ref storageApi ) ,
677
666
RespCommand . SDIFF => SetDiff ( ref storageApi ) ,
678
667
RespCommand . SDIFFSTORE => SetDiffStore ( ref storageApi ) ,
679
- // Script Commands
680
- RespCommand . SCRIPT => TrySCRIPT ( ) ,
681
- RespCommand . EVAL => TryEVAL ( ) ,
682
- RespCommand . EVALSHA => TryEVALSHA ( ) ,
683
668
_ => ProcessOtherCommands ( cmd , ref storageApi )
684
669
} ;
685
670
return success ;
@@ -688,7 +673,48 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
688
673
private bool ProcessOtherCommands < TGarnetApi > ( RespCommand command , ref TGarnetApi storageApi )
689
674
where TGarnetApi : IGarnetApi
690
675
{
691
- if ( command == RespCommand . CLIENT_ID )
676
+ /*
677
+ * WARNING: Here is safe to add @slow commands (check how containsSlowCommand is used).
678
+ */
679
+ containsSlowCommand = true ;
680
+ var success = command switch
681
+ {
682
+ RespCommand . AUTH => NetworkAUTH ( ) ,
683
+ RespCommand . MEMORY_USAGE => NetworkMemoryUsage ( ref storageApi ) ,
684
+ RespCommand . CLIENT_ID => NetworkCLIENTID ( ) ,
685
+ RespCommand . CLIENT_INFO => NetworkCLIENTINFO ( ) ,
686
+ RespCommand . CLIENT_LIST => NetworkCLIENTLIST ( ) ,
687
+ RespCommand . CLIENT_KILL => NetworkCLIENTKILL ( ) ,
688
+ RespCommand . COMMAND => NetworkCOMMAND ( ) ,
689
+ RespCommand . COMMAND_COUNT => NetworkCOMMAND_COUNT ( ) ,
690
+ RespCommand . COMMAND_INFO => NetworkCOMMAND_INFO ( ) ,
691
+ RespCommand . ECHO => NetworkECHO ( ) ,
692
+ RespCommand . HELLO => NetworkHELLO ( ) ,
693
+ RespCommand . TIME => NetworkTIME ( ) ,
694
+ RespCommand . FLUSHALL => NetworkFLUSHALL ( ) ,
695
+ RespCommand . FLUSHDB => NetworkFLUSHDB ( ) ,
696
+ RespCommand . ACL_CAT => NetworkAclCat ( ) ,
697
+ RespCommand . ACL_WHOAMI => NetworkAclWhoAmI ( ) ,
698
+ RespCommand . ASYNC => NetworkASYNC ( ) ,
699
+ RespCommand . RUNTXP => NetworkRUNTXP ( ) ,
700
+ RespCommand . INFO => NetworkINFO ( ) ,
701
+ RespCommand . CustomTxn => NetworkCustomTxn ( ) ,
702
+ RespCommand . CustomRawStringCmd => NetworkCustomRawStringCmd ( ref storageApi ) ,
703
+ RespCommand . CustomObjCmd => NetworkCustomObjCmd ( ref storageApi ) ,
704
+ RespCommand . CustomProcedure => NetworkCustomProcedure ( ) ,
705
+ //General key commands
706
+ RespCommand . DBSIZE => NetworkDBSIZE ( ref storageApi ) ,
707
+ RespCommand . KEYS => NetworkKEYS ( ref storageApi ) ,
708
+ RespCommand . SCAN => NetworkSCAN ( ref storageApi ) ,
709
+ RespCommand . TYPE => NetworkTYPE ( ref storageApi ) ,
710
+ // Script Commands
711
+ RespCommand . SCRIPT => TrySCRIPT ( ) ,
712
+ RespCommand . EVAL => TryEVAL ( ) ,
713
+ RespCommand . EVALSHA => TryEVALSHA ( ) ,
714
+ _ => Process ( command )
715
+ } ;
716
+
717
+ bool NetworkCLIENTID ( )
692
718
{
693
719
if ( parseState . Count != 0 )
694
720
{
@@ -700,28 +726,8 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
700
726
701
727
return true ;
702
728
}
703
- else if ( command == RespCommand . CLIENT_INFO )
704
- {
705
- return NetworkCLIENTINFO ( ) ;
706
- }
707
- else if ( command == RespCommand . CLIENT_LIST )
708
- {
709
- return NetworkCLIENTLIST ( ) ;
710
- }
711
- else if ( command == RespCommand . CLIENT_KILL )
712
- {
713
- return NetworkCLIENTKILL ( ) ;
714
- }
715
- else if ( command == RespCommand . SUBSCRIBE )
716
- {
717
- while ( ! RespWriteUtils . WriteInteger ( 1 , ref dcurr , dend ) )
718
- SendAndReset ( ) ;
719
- }
720
- else if ( command == RespCommand . RUNTXP )
721
- {
722
- return NetworkRUNTXP ( ) ;
723
- }
724
- else if ( command == RespCommand . CustomTxn )
729
+
730
+ bool NetworkCustomTxn ( )
725
731
{
726
732
if ( ! IsCommandArityValid ( currentCustomTransaction . NameStr , parseState . Count ) )
727
733
{
@@ -732,32 +738,10 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
732
738
// Perform the operation
733
739
TryTransactionProc ( currentCustomTransaction . id , recvBufferPtr + readHead , recvBufferPtr + endReadHead , customCommandManagerSession . GetCustomTransactionProcedure ( currentCustomTransaction . id , txnManager , scratchBufferManager ) . Item1 ) ;
734
740
currentCustomTransaction = null ;
741
+ return true ;
735
742
}
736
- else if ( command == RespCommand . CustomRawStringCmd )
737
- {
738
- if ( ! IsCommandArityValid ( currentCustomRawStringCommand . NameStr , parseState . Count ) )
739
- {
740
- currentCustomRawStringCommand = null ;
741
- return true ;
742
- }
743
-
744
- // Perform the operation
745
- TryCustomRawStringCommand ( recvBufferPtr + readHead , recvBufferPtr + endReadHead , currentCustomRawStringCommand . GetRespCommand ( ) , currentCustomRawStringCommand . expirationTicks , currentCustomRawStringCommand . type , ref storageApi ) ;
746
- currentCustomRawStringCommand = null ;
747
- }
748
- else if ( command == RespCommand . CustomObjCmd )
749
- {
750
- if ( ! IsCommandArityValid ( currentCustomObjectCommand . NameStr , parseState . Count ) )
751
- {
752
- currentCustomObjectCommand = null ;
753
- return true ;
754
- }
755
743
756
- // Perform the operation
757
- TryCustomObjectCommand ( recvBufferPtr + readHead , recvBufferPtr + endReadHead , currentCustomObjectCommand . GetRespCommand ( ) , currentCustomObjectCommand . subid , currentCustomObjectCommand . type , ref storageApi ) ;
758
- currentCustomObjectCommand = null ;
759
- }
760
- else if ( command == RespCommand . CustomProcedure )
744
+ bool NetworkCustomProcedure ( )
761
745
{
762
746
if ( ! IsCommandArityValid ( currentCustomProcedure . NameStr , parseState . Count ) )
763
747
{
@@ -769,12 +753,46 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
769
753
currentCustomProcedure . CustomProcedureImpl ) ;
770
754
771
755
currentCustomProcedure = null ;
756
+ return true ;
772
757
}
773
- else
758
+
759
+ [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
760
+ bool Process ( RespCommand command )
774
761
{
775
762
ProcessAdminCommands ( command ) ;
776
763
return true ;
777
764
}
765
+
766
+ return success ;
767
+ }
768
+
769
+ private bool NetworkCustomRawStringCmd < TGarnetApi > ( ref TGarnetApi storageApi )
770
+ where TGarnetApi : IGarnetApi
771
+ {
772
+ if ( ! IsCommandArityValid ( currentCustomRawStringCommand . NameStr , parseState . Count ) )
773
+ {
774
+ currentCustomRawStringCommand = null ;
775
+ return true ;
776
+ }
777
+
778
+ // Perform the operation
779
+ TryCustomRawStringCommand ( recvBufferPtr + readHead , recvBufferPtr + endReadHead , currentCustomRawStringCommand . GetRespCommand ( ) , currentCustomRawStringCommand . expirationTicks , currentCustomRawStringCommand . type , ref storageApi ) ;
780
+ currentCustomRawStringCommand = null ;
781
+ return true ;
782
+ }
783
+
784
+ bool NetworkCustomObjCmd < TGarnetApi > ( ref TGarnetApi storageApi )
785
+ where TGarnetApi : IGarnetApi
786
+ {
787
+ if ( ! IsCommandArityValid ( currentCustomObjectCommand . NameStr , parseState . Count ) )
788
+ {
789
+ currentCustomObjectCommand = null ;
790
+ return true ;
791
+ }
792
+
793
+ // Perform the operation
794
+ TryCustomObjectCommand ( recvBufferPtr + readHead , recvBufferPtr + endReadHead , currentCustomObjectCommand . GetRespCommand ( ) , currentCustomObjectCommand . subid , currentCustomObjectCommand . type , ref storageApi ) ;
795
+ currentCustomObjectCommand = null ;
778
796
return true ;
779
797
}
780
798
0 commit comments