Skip to content

experiment: Make Tsavorite allocators unmanaged #1112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a4459d4
Make Tsavorite BufferPool unmanaged memory based
PaulusParssinen Mar 19, 2025
2b546da
Rename BufferPool.cs to SectorAlignedMemoryPool.cs
PaulusParssinen Mar 19, 2025
56be445
Make SpanByte & BlittableAllocators NativeMemory based
PaulusParssinen Mar 19, 2025
fa89dbd
Add nullptr check to in-memory deletion
PaulusParssinen Mar 19, 2025
75ef2d0
Move SectorAlignedMemoryExtensions slicing to the implementation
PaulusParssinen Mar 19, 2025
24f04cd
Fix HyperLogLogLength typo
PaulusParssinen Mar 19, 2025
0a02097
Inform GC of the unmanaged allocations
PaulusParssinen Mar 19, 2025
ee21819
Dispose the allocator memory
PaulusParssinen Mar 19, 2025
878f6e6
dotnet format
PaulusParssinen Mar 19, 2025
6a64c70
Restore zero-initialization behavior of the aligned memory
PaulusParssinen Mar 20, 2025
92490b5
Rename BufferPtr to Pointer
PaulusParssinen Mar 20, 2025
ba9dd5d
Refine bitmap buffer comment
PaulusParssinen Mar 20, 2025
f93b21f
Test reliance on previous over-allocating behavior, possible OOB. Var…
PaulusParssinen Mar 20, 2025
3cc1c0a
Fix RCU test by not reusing BC from disposed store
PaulusParssinen Mar 20, 2025
3ef93aa
Dispose log device first to prevent its IO thread from running and ac…
PaulusParssinen Mar 20, 2025
435bf4e
Avoid nullref in GenericAllocator for multiple dispose calls
PaulusParssinen Mar 20, 2025
44b5e4e
Handle zero record request as one record request in pool
PaulusParssinen Mar 20, 2025
9d3cfc5
Do oversized allocations for standalone aligned memory allocs
PaulusParssinen Mar 20, 2025
98ecc12
oops. don't do that
PaulusParssinen Mar 20, 2025
c96568d
Fix BITOP destination buffer handling for no keys scenario
PaulusParssinen Mar 20, 2025
196f4f3
Refactor consumption of the pool by ClusterUtils
PaulusParssinen Mar 21, 2025
a7f1ff6
Use SafeHandle wrapper for the NativeMemory management
PaulusParssinen Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal sealed partial class ClusterManager : IDisposable
{
ClusterConfig currentConfig;
readonly IDevice clusterConfigDevice;
readonly SectorAlignedBufferPool pool;
readonly SectorAlignedMemoryPool pool;
readonly ILogger logger;

/// <summary>
Expand Down Expand Up @@ -103,7 +103,7 @@ public void Dispose()
DisposeBackgroundTasks();

clusterConfigDevice.Dispose();
pool.Free();
pool.Dispose();
}

/// <summary>
Expand Down
47 changes: 19 additions & 28 deletions libs/cluster/Server/ClusterUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Buffers.Binary;
using System.ComponentModel;
using System.Threading;
using Microsoft.Extensions.Logging;
Expand All @@ -11,7 +12,7 @@ namespace Garnet.cluster
{
internal static class ClusterUtils
{
public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, ILogger logger = null)
public static byte[] ReadDevice(IDevice device, SectorAlignedMemoryPool pool, ILogger logger = null)
{
ReadInto(device, pool, 0, out byte[] writePad, sizeof(int), logger);
int size = BitConverter.ToInt32(writePad, 0);
Expand All @@ -20,7 +21,7 @@ public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, IL
body = writePad;
else
ReadInto(device, pool, 0, out body, size + sizeof(int), logger);
return new Span<byte>(body)[sizeof(int)..].ToArray();
return body.AsSpan(sizeof(int)).ToArray();
}

/// <summary>
Expand All @@ -32,28 +33,22 @@ public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, IL
/// <param name="buffer"></param>
/// <param name="size"></param>
/// <param name="logger"></param>
public static unsafe void WriteInto(IDevice device, SectorAlignedBufferPool pool, ulong address, byte[] buffer, int size = 0, ILogger logger = null)
public static unsafe void WriteInto(IDevice device, SectorAlignedMemoryPool pool, ulong address, byte[] buffer, int size = 0, ILogger logger = null)
{
// Writes a 4-byte length prefix followed by the payload to the device at the given address,
// going through a buffer obtained from the pool, and blocks until the semaphore is signalled.
// A size of 0 means "use the whole buffer".
if (size == 0) size = buffer.Length;
var _buffer = new byte[size + sizeof(int)];
var len = BitConverter.GetBytes(size);
Array.Copy(len, _buffer, len.Length);
Array.Copy(buffer, 0, _buffer, len.Length, size);
size += len.Length;

long numBytesToWrite = size;
// Round up to a multiple of device.SectorSize (the mask form assumes SectorSize is a power of two — TODO confirm).
numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1));
var lengthPrefixedSize = size + sizeof(int);
var sectorAlignedBuffer = pool.Get(lengthPrefixedSize);
var sectorAlignedBufferSpan = sectorAlignedBuffer.AsSpan();

// NOTE(review): the prefix is written little-endian here, but ReadDevice decodes it with
// BitConverter.ToInt32, which uses the platform byte order. The two agree only on
// little-endian hosts; consider BinaryPrimitives.ReadInt32LittleEndian on the read side.
BinaryPrimitives.WriteInt32LittleEndian(sectorAlignedBufferSpan, size);
// NOTE(review): this copies every byte of buffer, whereas the Array.Copy variant above copies
// exactly size bytes and the prefix records size. If a caller passes 0 < size < buffer.Length
// the payload and the prefix disagree — confirm, or slice with buffer.AsSpan(0, size).
buffer.AsSpan().CopyTo(sectorAlignedBufferSpan.Slice(sizeof(int)));

var pbuffer = pool.Get((int)numBytesToWrite);
fixed (byte* bufferRaw = _buffer)
{
Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size);
}
// The semaphore is handed to the write as its context argument; presumably the IO callback
// releases it on completion (callback body not visible here — verify).
using var semaphore = new SemaphoreSlim(0);
device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, logger == null ? IOCallback : logger.IOCallback, semaphore);
device.WriteAsync((IntPtr)sectorAlignedBuffer.Pointer, address, (uint)sectorAlignedBuffer.Length, logger == null ? IOCallback : logger.IOCallback, semaphore);
semaphore.Wait();

// NOTE(review): Return() is not guarded by try/finally, so an exception from WriteAsync or Wait
// would skip it.
pbuffer.Return();
sectorAlignedBuffer.Return();
}


Expand All @@ -66,21 +61,17 @@ public static unsafe void WriteInto(IDevice device, SectorAlignedBufferPool pool
/// <param name="buffer"></param>
/// <param name="size"></param>
/// <param name="logger"></param>
private static unsafe void ReadInto(IDevice device, SectorAlignedBufferPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null)
private static unsafe void ReadInto(IDevice device, SectorAlignedMemoryPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null)
{
// Reads from the device at the given address into a buffer obtained from the pool, waits on the
// semaphore, then copies the result into a newly allocated managed array returned via 'buffer'.
using var semaphore = new SemaphoreSlim(0);
long numBytesToRead = size;
// Round up to a multiple of device.SectorSize (the mask form assumes SectorSize is a power of two — TODO confirm).
numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1));
// NOTE(review): size is already declared as int, so the (int) cast below is redundant.
var sectorAlignedBuffer = pool.Get((int)size);

var pbuffer = pool.Get((int)numBytesToRead);
device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer,
(uint)numBytesToRead, logger == null ? IOCallback : logger.IOCallback, semaphore);
using var semaphore = new SemaphoreSlim(0);
// The read length is sectorAlignedBuffer.Length rather than size; presumably the pool rounds the
// request up to a sector multiple — confirm against SectorAlignedMemoryPool.Get.
device.ReadAsync(address, (IntPtr)sectorAlignedBuffer.Pointer, (uint)sectorAlignedBuffer.Length, logger == null ? IOCallback : logger.IOCallback, semaphore);
semaphore.Wait();

buffer = new byte[numBytesToRead];
fixed (byte* bufferRaw = buffer)
Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead);
pbuffer.Return();
// The output array is sized from the pooled buffer's Length, so it may be longer than size;
// ReadDevice slices off the 4-byte prefix but does not trim the tail.
buffer = new byte[sectorAlignedBuffer.Length];
sectorAlignedBuffer.AsSpan().CopyTo(buffer);
sectorAlignedBuffer.Return();
}

private static void IOCallback(uint errorCode, uint numBytes, object context)
Expand Down
22 changes: 10 additions & 12 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ internal sealed unsafe partial class MigrateSession : IDisposable
/// <returns>True on success, false otherwise</returns>
private bool MigrateKeysFromMainStore()
{
var bufferSize = 1 << 10;
SectorAlignedMemory buffer = new(bufferSize, 1);
var bufPtr = buffer.GetValidPointer();
var bufPtrEnd = bufPtr + bufferSize;
var o = new SpanByteAndMemory(bufPtr, (int)(bufPtrEnd - bufPtr));
const int bufferSize = 1 << 10;
var buffer = SectorAlignedMemory.Allocate(bufferSize, alignment: 1);
var output = SpanByteAndMemory.FromPinnedSpan(buffer.AsValidSpan());

try
{
Expand All @@ -46,7 +44,7 @@ private bool MigrateKeysFromMainStore()
var key = pair.Key.SpanByte;

// Read value for key
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref o);
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref output);

// Check if found in main store
if (status == GarnetStatus.NOTFOUND)
Expand All @@ -57,19 +55,19 @@ private bool MigrateKeysFromMainStore()
}

// Get SpanByte from stack if any
ref var value = ref o.SpanByte;
if (!o.IsSpanByte)
ref var value = ref output.SpanByte;
if (!output.IsSpanByte)
{
// Reinterpret heap memory to SpanByte
value = ref SpanByte.ReinterpretWithoutLength(o.Memory.Memory.Span);
value = ref SpanByte.ReinterpretWithoutLength(output.Memory.Memory.Span);
}

// Write key to network buffer if it has not expired
if (!ClusterSession.Expired(ref value) && !WriteOrSendMainStoreKeyValuePair(ref key, ref value))
return false;

// Reset SpanByte for next read if any but don't dispose heap buffer as we might re-use it
o.SpanByte = new SpanByte((int)(bufPtrEnd - bufPtr), (IntPtr)bufPtr);
output.SpanByte = SpanByte.FromPinnedSpan(buffer.AsValidSpan());
}

// Flush data in client buffer
Expand All @@ -81,8 +79,8 @@ private bool MigrateKeysFromMainStore()
finally
{
// Dispose the heap-allocated output memory, if any was allocated.
if (o.Memory != default)
o.Memory.Dispose();
if (output.Memory != default)
output.Memory.Dispose();
buffer.Dispose();
}
return true;
Expand Down
22 changes: 7 additions & 15 deletions libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal sealed partial class ReplicaSyncSession(
readonly TimeSpan timeout = timeout;
readonly CancellationToken token = token;
readonly CancellationTokenSource cts = new();
SectorAlignedBufferPool bufferPool = null;
SectorAlignedMemoryPool bufferPool = null;
readonly SemaphoreSlim semaphore = new(0);

public readonly string replicaNodeId = replicaNodeId;
Expand All @@ -55,7 +55,7 @@ public void Dispose()
cts.Cancel();
cts.Dispose();
semaphore?.Dispose();
bufferPool?.Free();
bufferPool?.Dispose();
}

/// <summary>
Expand Down Expand Up @@ -440,7 +440,7 @@ private async Task SendFileSegments(GarnetClientSession gcs, Guid token, Checkpo
(int)(endAddress - startAddress);
var (pbuffer, readBytes) = ReadInto(device, (ulong)startAddress, num_bytes);

resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes)).ConfigureAwait(false);
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.AsSpan().Slice(0, readBytes)).ConfigureAwait(false);
if (!resp.Equals("OK"))
{
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
Expand Down Expand Up @@ -489,7 +489,7 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi
var num_bytes = startAddress + batchSize < size ? batchSize : (int)(size - startAddress);
var (pbuffer, readBytes) = ReadInto(device, (ulong)startAddress, num_bytes, segment);

resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes), segment).ConfigureAwait(false);
resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.AsSpan().Slice(0, readBytes), segment).ConfigureAwait(false);
if (!resp.Equals("OK"))
{
logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp);
Expand Down Expand Up @@ -529,16 +529,16 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi
/// <param name="segmentId"></param>
private unsafe (SectorAlignedMemory, int) ReadInto(IDevice device, ulong address, int size, int segmentId = -1)
{
// Reads from the device into a buffer taken from bufferPool and returns that buffer together
// with the number of bytes requested from the device. The buffer is not returned to the pool
// here; the caller receives it.
// The pool is created lazily on first use, with the device's sector size as its second argument.
bufferPool ??= new SectorAlignedBufferPool(1, (int)device.SectorSize);
bufferPool ??= new SectorAlignedMemoryPool(1, (int)device.SectorSize);

long numBytesToRead = size;
// Round up to a multiple of device.SectorSize (the mask form assumes SectorSize is a power of two — TODO confirm).
numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1));

var pbuffer = bufferPool.Get((int)numBytesToRead);
// segmentId == -1 selects the ReadAsync overload without a segment argument.
if (segmentId == -1)
device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, (uint)numBytesToRead, IOCallback, null);
device.ReadAsync(address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null);
else
device.ReadAsync(segmentId, address, (IntPtr)pbuffer.aligned_pointer, (uint)numBytesToRead, IOCallback, null);
device.ReadAsync(segmentId, address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null);
// Blocks until this class's IOCallback releases the semaphore.
semaphore.Wait();
// NOTE(review): the second element is the sector-rounded count, not size. SendFileSegments and
// SendObjectFiles slice the buffer by this value, so padding past size may be sent — confirm
// this is intended.
return (pbuffer, (int)numBytesToRead);
}
Expand All @@ -553,12 +553,4 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
semaphore.Release();
}
}

internal static unsafe class SectorAlignedMemoryExtensions
{
    /// <summary>
    /// Creates a span over <paramref name="length"/> bytes starting at the buffer's aligned pointer.
    /// </summary>
    /// <param name="pbuffer">Buffer whose <c>aligned_pointer</c> marks the start of the span.</param>
    /// <param name="length">Number of bytes the span covers.</param>
    /// <returns>A span of <paramref name="length"/> bytes beginning at <c>pbuffer.aligned_pointer</c>.</returns>
    public static Span<byte> GetSlice(this SectorAlignedMemory pbuffer, int length)
        => new Span<byte>(pbuffer.aligned_pointer, length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed unsafe class ReceiveCheckpointHandler
readonly ClusterProvider clusterProvider;
IDevice writeIntoCkptDevice = null;
private SemaphoreSlim writeCheckpointSemaphore = null;
private SectorAlignedBufferPool writeCheckpointBufferPool = null;
private SectorAlignedMemoryPool writeCheckpointBufferPool = null;

readonly ILogger logger;

Expand All @@ -28,7 +28,7 @@ public ReceiveCheckpointHandler(ClusterProvider clusterProvider, ILogger logger
public void Dispose()
{
writeCheckpointSemaphore?.Dispose();
writeCheckpointBufferPool?.Free();
writeCheckpointBufferPool?.Dispose();
writeCheckpointBufferPool = null;
CloseDevice();
}
Expand Down Expand Up @@ -75,26 +75,23 @@ public void ProcessFileSegments(int segmentId, Guid token, CheckpointFileType ty
/// <param name="address"></param>
/// <param name="buffer"></param>
/// <param name="size"></param>
private unsafe void WriteInto(IDevice device, ulong address, ReadOnlySpan<byte> buffer, int size, int segmentId = -1)
private void WriteInto(IDevice device, ulong address, ReadOnlySpan<byte> buffer, int size, int segmentId = -1)
{
if (writeCheckpointBufferPool == null)
writeCheckpointBufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
writeCheckpointBufferPool = new SectorAlignedMemoryPool(1, (int)device.SectorSize);

long numBytesToWrite = size;
numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1));

var pbuffer = writeCheckpointBufferPool.Get((int)numBytesToWrite);
fixed (byte* bufferRaw = buffer)
{
Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size);
}
buffer.Slice(0, size).CopyTo(pbuffer.AsSpan());

if (writeCheckpointSemaphore == null) writeCheckpointSemaphore = new(0);

if (segmentId == -1)
device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, IOCallback, null);
device.WriteAsync((IntPtr)pbuffer.Pointer, address, (uint)numBytesToWrite, IOCallback, null);
else
device.WriteAsync((IntPtr)pbuffer.aligned_pointer, segmentId, address, (uint)numBytesToWrite, IOCallback, null);
device.WriteAsync((IntPtr)pbuffer.Pointer, segmentId, address, (uint)numBytesToWrite, IOCallback, null);
writeCheckpointSemaphore.Wait();

pbuffer.Return();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ internal sealed partial class ReplicationManager : IDisposable
{
ReplicationHistory currentReplicationConfig;
readonly IDevice replicationConfigDevice;
readonly SectorAlignedBufferPool replicationConfigDevicePool;
readonly SectorAlignedMemoryPool replicationConfigDevicePool;

private void InitializeReplicationHistory()
{
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void Dispose()
_disposed = true;

replicationConfigDevice?.Dispose();
replicationConfigDevicePool?.Free();
replicationConfigDevicePool?.Dispose();

replicationSyncManager?.Dispose();

Expand Down
Loading
Loading