Skip to content

Keyspace Notifications #1106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ internal sealed class Options
[Option("cluster-password", Required = false, HelpText = "Password to authenticate intra-cluster communication with.")]
public string ClusterPassword { get; set; }

[Option("notify-keyspace-events", Required = false, HelpText = "Keyspace Notification argument string.")]
public string NotifyKeyspaceEventsArguments { get; set; }

[FilePathValidation(true, true, false)]
[Option("acl-file", Required = false, HelpText = "External ACL user file.")]
public string AclFile { get; set; }
Expand Down
50 changes: 50 additions & 0 deletions libs/server/KeyspaceNotifications/KeyspaceNotificationPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System;
using Garnet.server.KeyspaceNotifications;

namespace Garnet.server
{
internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
// TODO: use same parameter for key and keyevent; pass keyevent as ref; test performance; avoid concatSpans and remove concatenated Spans; check access modifiers
internal void PublishKeyspaceNotification(KeyspaceNotificationType keyspaceNotificationType, ref ArgSlice key, ReadOnlySpan<byte> keyevent)
{
if (!storeWrapper.serverOptions.AllowedKeyspaceNotifications.HasFlag(keyspaceNotificationType))
{
return;
}

var channel = ConcatSpans(KeyspaceNotificationStrings.KeyspacePrefix, KeyspaceNotificationStrings.Suffix);

if (storeWrapper.serverOptions.AllowedKeyspaceNotifications.HasFlag(KeyspaceNotificationType.Keyspace))
{
var keyspaceChannel = ConcatSpans(channel, key.ReadOnlySpan);
PublishKeyspace(ref keyspaceChannel, keyevent);
}

if (storeWrapper.serverOptions.AllowedKeyspaceNotifications.HasFlag(KeyspaceNotificationType.Keyevent))
{
var keyeventChannel = ConcatSpans(channel, keyevent);
PublishKeyevent(ref keyeventChannel, ref key);
}
}
private void PublishKeyspace(ref ReadOnlySpan<byte> channel, ReadOnlySpan<byte> keyevent)
{
// TODO: find a better solution for the string concatenation and converting to ArgSlice
subscribeBroker.Publish(ArgSlice.FromPinnedSpan(channel), ArgSlice.FromPinnedSpan(keyevent));
}

private void PublishKeyevent(ref ReadOnlySpan<byte> channel, ref ArgSlice key)
{
// TODO: see above
subscribeBroker.Publish(ArgSlice.FromPinnedSpan(channel), key);
}

private static ReadOnlySpan<byte> ConcatSpans(ReadOnlySpan<byte> first, ReadOnlySpan<byte> second)
{
byte[] combined = new byte[first.Length + second.Length];
first.CopyTo(combined);
second.CopyTo(combined.AsSpan(first.Length));
return combined;
}
}
}
15 changes: 15 additions & 0 deletions libs/server/KeyspaceNotifications/KeyspaceNotificationStrings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;

namespace Garnet.server.KeyspaceNotifications
{
/// <summary>
/// Keyspace Notification strings
/// </summary>
static partial class KeyspaceNotificationStrings
{
public static ReadOnlySpan<byte> KeyspacePrefix => "__keyspace@"u8;
public static ReadOnlySpan<byte> KeyeventPrefix => "__keyevent@"u8;
// TODO: dont use a hardcoded db id
public static ReadOnlySpan<byte> Suffix => "0__:"u8;
}
}
61 changes: 61 additions & 0 deletions libs/server/KeyspaceNotifications/KeyspaceNotificationType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;

namespace Garnet.server.KeyspaceNotifications
{
/// <summary>
/// Represents different types of notifications that can be received.
/// </summary>
[Flags]
public enum KeyspaceNotificationType
{
/// <summary>Keyspace events, published with __keyspace@&lt;db&gt;__ prefix.</summary>
Keyspace = 1 << 0, // K

/// <summary>Keyevent events, published with __keyevent@&lt;db&gt;__ prefix.</summary>
Keyevent = 1 << 1, // E

/// <summary>Generic commands (non-type specific) like DEL, EXPIRE, RENAME, etc.</summary>
Generic = 1 << 2, // g

/// <summary>String commands.</summary>
String = 1 << 3, // $

/// <summary>List commands.</summary>
List = 1 << 4, // l

/// <summary>Set commands.</summary>
Set = 1 << 5, // s

/// <summary>Hash commands.</summary>
Hash = 1 << 6, // h

/// <summary>Sorted set commands.</summary>
ZSet = 1 << 7, // z

/// <summary>Expired events (events generated every time a key expires).</summary>
Expired = 1 << 8, // x

/// <summary>Evicted events (events generated when a key is evicted for maxmemory).</summary>
Evicted = 1 << 9, // e

/// <summary>Stream commands.</summary>
Stream = 1 << 10, // t

/// <summary>Key miss events (events generated when a key that doesn't exist is accessed).</summary>
KeyMiss = 1 << 11, // m (Excluded from NOTIFY_ALL)

/// <summary>Module-only key space notification, indicating a key loaded from RDB.</summary>
Loaded = 1 << 12, // module only key space notification

/// <summary>Module key type events.</summary>
Module = 1 << 13, // d, module key space notification

/// <summary>New key events (Note: not included in the 'A' class).</summary>
New = 1 << 14, // n, new key notification

/// <summary>
/// Alias for "g$lshztxed", meaning all events except "m" (KeyMiss) and "n" (New).
/// </summary>
All = Generic | String | List | Set | Hash | ZSet | Expired | Evicted | Stream | Module // A flag
}
}
7 changes: 6 additions & 1 deletion libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Tsavorite.core;

namespace Garnet.server
Expand Down Expand Up @@ -167,6 +168,7 @@ private bool NetworkMSET<TGarnetApi>(ref TGarnetApi storageApi)
var key = parseState.GetArgSliceByRef(c).SpanByte;
var val = parseState.GetArgSliceByRef(c + 1).SpanByte;
_ = storageApi.SET(ref key, ref val);
PublishKeyspaceNotification(KeyspaceNotificationType.String, ref parseState.GetArgSliceByRef(c), CmdStrings.set);
}
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
Expand Down Expand Up @@ -196,15 +198,18 @@ private bool NetworkMSETNX<TGarnetApi>(ref TGarnetApi storageApi)
private bool NetworkDEL<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
int keysDeleted = 0;
int keysDeleted = 0;
for (int c = 0; c < parseState.Count; c++)
{
var key = parseState.GetArgSliceByRef(c).SpanByte;
var status = storageApi.DELETE(ref key, StoreType.All);

// This is only an approximate count because the deletion of a key on disk is performed as a blind tombstone append
if (status == GarnetStatus.OK)
{
keysDeleted++;
PublishKeyspaceNotification(KeyspaceNotificationType.Generic, ref parseState.GetArgSliceByRef(c), CmdStrings.del);
}
}

while (!RespWriteUtils.TryWriteInt32(keysDeleted, ref dcurr, dend))
Expand Down
7 changes: 6 additions & 1 deletion libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

Expand Down Expand Up @@ -290,6 +291,8 @@ private bool NetworkSET<TGarnetApi>(ref TGarnetApi storageApi)

storageApi.SET(ref key, ref value);

PublishKeyspaceNotification(KeyspaceNotificationType.Set, ref parseState.GetArgSliceByRef(0), CmdStrings.set);

while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();

Expand Down Expand Up @@ -338,7 +341,7 @@ private bool NetworkSetRange<TGarnetApi>(ref TGarnetApi storageApi)
var output = ArgSlice.FromPinnedSpan(outputBuffer);

storageApi.SETRANGE(key, ref input, ref output);

PublishKeyspaceNotification(KeyspaceNotificationType.String, ref key, CmdStrings.setrange);
while (!RespWriteUtils.TryWriteIntegerFromBytes(outputBuffer.Slice(0, output.Length), ref dcurr, dend))
SendAndReset();

Expand Down Expand Up @@ -763,6 +766,7 @@ private bool NetworkIncrement<TGarnetApi>(RespCommand cmd, ref TGarnetApi storag
case OperationError.SUCCESS:
while (!RespWriteUtils.TryWriteIntegerFromBytes(outputBuffer.Slice(0, output.Length), ref dcurr, dend))
SendAndReset();
PublishKeyspaceNotification(KeyspaceNotificationType.String, ref key, CmdStrings.incrby);
break;
case OperationError.INVALID_TYPE:
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
Expand Down Expand Up @@ -806,6 +810,7 @@ private bool NetworkIncrementByFloat<TGarnetApi>(ref TGarnetApi storageApi)
case OperationError.SUCCESS:
while (!RespWriteUtils.TryWriteBulkString(outputBuffer.Slice(0, output.Length), ref dcurr, dend))
SendAndReset();
PublishKeyspaceNotification(KeyspaceNotificationType.String, ref key, CmdStrings.incrbyfloat);
break;
case OperationError.INVALID_TYPE:
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_NOT_VALID_FLOAT, ref dcurr,
Expand Down
15 changes: 15 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> get => "get"u8;
public static ReadOnlySpan<byte> SET => "SET"u8;
public static ReadOnlySpan<byte> set => "set"u8;
public static ReadOnlySpan<byte> del => "del"u8;
public static ReadOnlySpan<byte> REWRITE => "REWRITE"u8;
public static ReadOnlySpan<byte> rewrite => "rewrite"u8;
public static ReadOnlySpan<byte> CONFIG => "CONFIG"u8;
public static ReadOnlySpan<byte> CertFileName => "cert-file-name"u8;
public static ReadOnlySpan<byte> CertPassword => "cert-password"u8;
public static ReadOnlySpan<byte> ClusterUsername => "cluster-username"u8;
public static ReadOnlySpan<byte> ClusterPassword => "cluster-password"u8;
public static ReadOnlySpan<byte> NotifiyKeyspaceEvents => "notify-keyspace-events"u8;
public static ReadOnlySpan<byte> ECHO => "ECHO"u8;
public static ReadOnlySpan<byte> ACL => "ACL"u8;
public static ReadOnlySpan<byte> AUTH => "AUTH"u8;
Expand Down Expand Up @@ -149,6 +151,19 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> TIMEOUT => "TIMEOUT"u8;
public static ReadOnlySpan<byte> ERROR => "ERROR"u8;
public static ReadOnlySpan<byte> INCRBY => "INCRBY"u8;
// TODO: move to another file or region for example KeyspaceNotificationEventStrings
public static ReadOnlySpan<byte> incrby => "incrby"u8;
public static ReadOnlySpan<byte> hincrby => "hincrby"u8;
public static ReadOnlySpan<byte> hincrbyfloat => "hincrbyfloat"u8;
public static ReadOnlySpan<byte> incrbyfloat => "incrbyfloat"u8;
public static ReadOnlySpan<byte> hpersist => "hpersist"u8;
public static ReadOnlySpan<byte> linsert => "linsert"u8;
public static ReadOnlySpan<byte> lset => "lset"u8;
public static ReadOnlySpan<byte> ltrim => "ltrim"u8;
public static ReadOnlySpan<byte> setrange => "setrange"u8;
public static ReadOnlySpan<byte> srem => "srem"u8;
public static ReadOnlySpan<byte> ssadd => "ssadd"u8;
// TODO: end
public static ReadOnlySpan<byte> NOGET => "NOGET"u8;

/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions libs/server/Resp/Objects/HashCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Text;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Tsavorite.core;

namespace Garnet.server
Expand Down Expand Up @@ -566,6 +567,17 @@ private unsafe bool HashIncrement<TGarnetApi>(RespCommand command, ref TGarnetAp
break;
default:
ProcessOutputWithHeader(outputFooter.SpanByteAndMemory);
switch (op)
{
case HashOperation.HINCRBY:
PublishKeyspaceNotification(KeyspaceNotificationType.Hash, ref parseState.GetArgSliceByRef(0), CmdStrings.hincrby);
break;
case HashOperation.HINCRBYFLOAT:
PublishKeyspaceNotification(KeyspaceNotificationType.Hash, ref parseState.GetArgSliceByRef(0), CmdStrings.hincrbyfloat);
break;
default:
break;
}
break;
}
return true;
Expand Down Expand Up @@ -820,6 +832,7 @@ private unsafe bool HashPersist<TGarnetApi>(ref TGarnetApi storageApi)
}
break;
default:
PublishKeyspaceNotification(KeyspaceNotificationType.Hash, ref parseState.GetArgSliceByRef(0), CmdStrings.hpersist);
ProcessOutputWithHeader(outputFooter.SpanByteAndMemory);
break;
}
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Text;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Tsavorite.core;

namespace Garnet.server
Expand Down Expand Up @@ -504,6 +505,7 @@ private bool ListTrim<TGarnetApi>(ref TGarnetApi storageApi)
SendAndReset();
break;
default:
PublishKeyspaceNotification(KeyspaceNotificationType.List, ref parseState.GetArgSliceByRef(0), CmdStrings.ltrim);
//GarnetStatus.OK or NOTFOUND have same result
// no need to process output, just send OK
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
Expand Down Expand Up @@ -667,6 +669,7 @@ private bool ListInsert<TGarnetApi>(ref TGarnetApi storageApi)
//process output
while (!RespWriteUtils.TryWriteInt32(output.result1, ref dcurr, dend))
SendAndReset();
PublishKeyspaceNotification(KeyspaceNotificationType.List, ref parseState.GetArgSliceByRef(0), CmdStrings.linsert);
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_RETURN_VAL_0, ref dcurr, dend))
Expand Down Expand Up @@ -895,6 +898,7 @@ public bool ListSet<TGarnetApi>(ref TGarnetApi storageApi)
case GarnetStatus.OK:
//process output
ProcessOutputWithHeader(outputFooter.SpanByteAndMemory);
PublishKeyspaceNotification(KeyspaceNotificationType.List, ref parseState.GetArgSliceByRef(0), CmdStrings.lset);
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_NOSUCHKEY, ref dcurr, dend))
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Text;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Tsavorite.core;

namespace Garnet.server
Expand Down Expand Up @@ -606,6 +607,9 @@ private unsafe bool SetMove<TGarnetApi>(ref TGarnetApi storageApi)
switch (status)
{
case GarnetStatus.OK:
// TODO: check usage of output var to send correct keyspace notification
PublishKeyspaceNotification(KeyspaceNotificationType.Set, ref sourceKey, CmdStrings.srem);
PublishKeyspaceNotification(KeyspaceNotificationType.Set, ref destinationKey, CmdStrings.ssadd);
while (!RespWriteUtils.TryWriteInt32(output, ref dcurr, dend))
SendAndReset();
break;
Expand Down
10 changes: 10 additions & 0 deletions libs/server/ServerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Text;
using Garnet.common;
using Garnet.server.KeyspaceNotifications;
using Microsoft.Extensions.Logging;

namespace Garnet.server
Expand Down Expand Up @@ -131,6 +132,7 @@ private bool NetworkCONFIG_SET()
string certPassword = null;
string clusterUsername = null;
string clusterPassword = null;
string notifiyKeyspaceEventsArguments = null;
var unknownOption = false;
var unknownKey = "";

Expand All @@ -147,6 +149,8 @@ private bool NetworkCONFIG_SET()
clusterUsername = Encoding.ASCII.GetString(value);
else if (key.SequenceEqual(CmdStrings.ClusterPassword))
clusterPassword = Encoding.ASCII.GetString(value);
else if (key.SequenceEqual(CmdStrings.NotifiyKeyspaceEvents))
notifiyKeyspaceEventsArguments = Encoding.ASCII.GetString(value);
else
{
if (!unknownOption)
Expand Down Expand Up @@ -191,6 +195,12 @@ private bool NetworkCONFIG_SET()
else errorMsg += " TLS is disabled.";
}
}

// TODO: Parse argument string into enum flags and remove second if statement
if (notifiyKeyspaceEventsArguments != null && notifiyKeyspaceEventsArguments.Equals("KEA"))
{
storeWrapper.serverOptions.AllowedKeyspaceNotifications = KeyspaceNotificationType.All | KeyspaceNotificationType.Keyspace | KeyspaceNotificationType.Keyevent;
}
}

if (errorMsg == null)
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using Garnet.server.Auth.Settings;
using Garnet.server.KeyspaceNotifications;
using Garnet.server.TLS;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
Expand Down Expand Up @@ -196,6 +197,11 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public string ClusterPassword;

/// <summary>
/// Keyspace notifications
/// </summary>
public KeyspaceNotificationType AllowedKeyspaceNotifications;

/// <summary>
/// Enable per command latency tracking for all commands
/// </summary>
Expand Down
Loading