Skip to content

[Feature #1114] Implement active expiration for mutable region #1180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,14 @@ public IEnumerable<string> LuaAllowedFunctions
[Option("max-databases", Required = false, HelpText = "Max number of logical databases allowed in a single Garnet server instance")]
public int MaxDatabases { get; set; }

[IntRangeValidation(-1, int.MaxValue, isRequired: false)]
[Option("main-store-expired-collection-freq", Required = false, HelpText = "Main store's expired key collection frequency in seconds")]
public int MainStoreExpiredKeyCollectionFrequencySecs { get; set; }

[IntRangeValidation(-1, int.MaxValue, isRequired: false)]
[Option("main-store-expired-key-max-records-per-round", Required = false, HelpText = "Max keys to expire in-memory per round of expired key collection for main store.")]
public int MainStoreExpiredKeyMaxRecordsPerRound { get; set; }

/// <summary>
/// This property contains all arguments that were not parsed by the command line argument parser
/// </summary>
Expand Down Expand Up @@ -878,6 +886,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
UnixSocketPath = UnixSocketPath,
UnixSocketPermission = unixSocketPermissions,
MaxDatabases = MaxDatabases,
MainStoreExpiredKeyCollectionFrequencySecs = MainStoreExpiredKeyCollectionFrequencySecs,
MainStoreExpiredKeyMaxRecordsPerRound = MainStoreExpiredKeyMaxRecordsPerRound,
};
}

Expand Down
8 changes: 7 additions & 1 deletion libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -402,5 +402,11 @@
"UnixSocketPermission": 0,

/* Max number of logical databases allowed in a single Garnet server instance */
"MaxDatabases": 16
"MaxDatabases": 16,

/* Frequency of background expired key collection in seconds */
"MainStoreExpiredKeyCollectionFrequencySecs": -1,

/* Max number of records to be expired per main store mutable region expired key collection round */
"MainStoreExpiredKeyMaxRecordsPerRound": 10_000
}
53 changes: 53 additions & 0 deletions libs/server/ExpiredKeyCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.server
{
public sealed partial class StoreWrapper
{
public async Task CollectExpiredMainStoreKeys(int collectionFrequency, long perRoundMaxRecordsToCollect, CancellationToken token = default)
{
try
{
var scratchBufferManager = new ScratchBufferManager();
using var storageSession = new StorageSession(this, scratchBufferManager, null, null, logger);
while (true)
{
if (token.IsCancellationRequested) return;
// Take an unprotected look from the SafeReadOnlyRegion as the starting point for our scan.
// If there is any sort of shift of the marker then a few of my scanned records will be from a redundant region.
// DELIFEXPIREDINMEMORY will be noop for those records since they will early exit at NCU.
long safeInMemoryRegionAddrOfMainStore = this.store.Log.SafeReadOnlyAddress;
storageSession.ScanExpiredKeys(cursor: safeInMemoryRegionAddrOfMainStore, storeCursor: out long scannedTill, keys: out List<byte[]> keys, count: perRoundMaxRecordsToCollect);
RawStringInput input = new RawStringInput(RespCommand.DELIFEXPIM);
foreach (byte[] key in keys)
{
unsafe
{
fixed (byte* keyPtr = key)
{
SpanByte keySb = SpanByte.FromPinnedPointer(keyPtr, key.Length);
// Use basic session for transient locking
storageSession.DEL_Conditional(ref keySb, ref input, ref storageSession.basicContext);
}
}
logger?.LogDebug("Deleted Expired Key {key}", System.Text.Encoding.UTF8.GetString(key));
}
await Task.Delay(TimeSpan.FromSeconds(collectionFrequency), token);
}
}
catch (TaskCanceledException) when (token.IsCancellationRequested) { } // Suppress the exception if the task was cancelled because of store wrapper disposal
catch (Exception ex)
{
logger?.LogCritical(ex, "Unknown exception received for Expired key collection task. Collection task won't be resumed.");
}
}
}
}
10 changes: 10 additions & 0 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
RespCommand.REGISTERCS => NetworkRegisterCs(storeWrapper.customCommandManager),
RespCommand.MODULE_LOADCS => NetworkModuleLoad(storeWrapper.customCommandManager),
RespCommand.PURGEBP => NetworkPurgeBP(),
RespCommand.DELIFEXPIM => NetworkDELIFEXPIM(),
_ => cmdFound = false
};

Expand Down Expand Up @@ -856,6 +857,15 @@
return true;
}

/// <summary>
/// DELIFEXPIM [
/// </summary>
/// <returns></returns>
private bool NetworkDELIFEXPIM()

Check failure on line 864 in libs/server/Resp/AdminCommands.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'RespServerSession.NetworkDELIFEXPIM()': not all code paths return a value

Check failure on line 864 in libs/server/Resp/AdminCommands.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'RespServerSession.NetworkDELIFEXPIM()': not all code paths return a value
{

}

/// <summary>
/// SAVE [DBID]
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public enum RespCommand : ushort
DECR,
DECRBY,
DEL,
DELIFEXPIM,
DELIFGREATER,
EXPIRE,
EXPIREAT,
Expand Down
10 changes: 10 additions & 0 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public int CommitFrequencyMs = 0;

/// <summary>
/// Frequency of background expired key collection in seconds.
/// </summary>
public int MainStoreExpiredKeyCollectionFrequencySecs = -1;

/// <summary>
/// Max number of records to expire in-memory per round of expired key collection.
/// </summary>
public long MainStoreExpiredKeyMaxRecordsPerRound = 10_000;

/// <summary>
/// Index resize check frequency in seconds.
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public bool NeedInitialUpdate(ref SpanByte key, ref RawStringInput input, ref Sp
case RespCommand.EXPIREAT:
case RespCommand.PEXPIREAT:
case RespCommand.GETDEL:
case RespCommand.DELIFEXPIM:
case RespCommand.GETEX:
case RespCommand.DELIFGREATER:
return false;
Expand Down Expand Up @@ -832,6 +833,18 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
}

return false;
case RespCommand.DELIFEXPIM:
// Only if the key has expired, will we delete it.
if (value.MetadataSize > 0 && input.header.CheckExpiry(value.ExtraMetadata))
{
// setting the action and returning false will tombstone this record
rmwInfo.Action = RMWAction.ExpireAndStop;
// reset etag state that may have been initialized earlier,
EtagState.ResetState(ref functionsState.etagState);
return false;
}
shouldUpdateEtag = false;
break;
default:
if (cmd > RespCommandExtensions.LastValidCommand)
{
Expand Down Expand Up @@ -908,6 +921,8 @@ public bool NeedCopyUpdate(ref SpanByte key, ref RawStringInput input, ref SpanB
{
switch (input.header.cmd)
{
case RespCommand.DELIFEXPIM:
return false;
case RespCommand.DELIFGREATER:
if (rmwInfo.RecordInfo.ETag)
EtagState.SetValsForRecordWithEtag(ref functionsState.etagState, ref oldValue);
Expand Down
45 changes: 45 additions & 0 deletions libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ sealed partial class StorageSession : IDisposable
// Iterators for SCAN command
private ArrayKeyIterationFunctions.MainStoreGetDBKeys mainStoreDbScanFuncs;
private ArrayKeyIterationFunctions.ObjectStoreGetDBKeys objStoreDbScanFuncs;
private ArrayKeyIterationFunctions.MainStoreGetExpiredKeys mainStoreDbExpiredScanFuncs;

// Iterators for KEYS command
private ArrayKeyIterationFunctions.MainStoreGetDBKeys mainStoreDbKeysFuncs;
Expand Down Expand Up @@ -109,6 +110,20 @@ internal unsafe bool DbScan(ArgSlice patternB, bool allKeys, long cursor, out lo
return true;
}

internal void ScanExpiredKeys(long cursor, out long storeCursor, out List<byte[]> keys, long count, ReadOnlySpan<byte> typeObject = default)
{
Keys ??= new();
Keys.Clear();

keys = Keys;

mainStoreDbExpiredScanFuncs ??= new();
mainStoreDbExpiredScanFuncs.Initialize(keys);

storeCursor = cursor;
basicContext.Session.ScanCursor(ref storeCursor, count, mainStoreDbExpiredScanFuncs, validateCursor: cursor != 0 && cursor != lastScanCursor);
}

/// <summary>
/// Iterate the contents of the main store (push-based)
/// </summary>
Expand Down Expand Up @@ -211,6 +226,36 @@ internal void Initialize(List<byte[]> keys, byte* patternB, int length, Type mat
}
}

internal sealed class MainStoreGetExpiredKeys : IScanIteratorFunctions<SpanByte, SpanByte>
{
private readonly GetDBKeysInfo info;

internal MainStoreGetExpiredKeys() => info = new();

internal void Initialize(List<byte[]> keys)
=> info.Initialize(keys, default, 0);

public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> ConcurrentReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);

public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
if (value.MetadataSize == 0 || !MainSessionFunctions.CheckExpiry(ref value))
cursorRecordResult = CursorRecordResult.Skip;
else
{
cursorRecordResult = CursorRecordResult.Accept;
info.keys.Add(key.ToByteArray());
}

return true;
}

public bool OnStart(long beginAddress, long endAddress) => true;
public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal sealed class MainStoreGetDBKeys : IScanIteratorFunctions<SpanByte, SpanByte>
{
private readonly GetDBKeysInfo info;
Expand Down
7 changes: 6 additions & 1 deletion libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace Garnet.server
/// <summary>
/// Wrapper for store and store-specific information
/// </summary>
public sealed class StoreWrapper
public sealed partial class StoreWrapper
{
internal readonly string version;
internal readonly string redisProtocolVersion;
Expand Down Expand Up @@ -766,6 +766,11 @@ internal void Start()
Task.Run(async () => await ObjectCollectTask(serverOptions.ExpiredObjectCollectionFrequencySecs, ctsCommit.Token));
}

if (serverOptions.MainStoreExpiredKeyCollectionFrequencySecs > 0)
{
Task.Run(async () => await CollectExpiredMainStoreKeys(serverOptions.MainStoreExpiredKeyCollectionFrequencySecs, serverOptions.MainStoreExpiredKeyMaxRecordsPerRound), ctsCommit.Token);
}

if (serverOptions.AdjustedIndexMaxCacheLines > 0 || serverOptions.AdjustedObjectStoreIndexMaxCacheLines > 0)
{
Task.Run(() => IndexAutoGrowTask(ctsCommit.Token));
Expand Down
Loading