Skip to content

LogRecord and SpanByte changes #1186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 123 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
123 commits
Select commit Hold shift + click to select a range
0ca8493
Refactors prior to storage-v2:
TedHartMS Nov 26, 2024
421d7a0
WIP; first cut at LogRecord etc. implementations
TedHartMS Dec 9, 2024
7542d8a
WIP: more LogRecord
TedHartMS Dec 10, 2024
0797412
More *LogRecord, and DiskRecord
TedHartMS Dec 11, 2024
f94a769
WIP: Convert allocators to new LogRecord and DiskRecord
TedHartMS Dec 20, 2024
53f4941
WIP: incorporate LogRecord into ISessionFunctions
TedHartMS Dec 24, 2024
ca4b399
More logRecord WIP
TedHartMS Dec 26, 2024
3b754e6
More WIP on MainSessionFunctions RMWMethods (and PrivateMethods) with…
TedHartMS Dec 28, 2024
49e9769
WIP LogRecord (MainStore RMWMethods)
TedHartMS Dec 30, 2024
a92c2bb
WIP: Consolidate to a single LogRecord; convert InternalRMW
TedHartMS Jan 5, 2025
1ce5ee6
TKey removal
TedHartMS Jan 7, 2025
95205ce
A few more TKey removals
TedHartMS Jan 7, 2025
493302d
WIP more logRecord
TedHartMS Jan 8, 2025
a470009
Remove pendingConterxt.recordInfo as it is no longer in RecordMetadata
TedHartMS Jan 9, 2025
60cba5a
WIP; Internal(RUMD) converted.
TedHartMS Jan 10, 2025
0c587fc
added PendingContext implementation of ISourceLogRecord
TedHartMS Jan 11, 2025
81bd194
Finish ContinuePending
TedHartMS Jan 12, 2025
1de5c11
Iterator changes - consolidate to RecordScanIterator
TedHartMS Jan 15, 2025
cf491bc
Mostly IO work
TedHartMS Jan 16, 2025
5d925ff
WIP: Reintroduce TValue through ObjectAllocator, LogRecord, etc.
TedHartMS Jan 16, 2025
06464cb
WIP Convert Garnet to LogRecord and ObjectAllocator (MainStoreFunctio…
TedHartMS Jan 17, 2025
340c1a7
TedHartMS Jan 18, 2025
2063e47
WIP converting Garnet to non-ref SpanByte-only keys
TedHartMS Jan 20, 2025
7e44fa7
More WIP on Garnet migration to LogRecord
TedHartMS Jan 20, 2025
d2d32c5
WIP Tsavorite LogRecord odds and ends
TedHartMS Jan 21, 2025
5f0639d
Rename TsavoriteLog to TsavoriteAof
TedHartMS Jan 22, 2025
575fd02
More processing-layer LogRecord etc. fixes
TedHartMS Jan 23, 2025
1f878ab
fix iterator as ISourceLogRecord
TedHartMS Jan 23, 2025
785cc31
More Expiration etc. cleanup in processing layer
TedHartMS Jan 23, 2025
42dd7ab
WIP BlockAllocate work
TedHartMS Jan 23, 2025
3b000cc
FInalize removal of SpanByte.Metadata
TedHartMS Jan 23, 2025
850de54
More param fixes in processing layer
TedHartMS Jan 23, 2025
4a85072
SpanField and OptionalFieldsShift structs
TedHartMS Jan 27, 2025
4eb8283
Propagate some SpanField etc. changes
TedHartMS Jan 27, 2025
7c8bc4d
First pass at LogRecord-based Revivification
TedHartMS Jan 28, 2025
f5a0e88
Finish OverflowAllocator
TedHartMS Jan 30, 2025
4eceed2
OK now I finished this iteration of OverflowAllocator
TedHartMS Jan 30, 2025
d6512a0
improve callers' use of OverflowAllocator
TedHartMS Jan 30, 2025
66e7ccd
More LogRecord-related cleanup
TedHartMS Jan 30, 2025
f4814a7
// TODOMigrate: temporarily clear errors related to Expiration in Key…
TedHartMS Jan 30, 2025
3e925ea
Fixes for first test runs
TedHartMS Jan 31, 2025
405a1ca
More fixes for CopyRecord and setting of Expiration etc
TedHartMS Jan 31, 2025
1ec5002
Cleanup OversizeAllocator.BlockHeader and usage
TedHartMS Feb 2, 2025
4a1ba56
SpanField and LogRecord documentation and cleanup; BlockHeader GetUse…
TedHartMS Feb 3, 2025
bc79840
More improvements to overflow allocator and usage
TedHartMS Feb 4, 2025
4747f95
More fixes for tests
TedHartMS Feb 4, 2025
fe870f4
Remove unneeded IAllocator(Callbacks) methods (superseded by LogRecord)
TedHartMS Feb 4, 2025
f002cf0
more overflow allocator fixes
TedHartMS Feb 5, 2025
ae51071
sizeInfo on ISessionFunctions and TrySetValueSpanLength improvements
TedHartMS Feb 7, 2025
e071de9
Remove SpanByte serialized usage (but leave an assert on .Serialized …
TedHartMS Feb 9, 2025
3799c67
Improved FreeList handling and ObjectIdMap growth/FreeList via Simple…
TedHartMS Feb 10, 2025
eb74f3c
Make sure DiskLogRecord Value and Record lengths are long
TedHartMS Feb 11, 2025
e734266
Remove ISerializer (not used and not planned to be)
TedHartMS Feb 11, 2025
417a2be
TIghten up SpanByte !IsSerialized assert and fix SETEX "expiration se…
TedHartMS Feb 11, 2025
6b6309f
Some TODO cleanup
TedHartMS Feb 11, 2025
bb3ca9d
Begin converting Tsavorite tests and benchmarks to LogRecord
TedHartMS Feb 14, 2025
e01c049
Tsavorite RevivificationTests and ObjectTests
TedHartMS Feb 16, 2025
ad3937a
More ObjectIdMap and OverflowAllocator work and tests
TedHartMS Feb 21, 2025
b6bfdcf
More overflowAllocator work: FixedSizePages and other fixes for YCSB,…
TedHartMS Feb 24, 2025
2f7533e
Clean up Locking.md (had some referenced to obsolete locking modes); …
TedHartMS Feb 26, 2025
792ad09
support for "inline struct" as SpanByte for Object value optimization
TedHartMS Feb 28, 2025
dae8537
finish "inline struct" testing; update processing layer to Tsavorite …
TedHartMS Mar 3, 2025
36b5f23
Convert RecordFieldInfo Key/Value TotalSize to DataSize - caller shou…
TedHartMS Mar 4, 2025
ecc0ec6
Merge 'main'
TedHartMS Mar 6, 2025
133000e
WIP main merge;
TedHartMS Mar 10, 2025
9f14510
Fixes from merging main
TedHartMS Mar 11, 2025
d84f749
More testing and fixes
TedHartMS Mar 11, 2025
1691e21
More merge fixes
TedHartMS Mar 12, 2025
98c6e6d
fix more tests
TedHartMS Mar 13, 2025
e2ef22f
merge 'main'
TedHartMS Mar 14, 2025
87f0471
Merge remote-tracking branch 'origin/main' into tedhar/storage-v2
TedHartMS Mar 14, 2025
a8c83b0
merge 'main'
TedHartMS Mar 14, 2025
7a29282
Merge remote-tracking branch 'origin/main' into tedhar/storage-v2
TedHartMS Mar 14, 2025
7994e1f
Merge remote-tracking branch 'origin/tedhar/storage-v2' into tedhar/o…
TedHartMS Mar 14, 2025
3dfef24
minor comment cleanup
TedHartMS Mar 14, 2025
d932171
some doc-related updates, mostly comments
TedHartMS Mar 15, 2025
35e12b6
Add DisposeReason.Deleted, .Elided
TedHartMS Mar 16, 2025
8f92e69
merge 'main'
TedHartMS Mar 19, 2025
cf3ab14
merge 'main'
TedHartMS Mar 19, 2025
012795d
WIP Span<byte>
TedHartMS Mar 23, 2025
a0aa148
WIP Span<byte>
TedHartMS Mar 24, 2025
494612b
WIP Span<byte>
TedHartMS Mar 24, 2025
5b7064f
WIP Span<byte>
TedHartMS Mar 24, 2025
8581f48
WIP Span<byte>
TedHartMS Mar 25, 2025
f870206
WIP Span<byte>
TedHartMS Mar 25, 2025
9d31c5e
WIP Span<byte>
TedHartMS Mar 26, 2025
31e4ff1
WIP Span<byte> (Tsavorite builds, with a couple hacks)
TedHartMS Mar 27, 2025
12813a4
WIP Span<byte>: YCSB fixes
TedHartMS Mar 28, 2025
54bdb41
WIP Span<byte>: more fixes found during YCSB
TedHartMS Mar 28, 2025
5168a2f
Switching CTT, CTRC, TempKv insertions to TSourceLogRecord direct cop…
TedHartMS Apr 5, 2025
7fdf35f
WIP Span<byte>: more cleanup in Compaction (no TSessionFunctions need…
TedHartMS Apr 8, 2025
c3008f8
WIP Span<byte>: LogRecord revisions in functions and prep Migrate/Rep…
TedHartMS Apr 9, 2025
9755afa
ISessionFunctions revisions
TedHartMS Apr 9, 2025
03c8fdc
Remove TInput, TOutput, TContext from Compact and PrepareIOForConditi…
TedHartMS Apr 9, 2025
0f1d14f
Remove WriteReason as it is no longer needed with Compaction, CopyToT…
TedHartMS Apr 9, 2025
b5b22ae
Missed a few WriteReason
TedHartMS Apr 10, 2025
1db7629
WIP Span<byte>: Cleanup DiskLogRecord deserialization and remove some…
TedHartMS Apr 11, 2025
2193395
Varebyte in DiskLogRecord
TedHartMS Apr 12, 2025
8fa2910
More LogRecord testing
TedHartMS Apr 15, 2025
631ed77
FInish DiskLogRecord serialization tests
TedHartMS Apr 15, 2025
e73e7d8
Rename SpanField to LogField
TedHartMS Apr 15, 2025
d49bcfc
Various cleanup, including PrepareForRevivification
TedHartMS Apr 22, 2025
fb98a20
WIP on converting Replication and Migration to LogRecord
TedHartMS Apr 22, 2025
34436fa
WIP on Repl (done-ish) and Migrate (mostly done except ISF) conversio…
TedHartMS Apr 24, 2025
744af73
Split IHO.Size into .MemorySize, DiskSize
TedHartMS Apr 25, 2025
160c332
Finish LogRecordization of Migrate
TedHartMS Apr 25, 2025
526e149
First fixes for Migration testing
TedHartMS Apr 25, 2025
f2d4c67
More Migrate testing; use SBAM.MemorySpan
TedHartMS Apr 27, 2025
12c797c
SImple Migrate tests now run
TedHartMS Apr 28, 2025
77b6105
Simple diskless replica runs
TedHartMS Apr 28, 2025
90793d2
Fixes for in-memory tests
TedHartMS Apr 29, 2025
0dd2039
Add UpsertValueSelector
TedHartMS Apr 29, 2025
c43401c
Remove PendingContext.IsAsync (a remnant of earlier, since-removed as…
TedHartMS Apr 29, 2025
3eb46a0
merge 'main'
TedHartMS Apr 30, 2025
eb20108
Merge 'tedhar/storage-v2'
TedHartMS May 1, 2025
b5205ec
Cleanup Read initialization of PendingContext
TedHartMS May 2, 2025
c415865
Fix record alignment in YCSB test
TedHartMS May 4, 2025
00b050d
tune HashBytes
TedHartMS May 4, 2025
4e043c1
Fix initial inserting in YCSB
TedHartMS May 5, 2025
cad3254
Update LogRecord.md and some cleanup
TedHartMS May 6, 2025
b4a942f
Restore TsavoriteAof naming back to TsavoriteLog
TedHartMS May 6, 2025
c7666db
Merge remote-tracking branch 'origin/main' into tedhar/storage-v2
TedHartMS May 6, 2025
48a31d4
merge 'tedhar/storage-v2'
TedHartMS May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
16 changes: 8 additions & 8 deletions benchmark/BDN.benchmark/Custom/CustomTxnSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ sealed class CustomTxnSet : CustomTransactionProcedure
/// </summary>
public const string CommandName = "CTXNSET";

ArgSlice setA;
ArgSlice setB;
ArgSlice setC;
ArgSlice setD;
PinnedSpanByte setA;
PinnedSpanByte setB;
PinnedSpanByte setC;
PinnedSpanByte setD;

ArgSlice valueA;
ArgSlice valueB;
ArgSlice valueC;
ArgSlice valueD;
PinnedSpanByte valueA;
PinnedSpanByte valueB;
PinnedSpanByte valueC;
PinnedSpanByte valueD;

/// <summary>
/// CTXNSET key1 key2 key3 key4 value1 value2 value3 value4
Expand Down
11 changes: 8 additions & 3 deletions libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,17 @@ private unsafe void Flush()
Dispose();
throw;
}
networkSender.GetResponseObject();
offset = networkSender.GetResponseObjectHead();
end = networkSender.GetResponseObjectTail();
ResetOffset();
}
}

private unsafe void ResetOffset()
{
networkSender.GetResponseObject();
offset = networkSender.GetResponseObjectHead();
end = networkSender.GetResponseObjectTail();
}

/// <inheritdoc />
public bool TryCreateMessageConsumer(Span<byte> bytesReceived, INetworkSender networkSender, out IMessageConsumer session)
=> throw new NotSupportedException();
Expand Down
131 changes: 50 additions & 81 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
IncrementalSendType ist;
bool isMainStore;
byte* curr, head;
int keyValuePairCount;
int recordCount;
TaskCompletionSource<string> currTcsIterationTask = null;

/// <summary>
Expand All @@ -42,15 +42,37 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
public bool NeedsInitialization => curr == null;

/// <summary>
/// Flush and initialize buffers/parameters used for migrate command
/// Return a <see cref="Span{_byte_}"/> of all remaining available space in the network buffer.
/// </summary>
public PinnedSpanByte GetAvailableNetworkBufferSpan() => PinnedSpanByte.FromPinnedPointer(curr, (int)(end - curr));

public void IncrementRecordDirect(int size)
{
++recordCount;
curr += size;
}

private void EnsureTcsIsEnqueued()
{
// See comments in SetClusterMigrateHeader() as to why this is decoupled from the header initialization.
if (recordCount > 0 && currTcsIterationTask == null)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
}
}

/// <summary>
/// Flush and initialize buffers/parameters used for Migrate and Replica commands
/// </summary>
/// <param name="iterationProgressFreq"></param>
public void InitializeIterationBuffer(TimeSpan iterationProgressFreq = default)
{
EnsureTcsIsEnqueued();
Flush();
currTcsIterationTask = null;
curr = head = null;
keyValuePairCount = 0;
recordCount = 0;
this.iterationProgressFreq = default ? TimeSpan.FromSeconds(5) : iterationProgressFreq;
}

Expand All @@ -59,117 +81,64 @@ public void InitializeIterationBuffer(TimeSpan iterationProgressFreq = default)
/// </summary>
public Task<string> SendAndResetIterationBuffer()
{
if (keyValuePairCount == 0) return null;
Task<string> task = null;
if (recordCount == 0)
{
// No records to Flush(), but we need to reset buffer offsets as we may have written a header due to the need to initialize the buffer
// before passing it to Tsavorite as the output SpanByteAndMemory.SpanByte for Read().
ResetOffset();
goto done;
}

Debug.Assert(end - curr >= 2);
*curr++ = (byte)'\r';
*curr++ = (byte)'\n';

// Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
var size = (int)(curr - 2 - head - (ExtraSpace - 4));
TrackIterationProgress(keyValuePairCount, size);
TrackIterationProgress(recordCount, size);
var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
Debug.Assert(success);

// Number of key value pairs in payload
*(int*)head = keyValuePairCount;
*(int*)head = recordCount;

// Reset offset and flush buffer
offset = curr;
EnsureTcsIsEnqueued();
Flush();
Interlocked.Increment(ref numCommands);

// Return outstanding task and reset current tcs
var task = currTcsIterationTask.Task;
task = currTcsIterationTask.Task;
currTcsIterationTask = null;
recordCount = 0;

done:
curr = head = null;
keyValuePairCount = 0;
return task;
}

/// <summary>
/// Try write key value pair for main store directly to the client buffer
/// Try to write the span for the entire record directly to the client buffer
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="task"></param>
/// <returns></returns>
public bool TryWriteKeyValueSpanByte(ref SpanByte key, ref SpanByte value, out Task<string> task)
public bool TryWriteRecordSpan(ReadOnlySpan<byte> recordSpan, out Task<string> task)
{
task = null;
// Try write key value pair directly to client buffer
if (!WriteSerializedSpanByte(ref key, ref value))
// We include space for newline at the end, to be added before sending
var totalLen = recordSpan.TotalSize() + 2;
if (totalLen > (int)(end - curr))
{
// If failed to write because no space left send outstanding data and retrieve task
// Caller is responsible for retrying
// If there is no space left, send outstanding data and return the send-completion task.
// Caller is responsible for waiting for task completion and retrying.
task = SendAndResetIterationBuffer();
return false;
}

keyValuePairCount++;
return true;

bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value)
{
var totalLen = key.TotalSize + value.TotalSize + 2 + 2;
if (totalLen > (int)(end - curr))
return false;

key.CopyTo(curr);
curr += key.TotalSize;
value.CopyTo(curr);
curr += value.TotalSize;
return true;
}
}

/// <summary>
/// Try write key value pair for object store directly to the client buffer
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="expiration"></param>
/// <param name="task"></param>
/// <returns></returns>
public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration, out Task<string> task)
{
recordSpan.SerializeTo(curr);
curr += recordSpan.TotalSize();
++recordCount;
task = null;
// Try write key value pair directly to client buffer
if (!WriteSerializedKeyValueByteArray(key, value, expiration))
{
// If failed to write because no space left send outstanding data and retrieve task
// Caller is responsible for retrying
task = SendAndResetIterationBuffer();
return false;
}

keyValuePairCount++;
return true;

bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long expiration)
{
// We include space for newline at the end, to be added before sending
int totalLen = 4 + key.Length + 4 + value.Length + 8 + 2;
if (totalLen > (int)(end - curr))
return false;

*(int*)curr = key.Length;
curr += 4;
fixed (byte* keyPtr = key)
Buffer.MemoryCopy(keyPtr, curr, key.Length, key.Length);
curr += key.Length;

*(int*)curr = value.Length;
curr += 4;
fixed (byte* valPtr = value)
Buffer.MemoryCopy(valPtr, curr, value.Length, value.Length);
curr += value.Length;

*(long*)curr = expiration;
curr += 8;

return true;
}
}

long lastLog = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ public Task<string> SetSlotRange(Memory<byte> state, string nodeid, List<(int, i
/// <param name="isMainStore"></param>
public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMainStore)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
// For Migration we send the (curr - end) buffer as the SpanByteAndMemory.SpanByte output to Tsavorite. Thus we must
// initialize the header first, so we have curr properly positioned, but we cannot yet enqueue currTcsIterationTask.
// Therefore we defer this until the actual Flush(), when we know we have records to send. This is not a concern for
// Replication, because it uses an iterator and thus knows it has a record before it initializes the header.
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.MIGRATE;
Expand Down Expand Up @@ -257,23 +259,24 @@ public Task<string> CompleteMigrate(string sourceNodeId, bool replace, bool isMa

// Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
var size = (int)(curr - 2 - head - (ExtraSpace - 4));
TrackIterationProgress(keyValuePairCount, size, completed: true);
TrackIterationProgress(recordCount, size, completed: true);
var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
Debug.Assert(success);

// Number of key value pairs in payload
*(int*)head = keyValuePairCount;
*(int*)head = recordCount;

// Reset offset and flush buffer
offset = curr;
EnsureTcsIsEnqueued();
Flush();
Interlocked.Increment(ref numCommands);

// Return outstanding task and reset current tcs
var task = currTcsIterationTask.Task;
currTcsIterationTask = null;
curr = head = null;
keyValuePairCount = 0;
recordCount = 0;
return task;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ public Task<string> ExecuteAttachSync(byte[] syncMetadata)
/// <param name="isMainStore"></param>
public void SetClusterSyncHeader(string sourceNodeId, bool isMainStore)
{
// Unlike Migration, where we don't know at the time of header initialization if we have a record or not, in Replication
// we know we have a record at the time this is called, so we can initialize it directly.
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
Expand Down
27 changes: 13 additions & 14 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
using BasicGarnetApi = GarnetApi<BasicContext<RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>>;

/// <summary>
/// Cluster manager
Expand Down Expand Up @@ -477,12 +477,12 @@ public void TryResetSlotState(HashSet<int> slots)
public static unsafe void DeleteKeysInSlotsFromMainStore(BasicGarnetApi BasicGarnetApi, HashSet<int> slots)
{
using var iter = BasicGarnetApi.IterateMainStore();
while (iter.GetNext(out _))
while (iter.GetNext())
{
ref var key = ref iter.GetKey();
var s = HashSlotUtils.HashSlot(ref key);
var key = iter.Key;
var s = HashSlotUtils.HashSlot(key);
if (slots.Contains(s))
_ = BasicGarnetApi.DELETE(ref key, StoreType.Main);
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key), StoreType.Main);
}
}

Expand All @@ -494,13 +494,12 @@ public static unsafe void DeleteKeysInSlotsFromMainStore(BasicGarnetApi BasicGar
public static unsafe void DeleteKeysInSlotsFromObjectStore(BasicGarnetApi BasicGarnetApi, HashSet<int> slots)
{
using var iterObject = BasicGarnetApi.IterateObjectStore();
while (iterObject.GetNext(out _))
while (iterObject.GetNext())
{
ref var key = ref iterObject.GetKey();
ref var value = ref iterObject.GetValue();
var key = iterObject.Key;
var s = HashSlotUtils.HashSlot(key);
if (slots.Contains(s))
_ = BasicGarnetApi.DELETE(key, StoreType.Object);
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key), StoreType.Object);
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>,
GenericAllocator<byte[], IGarnetObject, StoreFunctions<byte[], IGarnetObject, ByteArrayKeyComparer, DefaultRecordDisposer<byte[], IGarnetObject>>>>>;
using BasicGarnetApi = GarnetApi<BasicContext<RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>>;

/// <summary>
/// Cluster provider
Expand Down
10 changes: 5 additions & 5 deletions libs/cluster/Server/Migration/MigrateSessionKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System;
using System.Runtime.CompilerServices;
using Garnet.common;
using Garnet.server;
using Tsavorite.core;

namespace Garnet.cluster
{
Expand Down Expand Up @@ -59,14 +59,14 @@ private void TryTransitionState(KeyMigrationStatus status)
/// <param name="readOnly"></param>
/// <returns></returns>
/// <exception cref="GarnetException"></exception>
public bool CanAccessKey(ref ArgSlice key, int slot, bool readOnly)
public bool CanAccessKey(ref PinnedSpanByte key, int slot, bool readOnly)
{
// Skip operation check since this session is not responsible for migrating the associated slot
if (!_sslots.Contains(slot))
return true;

// If key is not queued for migration then
if (!_keys.TryGetValue(ref key, out var state))
if (!_keys.TryGetValue(key, out var state))
return true;

// NOTE:
Expand All @@ -86,8 +86,8 @@ public bool CanAccessKey(ref ArgSlice key, int slot, bool readOnly)
/// </summary>
/// <param name="key"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool AddKey(ref ArgSlice key)
=> _keys.TryAdd(ref key, KeyMigrationStatus.QUEUED);
public bool AddKey(PinnedSpanByte key)
=> _keys.TryAdd(key, KeyMigrationStatus.QUEUED);

/// <summary>
/// Clear keys from dictionary
Expand Down
Loading
Loading