diff --git a/libs/cluster/Server/ClusterManager.cs b/libs/cluster/Server/ClusterManager.cs index 9d95c4d6657..206bc12bcef 100644 --- a/libs/cluster/Server/ClusterManager.cs +++ b/libs/cluster/Server/ClusterManager.cs @@ -18,7 +18,7 @@ internal sealed partial class ClusterManager : IDisposable { ClusterConfig currentConfig; readonly IDevice clusterConfigDevice; - readonly SectorAlignedBufferPool pool; + readonly SectorAlignedMemoryPool pool; readonly ILogger logger; /// @@ -103,7 +103,7 @@ public void Dispose() DisposeBackgroundTasks(); clusterConfigDevice.Dispose(); - pool.Free(); + pool.Dispose(); } /// diff --git a/libs/cluster/Server/ClusterUtils.cs b/libs/cluster/Server/ClusterUtils.cs index 1511bec80b2..4d0728b97b7 100644 --- a/libs/cluster/Server/ClusterUtils.cs +++ b/libs/cluster/Server/ClusterUtils.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Buffers.Binary; using System.ComponentModel; using System.Threading; using Microsoft.Extensions.Logging; @@ -11,7 +12,7 @@ namespace Garnet.cluster { internal static class ClusterUtils { - public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, ILogger logger = null) + public static byte[] ReadDevice(IDevice device, SectorAlignedMemoryPool pool, ILogger logger = null) { ReadInto(device, pool, 0, out byte[] writePad, sizeof(int), logger); int size = BitConverter.ToInt32(writePad, 0); @@ -20,7 +21,7 @@ public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, IL body = writePad; else ReadInto(device, pool, 0, out body, size + sizeof(int), logger); - return new Span(body)[sizeof(int)..].ToArray(); + return body.AsSpan(sizeof(int)).ToArray(); } /// @@ -32,28 +33,22 @@ public static byte[] ReadDevice(IDevice device, SectorAlignedBufferPool pool, IL /// /// /// - public static unsafe void WriteInto(IDevice device, SectorAlignedBufferPool pool, ulong address, byte[] buffer, int size = 0, ILogger logger = null) + public static unsafe void WriteInto(IDevice device, SectorAlignedMemoryPool pool, ulong address, byte[] buffer, int size = 0, ILogger logger = null) { if (size == 0) size = buffer.Length; - var _buffer = new byte[size + sizeof(int)]; - var len = BitConverter.GetBytes(size); - Array.Copy(len, _buffer, len.Length); - Array.Copy(buffer, 0, _buffer, len.Length, size); - size += len.Length; - long numBytesToWrite = size; - numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); + var lengthPrefixedSize = size + sizeof(int); + var sectorAlignedBuffer = pool.Get(lengthPrefixedSize); + var sectorAlignedBufferSpan = sectorAlignedBuffer.AsSpan(); + + BinaryPrimitives.WriteInt32LittleEndian(sectorAlignedBufferSpan, size); + buffer.AsSpan().CopyTo(sectorAlignedBufferSpan.Slice(sizeof(int))); - var pbuffer = pool.Get((int)numBytesToWrite); - fixed (byte* bufferRaw = _buffer) - { - Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size); - } using var semaphore = new SemaphoreSlim(0); - device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, logger == null ? IOCallback : logger.IOCallback, semaphore); + device.WriteAsync((IntPtr)sectorAlignedBuffer.Pointer, address, (uint)sectorAlignedBuffer.Length, logger == null ? IOCallback : logger.IOCallback, semaphore); semaphore.Wait(); - pbuffer.Return(); + sectorAlignedBuffer.Return(); } @@ -66,21 +61,17 @@ public static unsafe void WriteInto(IDevice device, SectorAlignedBufferPool pool /// /// /// - private static unsafe void ReadInto(IDevice device, SectorAlignedBufferPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null) + private static unsafe void ReadInto(IDevice device, SectorAlignedMemoryPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null) { - using var semaphore = new SemaphoreSlim(0); - long numBytesToRead = size; - numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); + var sectorAlignedBuffer = pool.Get((int)size); - var pbuffer = pool.Get((int)numBytesToRead); - device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, - (uint)numBytesToRead, logger == null ? IOCallback : logger.IOCallback, semaphore); + using var semaphore = new SemaphoreSlim(0); + device.ReadAsync(address, (IntPtr)sectorAlignedBuffer.Pointer, (uint)sectorAlignedBuffer.Length, logger == null ? IOCallback : logger.IOCallback, semaphore); semaphore.Wait(); - buffer = new byte[numBytesToRead]; - fixed (byte* bufferRaw = buffer) - Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead); - pbuffer.Return(); + buffer = new byte[sectorAlignedBuffer.Length]; + sectorAlignedBuffer.AsSpan().CopyTo(buffer); + sectorAlignedBuffer.Return(); } private static void IOCallback(uint errorCode, uint numBytes, object context) diff --git a/libs/cluster/Server/Migration/MigrateSessionKeys.cs b/libs/cluster/Server/Migration/MigrateSessionKeys.cs index abc5028eba3..ded6e361530 100644 --- a/libs/cluster/Server/Migration/MigrateSessionKeys.cs +++ b/libs/cluster/Server/Migration/MigrateSessionKeys.cs @@ -20,11 +20,9 @@ internal sealed unsafe partial class MigrateSession : IDisposable /// True on success, false otherwise private bool MigrateKeysFromMainStore() { - var bufferSize = 1 << 10; - SectorAlignedMemory buffer = new(bufferSize, 1); - var bufPtr = buffer.GetValidPointer(); - var bufPtrEnd = bufPtr + bufferSize; - var o = new SpanByteAndMemory(bufPtr, (int)(bufPtrEnd - bufPtr)); + const int bufferSize = 1 << 10; + var buffer = SectorAlignedMemory.Allocate(bufferSize, alignment: 1); + var output = SpanByteAndMemory.FromPinnedSpan(buffer.AsValidSpan()); try { @@ -46,7 +44,7 @@ private bool MigrateKeysFromMainStore() var key = pair.Key.SpanByte; // Read value for key - var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref o); + var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref output); // Check if found in main store if (status == GarnetStatus.NOTFOUND) @@ -57,11 +55,11 @@ private bool MigrateKeysFromMainStore() } // Get SpanByte from stack if any - ref var value = ref o.SpanByte; - if (!o.IsSpanByte) + ref var value = ref output.SpanByte; + if (!output.IsSpanByte) { // Reinterpret heap memory to SpanByte - value = ref SpanByte.ReinterpretWithoutLength(o.Memory.Memory.Span); + value = ref SpanByte.ReinterpretWithoutLength(output.Memory.Memory.Span); } // Write key to network buffer if it has not expired @@ -69,7 +67,7 @@ private bool MigrateKeysFromMainStore() return false; // Reset SpanByte for next read if any but don't dispose heap buffer as we might re-use it - o.SpanByte = new SpanByte((int)(bufPtrEnd - bufPtr), (IntPtr)bufPtr); + output.SpanByte = SpanByte.FromPinnedSpan(buffer.AsValidSpan()); } // Flush data in client buffer @@ -81,8 +79,8 @@ private bool MigrateKeysFromMainStore() finally { // If allocated memory in heap dispose it here. - if (o.Memory != default) - o.Memory.Dispose(); + if (output.Memory != default) + output.Memory.Dispose(); buffer.Dispose(); } return true; diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index fc9329d594b..31741c1e738 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -34,7 +34,7 @@ internal sealed partial class ReplicaSyncSession( readonly TimeSpan timeout = timeout; readonly CancellationToken token = token; readonly CancellationTokenSource cts = new(); - SectorAlignedBufferPool bufferPool = null; + SectorAlignedMemoryPool bufferPool = null; readonly SemaphoreSlim semaphore = new(0); public readonly string replicaNodeId = replicaNodeId; @@ -55,7 +55,7 @@ public void Dispose() cts.Cancel(); cts.Dispose(); semaphore?.Dispose(); - bufferPool?.Free(); + bufferPool?.Dispose(); } /// @@ -440,7 +440,7 @@ private async Task SendFileSegments(GarnetClientSession gcs, Guid token, Checkpo (int)(endAddress - startAddress); var (pbuffer, readBytes) = ReadInto(device, (ulong)startAddress, num_bytes); - resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes)).ConfigureAwait(false); + resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.AsSpan().Slice(0, readBytes)).ConfigureAwait(false); if (!resp.Equals("OK")) { logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp); @@ -489,7 +489,7 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi var num_bytes = startAddress + batchSize < size ? batchSize : (int)(size - startAddress); var (pbuffer, readBytes) = ReadInto(device, (ulong)startAddress, num_bytes, segment); - resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.GetSlice(readBytes), segment).ConfigureAwait(false); + resp = await gcs.ExecuteSendFileSegments(fileTokenBytes, (int)type, startAddress, pbuffer.AsSpan().Slice(0, readBytes), segment).ConfigureAwait(false); if (!resp.Equals("OK")) { logger?.LogError("Primary error at SendFileSegments {type} {resp}", type, resp); @@ -529,16 +529,16 @@ private async Task SendObjectFiles(GarnetClientSession gcs, Guid token, Checkpoi /// private unsafe (SectorAlignedMemory, int) ReadInto(IDevice device, ulong address, int size, int segmentId = -1) { - bufferPool ??= new SectorAlignedBufferPool(1, (int)device.SectorSize); + bufferPool ??= new SectorAlignedMemoryPool(1, (int)device.SectorSize); long numBytesToRead = size; numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); var pbuffer = bufferPool.Get((int)numBytesToRead); if (segmentId == -1) - device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, (uint)numBytesToRead, IOCallback, null); + device.ReadAsync(address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null); else - device.ReadAsync(segmentId, address, (IntPtr)pbuffer.aligned_pointer, (uint)numBytesToRead, IOCallback, null); + device.ReadAsync(segmentId, address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null); semaphore.Wait(); return (pbuffer, (int)numBytesToRead); } @@ -553,12 +553,4 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context) semaphore.Release(); } } - - internal static unsafe class SectorAlignedMemoryExtensions - { - public static Span GetSlice(this SectorAlignedMemory pbuffer, int length) - { - return new Span(pbuffer.aligned_pointer, length); - } - } } \ No newline at end of file diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReceiveCheckpointHandler.cs b/libs/cluster/Server/Replication/ReplicaOps/ReceiveCheckpointHandler.cs index b91bbfe635c..c2971a47b87 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReceiveCheckpointHandler.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReceiveCheckpointHandler.cs @@ -15,7 +15,7 @@ internal sealed unsafe class ReceiveCheckpointHandler readonly ClusterProvider clusterProvider; IDevice writeIntoCkptDevice = null; private SemaphoreSlim writeCheckpointSemaphore = null; - private SectorAlignedBufferPool writeCheckpointBufferPool = null; + private SectorAlignedMemoryPool writeCheckpointBufferPool = null; readonly ILogger logger; @@ -28,7 +28,7 @@ public ReceiveCheckpointHandler(ClusterProvider clusterProvider, ILogger logger public void Dispose() { writeCheckpointSemaphore?.Dispose(); - writeCheckpointBufferPool?.Free(); + writeCheckpointBufferPool?.Dispose(); writeCheckpointBufferPool = null; CloseDevice(); } @@ -75,26 +75,23 @@ public void ProcessFileSegments(int segmentId, Guid token, CheckpointFileType ty /// /// /// - private unsafe void WriteInto(IDevice device, ulong address, ReadOnlySpan buffer, int size, int segmentId = -1) + private void WriteInto(IDevice device, ulong address, ReadOnlySpan buffer, int size, int segmentId = -1) { if (writeCheckpointBufferPool == null) - writeCheckpointBufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize); + writeCheckpointBufferPool = new SectorAlignedMemoryPool(1, (int)device.SectorSize); long numBytesToWrite = size; numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); var pbuffer = writeCheckpointBufferPool.Get((int)numBytesToWrite); - fixed (byte* bufferRaw = buffer) - { - Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size); - } + buffer.Slice(0, size).CopyTo(pbuffer.AsSpan()); if (writeCheckpointSemaphore == null) writeCheckpointSemaphore = new(0); if (segmentId == -1) - device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, IOCallback, null); + device.WriteAsync((IntPtr)pbuffer.Pointer, address, (uint)numBytesToWrite, IOCallback, null); else - device.WriteAsync((IntPtr)pbuffer.aligned_pointer, segmentId, address, (uint)numBytesToWrite, IOCallback, null); + device.WriteAsync((IntPtr)pbuffer.Pointer, segmentId, address, (uint)numBytesToWrite, IOCallback, null); writeCheckpointSemaphore.Wait(); pbuffer.Return(); diff --git a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs index 9fca3999fe5..82d95059fcd 100644 --- a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs +++ b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs @@ -95,7 +95,7 @@ internal sealed partial class ReplicationManager : IDisposable { ReplicationHistory currentReplicationConfig; readonly IDevice replicationConfigDevice; - readonly SectorAlignedBufferPool replicationConfigDevicePool; + readonly SectorAlignedMemoryPool replicationConfigDevicePool; private void InitializeReplicationHistory() { diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 09b592bab42..88f9eeaf095 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -247,7 +247,7 @@ public void Dispose() _disposed = true; replicationConfigDevice?.Dispose(); - replicationConfigDevicePool?.Free(); + replicationConfigDevicePool?.Dispose(); replicationSyncManager?.Dispose(); diff --git a/libs/common/StreamProvider.cs b/libs/common/StreamProvider.cs index ad3bf0d36c7..d9cb6dcc57d 100644 --- a/libs/common/StreamProvider.cs +++ b/libs/common/StreamProvider.cs @@ -36,7 +36,7 @@ public interface IStreamProvider /// /// Path to file /// Data to write - void Write(string path, byte[] data); + void Write(string path, ReadOnlySpan data); } /// @@ -49,57 +49,52 @@ internal abstract class StreamProviderBase : IStreamProvider public Stream Read(string path) { using var device = GetDevice(path); - var pool = new SectorAlignedBufferPool(1, (int)device.SectorSize); + using var pool = new SectorAlignedMemoryPool(1, (int)device.SectorSize); ReadInto(device, pool, 0, out var buffer, MaxConfigFileSizeAligned); - pool.Free(); // Remove trailing zeros - int lastIndex = Array.FindLastIndex(buffer, b => b != 0); + var lastIndex = buffer.AsSpan().LastIndexOfAnyExcept((byte)0); var stream = new MemoryStream(buffer, 0, lastIndex + 1); return stream; } - public unsafe void Write(string path, byte[] data) + public unsafe void Write(string path, ReadOnlySpan data) { using var device = GetDevice(path); - var bytesToWrite = GetBytesToWrite(data, device); - var pool = new SectorAlignedBufferPool(1, (int)device.SectorSize); + var bytesToWrite = GetBytesToWrite(data.Length, device); + var pool = new SectorAlignedMemoryPool(1, (int)device.SectorSize); // Get a sector-aligned buffer from the pool and copy _buffer into it. var buffer = pool.Get((int)bytesToWrite); - fixed (byte* bufferRaw = data) - { - Buffer.MemoryCopy(bufferRaw, buffer.aligned_pointer, data.Length, data.Length); - } + data.CopyTo(buffer.AsSpan()); // Write to the device and wait for the device to signal the semaphore that the write is complete. using var semaphore = new SemaphoreSlim(0); - device.WriteAsync((IntPtr)buffer.aligned_pointer, 0, (uint)bytesToWrite, IOCallback, semaphore); + device.WriteAsync((IntPtr)buffer.Pointer, 0, (uint)bytesToWrite, IOCallback, semaphore); semaphore.Wait(); // Free the sector-aligned buffer buffer.Return(); - pool.Free(); + pool.Dispose(); } protected abstract IDevice GetDevice(string path); - protected abstract long GetBytesToWrite(byte[] bytes, IDevice device); + protected abstract long GetBytesToWrite(int byteCount, IDevice device); - protected static unsafe void ReadInto(IDevice device, SectorAlignedBufferPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null) + protected static unsafe void ReadInto(IDevice device, SectorAlignedMemoryPool pool, ulong address, out byte[] buffer, int size, ILogger logger = null) { - using var semaphore = new SemaphoreSlim(0); long numBytesToRead = size; - numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); + numBytesToRead = (numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1); var pbuffer = pool.Get((int)numBytesToRead); - device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, - (uint)numBytesToRead, IOCallback, semaphore); + + using var semaphore = new SemaphoreSlim(0); + device.ReadAsync(address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, semaphore); semaphore.Wait(); buffer = new byte[numBytesToRead]; - fixed (byte* bufferRaw = buffer) - Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead); + pbuffer.AsSpan().CopyTo(buffer); pbuffer.Return(); } @@ -168,10 +163,10 @@ protected override IDevice GetDevice(string path) return settingsDevice; } - protected override long GetBytesToWrite(byte[] bytes, IDevice device) + protected override long GetBytesToWrite(int byteCount, IDevice device) { - long numBytesToWrite = bytes.Length; - numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); + // Round up to device sector size + var numBytesToWrite = (byteCount + (device.SectorSize - 1)) & ~(device.SectorSize - 1); if (numBytesToWrite > MaxConfigFileSizeAligned) throw new Exception($"Config file size {numBytesToWrite} is larger than the maximum allowed size {MaxConfigFileSizeAligned}"); return numBytesToWrite; @@ -202,10 +197,7 @@ protected override IDevice GetDevice(string path) return settingsDevice; } - protected override long GetBytesToWrite(byte[] bytes, IDevice device) - { - return bytes.Length; - } + protected override long GetBytesToWrite(int byteCount, IDevice device) => byteCount; } /// @@ -229,7 +221,7 @@ public Stream Read(string path) return assembly.GetManifestResourceStream(resourceName); } - public void Write(string path, byte[] data) + public void Write(string path, ReadOnlySpan data) { var resourceName = assembly.GetManifestResourceNames() .FirstOrDefault(rn => rn.EndsWith($".{path}")); @@ -237,7 +229,7 @@ public void Write(string path, byte[] data) using var stream = assembly.GetManifestResourceStream(resourceName); if (stream != null) - stream.Write(data, 0, data.Length); + stream.Write(data); } } } \ No newline at end of file diff --git a/libs/server/Storage/Session/MainStore/BitmapOps.cs b/libs/server/Storage/Session/MainStore/BitmapOps.cs index a3b71f43682..40aa56eea00 100644 --- a/libs/server/Storage/Session/MainStore/BitmapOps.cs +++ b/libs/server/Storage/Session/MainStore/BitmapOps.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Numerics; using System.Runtime.CompilerServices; using System.Text; using Garnet.common; @@ -136,29 +137,26 @@ public unsafe GarnetStatus StringBitOperation(ref RawStringInput input, BitmapOp minBitmapLen = Math.Min(len, minBitmapLen); } - #region performBitop - // Allocate result buffers - sectorAlignedMemoryBitmap ??= new SectorAlignedMemory(bitmapBufferSize + sectorAlignedMemoryPoolAlignment, sectorAlignedMemoryPoolAlignment); - var dstBitmapPtr = sectorAlignedMemoryBitmap.GetValidPointer() + sectorAlignedMemoryPoolAlignment; - if (maxBitmapLen + sectorAlignedMemoryPoolAlignment > bitmapBufferSize) + // Check if at least one key is found and execute bitop + if (keysFound > 0) { - do + // We need to store the bitmap length as 32-bit integer before the destination bitmap buffer. + // We request for additional alignment amount of space to keep the destination bitmap buffer aligned. + var requiredOutputLength = maxBitmapLen + sectorAlignedMemoryPoolAlignment; + + var bufferSize = (int)Math.Max(bitmapBufferSize, BitOperations.RoundUpToPowerOf2((uint)maxBitmapLen)); + if (sectorAlignedMemoryBitmap == null || requiredOutputLength > bitmapBufferSize) { - bitmapBufferSize <<= 1; - } while (maxBitmapLen + sectorAlignedMemoryPoolAlignment > bitmapBufferSize); + bitmapBufferSize = bufferSize; - sectorAlignedMemoryBitmap.Dispose(); - sectorAlignedMemoryBitmap = new SectorAlignedMemory(bitmapBufferSize + sectorAlignedMemoryPoolAlignment, sectorAlignedMemoryPoolAlignment); - dstBitmapPtr = sectorAlignedMemoryBitmap.GetValidPointer() + sectorAlignedMemoryPoolAlignment; - } + sectorAlignedMemoryBitmap?.Dispose(); + sectorAlignedMemoryBitmap = SectorAlignedMemory.Allocate(bitmapBufferSize, sectorAlignedMemoryPoolAlignment); + } + var dstBitmapPtr = sectorAlignedMemoryBitmap.GetValidPointer() + sectorAlignedMemoryPoolAlignment; - // Check if at least one key is found and execute bitop - if (keysFound > 0) - { //1. Multi-way bitmap merge _ = BitmapManager.BitOpMainUnsafeMultiKey(dstBitmapPtr, maxBitmapLen, srcBitmapStartPtrs, srcBitmapEndPtrs, keysFound, minBitmapLen, (byte)bitOp); - #endregion if (maxBitmapLen > 0) { diff --git a/libs/server/Storage/Session/MainStore/HyperLogLogOps.cs b/libs/server/Storage/Session/MainStore/HyperLogLogOps.cs index d649c802a32..599e073600e 100644 --- a/libs/server/Storage/Session/MainStore/HyperLogLogOps.cs +++ b/libs/server/Storage/Session/MainStore/HyperLogLogOps.cs @@ -116,14 +116,12 @@ public unsafe GarnetStatus HyperLogLogLength(ref RawStringInput input, try { - sectorAlignedMemoryHll1 ??= new SectorAlignedMemory(hllBufferSize + sectorAlignedMemoryPoolAlignment, - sectorAlignedMemoryPoolAlignment); - sectorAlignedMemoryHll2 ??= new SectorAlignedMemory(hllBufferSize + sectorAlignedMemoryPoolAlignment, - sectorAlignedMemoryPoolAlignment); + sectorAlignedMemoryHll1 ??= SectorAlignedMemory.Allocate(hllBufferSize, sectorAlignedMemoryPoolAlignment); + sectorAlignedMemoryHll2 ??= SectorAlignedMemory.Allocate(hllBufferSize, sectorAlignedMemoryPoolAlignment); var srcReadBuffer = sectorAlignedMemoryHll1.GetValidPointer(); var dstReadBuffer = sectorAlignedMemoryHll2.GetValidPointer(); - var dstMergeBuffer = new SpanByteAndMemory(srcReadBuffer, hllBufferSize); - var srcMergeBuffer = new SpanByteAndMemory(dstReadBuffer, hllBufferSize); + var dstMergeBuffer = SpanByteAndMemory.FromPinnedSpan(sectorAlignedMemoryHll1.AsValidSpan()); + var srcMergeBuffer = SpanByteAndMemory.FromPinnedSpan(sectorAlignedMemoryHll2.AsValidSpan()); var isFirst = false; for (var i = 0; i < input.parseState.Count; i++) @@ -146,24 +144,28 @@ public unsafe GarnetStatus HyperLogLogLength(ref RawStringInput input, var sbSrcHLL = srcMergeBuffer.SpanByte; var sbDstHLL = dstMergeBuffer.SpanByte; - var srcHLL = sbSrcHLL.ToPointer(); - var dstHLL = sbDstHLL.ToPointer(); + var srcHLLPtr = sbSrcHLL.ToPointer(); + var dstHLLPtr = sbDstHLL.ToPointer(); if (!isFirst) { isFirst = true; if (i == input.parseState.Count - 1) - count = HyperLogLog.DefaultHLL.Count(srcMergeBuffer.SpanByte.ToPointer()); + { + count = HyperLogLog.DefaultHLL.Count(sbSrcHLL.ToPointer()); + } else - Buffer.MemoryCopy(srcHLL, dstHLL, sbSrcHLL.Length, sbSrcHLL.Length); + { + Buffer.MemoryCopy(srcHLLPtr, dstHLLPtr, sbSrcHLL.Length, sbSrcHLL.Length); + } continue; } - HyperLogLog.DefaultHLL.TryMerge(srcHLL, dstHLL, sbDstHLL.Length); + HyperLogLog.DefaultHLL.TryMerge(srcHLLPtr, dstHLLPtr, sbDstHLL.Length); if (i == input.parseState.Count - 1) { - count = HyperLogLog.DefaultHLL.Count(dstHLL); + count = HyperLogLog.DefaultHLL.Count(dstHLLPtr); } } } @@ -209,8 +211,8 @@ public unsafe GarnetStatus HyperLogLogMerge(ref RawStringInput input, out bool e try { - sectorAlignedMemoryHll1 ??= new SectorAlignedMemory(hllBufferSize + sectorAlignedMemoryPoolAlignment, sectorAlignedMemoryPoolAlignment); - var readBuffer = sectorAlignedMemoryHll1.GetValidPointer(); + sectorAlignedMemoryHll1 ??= SectorAlignedMemory.Allocate(hllBufferSize, sectorAlignedMemoryPoolAlignment); + var readBufferPtr = sectorAlignedMemoryHll1.GetValidPointer(); var dstKey = input.parseState.GetArgSliceByRef(0).SpanByte; @@ -220,7 +222,7 @@ public unsafe GarnetStatus HyperLogLogMerge(ref RawStringInput input, out bool e var currInput = new RawStringInput(RespCommand.PFMERGE); - var mergeBuffer = new SpanByteAndMemory(readBuffer, hllBufferSize); + var mergeBuffer = SpanByteAndMemory.FromPinnedSpan(sectorAlignedMemoryHll1.AsValidSpan()); var srcKey = input.parseState.GetArgSliceByRef(i).SpanByte; var status = GET(ref srcKey, ref currInput, ref mergeBuffer, ref currLockableContext); @@ -228,7 +230,7 @@ public unsafe GarnetStatus HyperLogLogMerge(ref RawStringInput input, out bool e if (status == GarnetStatus.NOTFOUND) continue; // Invalid Type - if (*(long*)readBuffer == -1) + if (*(long*)readBufferPtr == -1) { error = true; break; diff --git a/libs/server/Storage/Session/StorageSession.cs b/libs/server/Storage/Session/StorageSession.cs index 204696448f1..a4ae2638c22 100644 --- a/libs/server/Storage/Session/StorageSession.cs +++ b/libs/server/Storage/Session/StorageSession.cs @@ -31,7 +31,7 @@ sealed partial class StorageSession : IDisposable SectorAlignedMemory sectorAlignedMemoryHll1; SectorAlignedMemory sectorAlignedMemoryHll2; readonly int hllBufferSize = HyperLogLog.DefaultHLL.DenseBytes; - readonly int sectorAlignedMemoryPoolAlignment = 32; + readonly uint sectorAlignedMemoryPoolAlignment = 32; internal SessionParseState parseState; diff --git a/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs index 519c32c55cb..35bc354d478 100644 --- a/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs +++ b/libs/storage/Tsavorite/cs/benchmark/BDN-Tsavorite.Benchmark/IterationTests.cs @@ -74,10 +74,10 @@ public void SetupPopulatedStore() [GlobalCleanup] public void TearDown() { - store?.Dispose(); - store = null; logDevice?.Dispose(); logDevice = null; + store?.Dispose(); + store = null; try { Directory.Delete(logDirectory); diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index fdc7a946daf..fd053d6a5b7 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -149,7 +149,7 @@ public override string ToString() #region Contained classes and related /// Buffer pool - internal SectorAlignedBufferPool bufferPool; + internal SectorAlignedMemoryPool bufferPool; /// This hlog is an instance of a Read cache protected readonly bool IsReadCache = false; @@ -375,7 +375,7 @@ public virtual void Dispose() if (ownedEpoch) epoch.Dispose(); - bufferPool.Free(); + bufferPool.Dispose(); FlushEvent.Dispose(); notifyFlushedUntilAddressSemaphore?.Dispose(); @@ -660,7 +660,7 @@ protected void Initialize(long firstValidAddress) { Debug.Assert(firstValidAddress <= PageSize, $"firstValidAddress {firstValidAddress} shoulld be <= PageSize {PageSize}"); - bufferPool ??= new SectorAlignedBufferPool(1, sectorSize); + bufferPool ??= new SectorAlignedMemoryPool(recordSize: 1, sectorSize); if (BufferSize > 0) { @@ -1412,15 +1412,15 @@ internal unsafe void AsyncReadRecordToMemory(long fromLogical, int numBytes, Dev alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); var record = bufferPool.Get((int)alignedReadLength); - record.valid_offset = (int)(fileOffset - alignedFileOffset); - record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); - record.required_bytes = numBytes; + record.ValidOffset = (int)(fileOffset - alignedFileOffset); + record.AvailableBytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); + record.RequiredBytes = numBytes; var asyncResult = default(AsyncGetFromDiskResult>); asyncResult.context = context; asyncResult.context.record = record; device.ReadAsync(alignedFileOffset, - (IntPtr)asyncResult.context.record.aligned_pointer, + (IntPtr)asyncResult.context.record.Pointer, alignedReadLength, callback, asyncResult); @@ -1442,12 +1442,12 @@ internal unsafe void AsyncReadRecordToMemory(long fromLogical, int numBytes, Dev alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); context.record = bufferPool.Get((int)alignedReadLength); - context.record.valid_offset = (int)(fileOffset - alignedFileOffset); - context.record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); - context.record.required_bytes = numBytes; + context.record.ValidOffset = (int)(fileOffset - alignedFileOffset); + context.record.AvailableBytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); + context.record.RequiredBytes = numBytes; device.ReadAsync(alignedFileOffset, - (IntPtr)context.record.aligned_pointer, + (IntPtr)context.record.Pointer, alignedReadLength, callback, context); @@ -1718,8 +1718,8 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje try { var record = ctx.record.GetValidPointer(); - int requiredBytes = _wrapper.GetRequiredRecordSize((long)record, ctx.record.available_bytes); - if (ctx.record.available_bytes >= requiredBytes) + int requiredBytes = _wrapper.GetRequiredRecordSize((long)record, ctx.record.AvailableBytes); + if (ctx.record.AvailableBytes >= requiredBytes) { Debug.Assert(!_wrapper.GetInfoFromBytePointer(record).Invalid, "Invalid records should not be in the hash chain for pending IO"); diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocatorImpl.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocatorImpl.cs index d60df042f4d..b8d081d3af4 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocatorImpl.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableAllocatorImpl.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading; namespace Tsavorite.core @@ -11,10 +12,11 @@ namespace Tsavorite.core internal sealed unsafe class BlittableAllocatorImpl : AllocatorBase> where TStoreFunctions : IStoreFunctions { - // Circular buffer definition - private readonly byte[][] values; - private readonly long[] pointers; - private readonly long* nativePointers; + /// + /// Circular buffer of memory buffers + /// with each memory buffer being size of + /// + private readonly byte** pointers; private static int KeySize => Unsafe.SizeOf(); private static int ValueSize => Unsafe.SizeOf(); @@ -28,13 +30,15 @@ public BlittableAllocatorImpl(AllocatorSettings settings, TStoreFunctions storeF if (!Utility.IsBlittable() || !Utility.IsBlittable()) throw new TsavoriteException($"BlittableAllocator requires blittlable Key ({typeof(TKey)}) and Value ({typeof(TValue)})"); - overflowPagePool = new OverflowPool(4, p => { }); + overflowPagePool = new OverflowPool(4, static p => + { + NativeMemory.AlignedFree(p.Pointer); + GC.RemoveMemoryPressure(p.Size); + }); if (BufferSize > 0) { - values = new byte[BufferSize][]; - pointers = GC.AllocateArray(BufferSize, true); - nativePointers = (long*)Unsafe.AsPointer(ref pointers[0]); + pointers = (byte**)NativeMemory.AllocZeroed((uint)BufferSize, (uint)sizeof(byte*)); } } @@ -52,15 +56,16 @@ public override void Reset() void ReturnPage(int index) { Debug.Assert(index < BufferSize); - if (values[index] != null) + var pagePtr = pointers[index]; + if (pagePtr != null) { _ = overflowPagePool.TryAdd(new PageUnit { - pointer = pointers[index], - value = values[index] + Pointer = pagePtr, + Size = PageSize }); - values[index] = null; - pointers[index] = 0; + pointers[index] = null; + _ = Interlocked.Decrement(ref AllocatedPageCount); } } @@ -119,6 +124,7 @@ public override void Dispose() { base.Dispose(); overflowPagePool.Dispose(); + DeleteFromMemory(); } /// @@ -131,17 +137,13 @@ internal void AllocatePage(int index) if (overflowPagePool.TryGet(out var item)) { - pointers[index] = item.pointer; - values[index] = item.value; + pointers[index] = item.Pointer; return; } - var adjustedSize = PageSize + 2 * sectorSize; - - byte[] tmp = GC.AllocateArray(adjustedSize, true); - long p = (long)Unsafe.AsPointer(ref tmp[0]); - pointers[index] = (p + (sectorSize - 1)) & ~((long)sectorSize - 1); - values[index] = tmp; + pointers[index] = (byte*)NativeMemory.AlignedAlloc((uint)PageSize, alignment: (uint)sectorSize); + GC.AddMemoryPressure(PageSize); + ClearPage(index, 0); } internal int OverflowPageCount => overflowPagePool.Count; @@ -154,10 +156,12 @@ public long GetPhysicalAddress(long logicalAddress) // Index of page within the circular buffer var pageIndex = (int)((logicalAddress >> LogPageSizeBits) & (BufferSize - 1)); - return *(nativePointers + pageIndex) + offset; + + Debug.Assert(IsAllocated(pageIndex)); + return (long)(pointers[pageIndex] + offset); } - internal bool IsAllocated(int pageIndex) => values[pageIndex] != null; + internal bool IsAllocated(int pageIndex) => pointers[pageIndex] != null; protected override void WriteAsync(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult asyncResult) { @@ -198,21 +202,20 @@ public long GetFirstValidLogicalAddress(long page) internal void ClearPage(long page, int offset) { - if (offset == 0) - Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); - else - { - // Adjust array offset for cache alignment - offset += (int)(pointers[page % BufferSize] - (long)Unsafe.AsPointer(ref values[page % BufferSize][0])); - Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); - } + Debug.Assert(offset < PageSize); + Debug.Assert(IsAllocated(GetPageIndexForPage(page))); + + var ptr = pointers[page % BufferSize] + offset; + var length = (uint)(PageSize - offset); + + NativeMemory.Clear(ptr, length); } internal void FreePage(long page) { ClearPage(page, 0); if (EmptyPageCount > 0) - ReturnPage((int)(page % BufferSize)); + ReturnPage(GetPageIndexForPage(page)); } /// @@ -220,8 +223,16 @@ internal void FreePage(long page) /// internal override void DeleteFromMemory() { - for (int i = 0; i < values.Length; i++) - values[i] = null; + for (var i = 0; i < BufferSize; i++) + { + var pagePtr = pointers[i]; + if (pagePtr != null) + { + NativeMemory.AlignedFree(pagePtr); + GC.RemoveMemoryPressure(PageSize); + pointers[i] = null; + } + } } protected override void ReadAsync(ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length, @@ -317,7 +328,7 @@ internal void AsyncReadPagesFromDeviceToFrame( for (long readPage = readPageStart; readPage < (readPageStart + numPages); readPage++) { int pageIndex = (int)(readPage % frame.frameSize); - if (frame.frame[pageIndex] == null) + if (!frame.IsAllocated(pageIndex)) frame.Allocate(pageIndex); else frame.Clear(pageIndex); diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableFrame.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableFrame.cs index 38e72056ae6..6087b363380 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableFrame.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableFrame.cs @@ -2,18 +2,18 @@ // Licensed under the MIT license. using System; -using System.Runtime.CompilerServices; +using System.Diagnostics; +using System.Runtime.InteropServices; namespace Tsavorite.core { /// /// A frame is an in-memory circular buffer of log pages /// - internal sealed class BlittableFrame : IDisposable + internal sealed unsafe class BlittableFrame : IDisposable { public readonly int frameSize, pageSize, sectorSize; - public readonly byte[][] frame; - public readonly long[] pointers; + public readonly byte** pointers; public BlittableFrame(int frameSize, int pageSize, int sectorSize) { @@ -21,32 +21,47 @@ public BlittableFrame(int frameSize, int pageSize, int sectorSize) this.pageSize = pageSize; this.sectorSize = sectorSize; - frame = new byte[frameSize][]; - pointers = new long[frameSize]; + pointers = (byte**)NativeMemory.AllocZeroed((uint)frameSize, (uint)sizeof(byte*)); } - public unsafe void Allocate(int index) + public bool IsAllocated(int pageIndex) => pointers[pageIndex] != null; + + public void Allocate(int pageIndex) { - var adjustedSize = pageSize + 2 * sectorSize; + Debug.Assert(pageIndex < frameSize); + Debug.Assert(!IsAllocated(pageIndex)); - byte[] tmp = GC.AllocateArray(adjustedSize, true); - long p = (long)Unsafe.AsPointer(ref tmp[0]); - pointers[index] = (p + (sectorSize - 1)) & ~((long)sectorSize - 1); - frame[index] = tmp; + pointers[pageIndex] = (byte*)NativeMemory.AlignedAlloc((uint)(pageSize + sectorSize), alignment: (uint)sectorSize); // TODO: Over allocation for diagnostics, fix and remove. + NativeMemory.Clear(pointers[pageIndex], (uint)(pageSize + sectorSize)); // TODO: Over allocation for diagnostics, fix and remove. + GC.AddMemoryPressure(pageSize); + Clear(pageIndex); } public void Clear(int pageIndex) { - Array.Clear(frame[pageIndex], 0, frame[pageIndex].Length); + Debug.Assert(pageIndex < frameSize); + Debug.Assert(IsAllocated(pageIndex)); + + NativeMemory.Clear(pointers[pageIndex], (uint)frameSize); } public long GetPhysicalAddress(long frameNumber, long offset) { - return pointers[frameNumber % frameSize] + offset; + return (long)pointers[frameNumber % frameSize] + offset; } public void Dispose() { + for (var i = 0; i < frameSize; i++) + { + var pagePtr = pointers[i]; + if (pagePtr != null) + { + NativeMemory.AlignedFree(pagePtr); + GC.RemoveMemoryPressure(frameSize); + pointers[i] = null; + } + } } } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableScanIterator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableScanIterator.cs index 51c094f22ad..331bde06cb9 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableScanIterator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/BlittableScanIterator.cs @@ -270,7 +270,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object if (result.freeBuffer1 != null) { - BlittableAllocatorImpl.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + BlittableAllocatorImpl.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.RequiredBytes, result.page); result.freeBuffer1.Return(); result.freeBuffer1 = null; } diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocatorImpl.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocatorImpl.cs index faf50f1aa4f..3d10cc60eea 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocatorImpl.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericAllocatorImpl.cs @@ -166,21 +166,18 @@ internal override bool TryComplete() /// public override void Dispose() { - if (values != null) - { - for (int i = 0; i < values.Length; i++) - values[i] = null; - values = null; - } - overflowPagePool.Dispose(); base.Dispose(); + overflowPagePool.Dispose(); + DeleteFromMemory(); } /// Delete in-memory portion of the log internal override void DeleteFromMemory() { - for (int i = 0; i < values.Length; i++) + for (int i = 0; i < values?.Length; i++) + { values[i] = null; + } values = null; } @@ -350,16 +347,16 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres { handle = new CountdownEvent(1) }; - device.ReadAsync(alignedDestinationAddress + (ulong)aligned_start, (IntPtr)buffer.aligned_pointer + aligned_start, + device.ReadAsync(alignedDestinationAddress + (ulong)aligned_start, (IntPtr)buffer.Pointer + aligned_start, (uint)sectorSize, AsyncReadPageCallback, result); result.handle.Wait(); } fixed (RecordInfo* pin = &src[0].info) { // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below. - Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length); + Debug.Assert(numBytesToWrite <= buffer.Length); - Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.aligned_pointer + start, + Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + start), buffer.Pointer + start, numBytesToWrite - start, numBytesToWrite - start); } } @@ -368,9 +365,9 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres fixed (RecordInfo* pin = &src[0].info) { // Write all the RecordInfos on one operation. This also includes object pointers, but for valid records we will overwrite those below. - Debug.Assert(buffer.aligned_pointer + numBytesToWrite <= (byte*)Unsafe.AsPointer(ref buffer.buffer[0]) + buffer.buffer.Length); + Debug.Assert(numBytesToWrite <= buffer.Length); - Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.aligned_pointer + aligned_start, + Buffer.MemoryCopy((void*)((long)Unsafe.AsPointer(ref src[0]) + aligned_start), buffer.Pointer + aligned_start, numBytesToWrite - aligned_start, numBytesToWrite - aligned_start); } } @@ -391,7 +388,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres for (int i = start / RecordSize; i < end / RecordSize; i++) { - byte* recordPtr = buffer.aligned_pointer + i * RecordSize; + byte* recordPtr = buffer.Pointer + i * RecordSize; // Retrieve reference to record struct ref var record = ref Unsafe.AsRef>(recordPtr); @@ -475,7 +472,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres _objBuffer = bufferPool.Get(memoryStreamTotalLength); fixed (void* src_ = ms.GetBuffer()) - Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamTotalLength, memoryStreamActualLength); + Buffer.MemoryCopy(src_, _objBuffer.Pointer, memoryStreamTotalLength, memoryStreamActualLength); } // Each address we calculated above is now an offset to objAddr; convert to the actual address. @@ -499,7 +496,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres asyncResult.done = new AutoResetEvent(false); Debug.Assert(memoryStreamTotalLength > 0); objlogDevice.WriteAsync( - (IntPtr)_objBuffer.aligned_pointer, + (IntPtr)_objBuffer.Pointer, (int)(alignedDestinationAddress >> LogSegmentSizeBits), (ulong)_objAddr, (uint)_alignedLength, AsyncFlushPartialObjectLogCallback, asyncResult); @@ -517,7 +514,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres asyncResult.freeBuffer2 = _objBuffer; objlogDevice.WriteAsync( - (IntPtr)_objBuffer.aligned_pointer, + (IntPtr)_objBuffer.Pointer, (int)(alignedDestinationAddress >> LogSegmentSizeBits), (ulong)_objAddr, (uint)_alignedLength, callback, asyncResult); } @@ -537,7 +534,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres var alignedNumBytesToWrite = (uint)((numBytesToWrite + (sectorSize - 1)) & ~(sectorSize - 1)); // Finally write the hlog page - device.WriteAsync((IntPtr)buffer.aligned_pointer + aligned_start, alignedDestinationAddress + (ulong)aligned_start, + device.WriteAsync((IntPtr)buffer.Pointer + aligned_start, alignedDestinationAddress + (ulong)aligned_start, alignedNumBytesToWrite, callback, asyncResult); } finally @@ -562,11 +559,11 @@ protected override void ReadAsync( DeviceIOCompletionCallback callback, PageAsyncReadResult asyncResult, IDevice device, IDevice objlogDevice) { asyncResult.freeBuffer1 = bufferPool.Get((int)aligned_read_length); - asyncResult.freeBuffer1.required_bytes = (int)aligned_read_length; + asyncResult.freeBuffer1.RequiredBytes = (int)aligned_read_length; if (!(KeyHasObjects() || ValueHasObjects())) { - device.ReadAsync(alignedSourceAddress, (IntPtr)asyncResult.freeBuffer1.aligned_pointer, + device.ReadAsync(alignedSourceAddress, (IntPtr)asyncResult.freeBuffer1.Pointer, aligned_read_length, callback, asyncResult); return; } @@ -580,7 +577,7 @@ protected override void ReadAsync( } asyncResult.objlogDevice = objlogDevice; - device.ReadAsync(alignedSourceAddress, (IntPtr)asyncResult.freeBuffer1.aligned_pointer, + device.ReadAsync(alignedSourceAddress, (IntPtr)asyncResult.freeBuffer1.Pointer, aligned_read_length, AsyncReadPageWithObjectsCallback, asyncResult); } @@ -623,8 +620,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num // Deserialize all objects until untilptr if (result.resumePtr < result.untilPtr) { - MemoryStream ms = new(result.freeBuffer2.buffer); - ms.Seek(result.freeBuffer2.offset, SeekOrigin.Begin); + UnmanagedMemoryStream ms = new(result.freeBuffer2.Pointer, result.freeBuffer2.Length); Deserialize(result.freeBuffer1.GetValidPointer(), result.resumePtr, result.untilPtr, src, ms); ms.Dispose(); @@ -660,7 +656,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num result.objlogDevice.ReadAsync( (int)((result.page - result.offset) >> (LogSegmentSizeBits - LogPageSizeBits)), (ulong)startptr, - (IntPtr)objBuffer.aligned_pointer, (uint)alignedLength, AsyncReadPageWithObjectsCallback, result); + (IntPtr)objBuffer.Pointer, (uint)alignedLength, AsyncReadPageWithObjectsCallback, result); } /// @@ -681,9 +677,9 @@ protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int num alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); var record = bufferPool.Get((int)alignedReadLength); - record.valid_offset = (int)(fileOffset - alignedFileOffset); - record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); - record.required_bytes = numBytes; + record.ValidOffset = (int)(fileOffset - alignedFileOffset); + record.AvailableBytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); + record.RequiredBytes = numBytes; var asyncResult = default(AsyncGetFromDiskResult>); asyncResult.context = context; @@ -692,7 +688,7 @@ protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int num objectLogDevice.ReadAsync( (int)(context.logicalAddress >> LogSegmentSizeBits), alignedFileOffset, - (IntPtr)asyncResult.context.objBuffer.aligned_pointer, + (IntPtr)asyncResult.context.objBuffer.Pointer, alignedReadLength, callback, asyncResult); @@ -946,8 +942,8 @@ internal bool RetrievedFullRecord(byte* record, ref AsyncIOContext } // Parse the key and value objects - var ms = new MemoryStream(ctx.objBuffer.buffer); - _ = ms.Seek(ctx.objBuffer.offset + ctx.objBuffer.valid_offset, SeekOrigin.Begin); + var ms = new UnmanagedMemoryStream(ctx.objBuffer.Pointer, ctx.objBuffer.Length); + _ = ms.Seek(ctx.objBuffer.ValidOffset, SeekOrigin.Begin); if (KeyHasObjects()) { diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericScanIterator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericScanIterator.cs index 44fb51c3a47..997de69ea9f 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/GenericScanIterator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/GenericScanIterator.cs @@ -280,7 +280,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object if (result.freeBuffer1 != null) { - hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, ref frame.GetPage(result.page % frame.frameSize)); + hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.RequiredBytes, ref frame.GetPage(result.page % frame.frameSize)); result.freeBuffer1.Return(); result.freeBuffer1 = null; } diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/MallocFixedPageSize.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/MallocFixedPageSize.cs index 153cd3f257b..857d896c8d0 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/MallocFixedPageSize.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/MallocFixedPageSize.cs @@ -357,7 +357,7 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num } else { - result.mem = new SectorAlignedMemory((int)writeSize, (int)device.SectorSize); + result.mem = SectorAlignedMemory.Allocate((int)writeSize, device.SectorSize); bool prot = false; if (!epoch.ThisInstanceProtected()) { @@ -365,17 +365,17 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num epoch.Resume(); } - Buffer.MemoryCopy((void*)pointers[i], result.mem.aligned_pointer, writeSize, writeSize); + Buffer.MemoryCopy((void*)pointers[i], result.mem.Pointer, writeSize, writeSize); int j = 0; if (i == 0) j += AllocateChunkSize * RecordSize; for (; j < writeSize; j += sizeof(HashBucket)) { - skipReadCache((HashBucket*)(result.mem.aligned_pointer + j)); + skipReadCache((HashBucket*)(result.mem.Pointer + j)); } if (prot) epoch.Suspend(); - device.WriteAsync((IntPtr)result.mem.aligned_pointer, offset + numBytesWritten, writeSize, AsyncFlushCallback, result); + device.WriteAsync((IntPtr)result.mem.Pointer, offset + numBytesWritten, writeSize, AsyncFlushCallback, result); } numBytesWritten += writeSize; } diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/PageUnit.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/PageUnit.cs index 5d093c3fd55..bc329a9d103 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/PageUnit.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/PageUnit.cs @@ -5,10 +5,10 @@ namespace Tsavorite.core { - struct PageUnit + unsafe struct PageUnit { - public byte[] value; - public long pointer; + public byte* Pointer; + public int Size; } [StructLayout(LayoutKind.Explicit)] diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs index f01495c1fdb..8c7a096355c 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading; using static Tsavorite.core.Utility; @@ -13,23 +14,26 @@ namespace Tsavorite.core internal sealed unsafe class SpanByteAllocatorImpl : AllocatorBase> where TStoreFunctions : IStoreFunctions { - // Circular buffer definition - private readonly byte[][] values; - private readonly long[] pointers; - private readonly long* nativePointers; + /// + /// Circular buffer of memory buffers + /// with each byte being size of + /// + private readonly byte** pointers; private readonly OverflowPool overflowPagePool; public SpanByteAllocatorImpl(AllocatorSettings settings, TStoreFunctions storeFunctions, Func> wrapperCreator) : base(settings.LogSettings, storeFunctions, wrapperCreator, settings.evictCallback, settings.epoch, settings.flushCallback, settings.logger) { - overflowPagePool = new OverflowPool(4, p => { }); + overflowPagePool = new OverflowPool(4, static p => + { + NativeMemory.AlignedFree(p.Pointer); + GC.RemoveMemoryPressure(p.Size); + }); if (BufferSize > 0) { - values = new byte[BufferSize][]; - pointers = GC.AllocateArray(BufferSize, true); - nativePointers = (long*)Unsafe.AsPointer(ref pointers[0]); + pointers = (byte**)NativeMemory.AllocZeroed((uint)BufferSize, (uint)sizeof(byte*)); } } @@ -49,15 +53,16 @@ public override void Reset() void ReturnPage(int index) { Debug.Assert(index < BufferSize); - if (values[index] != null) + var pagePtr = pointers[index]; + if (pagePtr != null) { overflowPagePool.TryAdd(new PageUnit { - pointer = pointers[index], - value = values[index] + Pointer = pagePtr, + Size = PageSize }); - values[index] = null; - pointers[index] = 0; + pointers[index] = null; + Interlocked.Decrement(ref AllocatedPageCount); } } @@ -201,6 +206,7 @@ public override void Dispose() { base.Dispose(); overflowPagePool.Dispose(); + DeleteFromMemory(); } /// @@ -213,17 +219,13 @@ internal void AllocatePage(int index) if (overflowPagePool.TryGet(out var item)) { - pointers[index] = item.pointer; - values[index] = item.value; + pointers[index] = item.Pointer; return; } - var adjustedSize = PageSize + 2 * sectorSize; - - byte[] tmp = GC.AllocateArray(adjustedSize, true); - long p = (long)Unsafe.AsPointer(ref tmp[0]); - pointers[index] = (p + (sectorSize - 1)) & ~((long)sectorSize - 1); - values[index] = tmp; + pointers[index] = (byte*)NativeMemory.AlignedAlloc((uint)PageSize, alignment: (uint)sectorSize); + GC.AddMemoryPressure(PageSize); + ClearPage(index, 0); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -234,10 +236,12 @@ public long GetPhysicalAddress(long logicalAddress) // Index of page within the circular buffer var pageIndex = (int)((logicalAddress >> LogPageSizeBits) & (BufferSize - 1)); - return *(nativePointers + pageIndex) + offset; + + Debug.Assert(IsAllocated(pageIndex)); + return (long)(pointers[pageIndex] + offset); } - internal bool IsAllocated(int pageIndex) => values[pageIndex] != null; + internal bool IsAllocated(int pageIndex) => pointers[pageIndex] != null; protected override void WriteAsync(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult asyncResult) { @@ -272,21 +276,20 @@ public long GetFirstValidLogicalAddress(long page) internal void ClearPage(long page, int offset) { - if (offset == 0) - Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); - else - { - // Adjust array offset for cache alignment - offset += (int)(pointers[page % BufferSize] - (long)Unsafe.AsPointer(ref values[page % BufferSize][0])); - Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); - } + Debug.Assert(offset < PageSize); + Debug.Assert(IsAllocated(GetPageIndexForPage(page))); + + var ptr = pointers[page % BufferSize] + offset; + var length = (uint)(PageSize - offset); + + NativeMemory.Clear(ptr, length); } internal void FreePage(long page) { ClearPage(page, 0); if (EmptyPageCount > 0) - ReturnPage((int)(page % BufferSize)); + ReturnPage(GetPageIndexForPage(page)); } /// @@ -294,8 +297,16 @@ internal void FreePage(long page) /// internal override void DeleteFromMemory() { - for (int i = 0; i < values.Length; i++) - values[i] = null; + for (var i = 0; i < BufferSize; i++) + { + var pagePtr = pointers[i]; + if (pagePtr != null) + { + NativeMemory.AlignedFree(pagePtr); + GC.RemoveMemoryPressure(PageSize); + pointers[i] = null; + } + } } protected override void ReadAsync( @@ -417,7 +428,7 @@ internal void AsyncReadPagesFromDeviceToFrame( for (long readPage = readPageStart; readPage < (readPageStart + numPages); readPage++) { int pageIndex = (int)(readPage % frame.frameSize); - if (frame.frame[pageIndex] == null) + if (!frame.IsAllocated(pageIndex)) { frame.Allocate(pageIndex); } diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteScanIterator.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteScanIterator.cs index 3d75a0351af..49eff23dff4 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteScanIterator.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteScanIterator.cs @@ -193,7 +193,7 @@ public unsafe bool GetNext(out RecordInfo recordInfo) } else { - if (memory.AlignedTotalCapacity < recordSize) + if (memory.Length < recordSize) { memory.Return(); memory = hlog.bufferPool.Get(recordSize); @@ -206,8 +206,8 @@ public unsafe bool GetNext(out RecordInfo recordInfo) unsafe { - Buffer.MemoryCopy((byte*)currentPhysicalAddress, memory.aligned_pointer, recordSize, recordSize); - currentPhysicalAddress = (long)memory.aligned_pointer; + Buffer.MemoryCopy((byte*)currentPhysicalAddress, memory.Pointer, recordSize, recordSize); + currentPhysicalAddress = (long)memory.Pointer; } } finally @@ -316,7 +316,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object if (result.freeBuffer1 != null) { - hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.RequiredBytes, result.page); result.freeBuffer1.Return(); result.freeBuffer1 = null; } diff --git a/libs/storage/Tsavorite/cs/src/core/Device/ManagedLocalStorageDevice.cs b/libs/storage/Tsavorite/cs/src/core/Device/ManagedLocalStorageDevice.cs index 2c64ef2a85c..b36def5b388 100644 --- a/libs/storage/Tsavorite/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/libs/storage/Tsavorite/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -22,7 +22,7 @@ public sealed class ManagedLocalStorageDevice : StorageDeviceBase private readonly bool osReadBuffering; private readonly bool readOnly; private readonly SafeConcurrentDictionary, AsyncPool)> logHandles; - private readonly SectorAlignedBufferPool pool; + private readonly SectorAlignedMemoryPool pool; /// /// Number of pending reads on device @@ -45,7 +45,7 @@ public sealed class ManagedLocalStorageDevice : StorageDeviceBase public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false, bool osReadBuffering = false, bool readOnly = false) : base(filename, GetSectorSize(filename), capacity) { - pool = new(1, 1); + pool = new(recordSize: 1, sectorSize: 1); ThrottleLimit = 120; string path = new FileInfo(filename).Directory.FullName; @@ -386,7 +386,7 @@ public override void Dispose() if (deleteOnClose) File.Delete(GetSegmentName(entry.Key)); } - pool.Free(); + pool.Dispose(); } private string GetSegmentName(int segmentId) => GetSegmentFilename(FileName, segmentId); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index 9f49578cd60..723ef144cdf 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -32,7 +32,7 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa private readonly SemaphoreSlim semaphore; private readonly bool removeOutdated; - private SectorAlignedBufferPool bufferPool; + private SectorAlignedMemoryPool bufferPool; /// /// Track historical commits for automatic purging @@ -455,19 +455,19 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context) protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size) { if (bufferPool == null) - bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize); + bufferPool = new SectorAlignedMemoryPool(recordSize: 1, (int)device.SectorSize); long numBytesToRead = size; numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); var pbuffer = bufferPool.Get((int)numBytesToRead); - device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, + device.ReadAsync(address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null); semaphore.Wait(); buffer = new byte[numBytesToRead]; fixed (byte* bufferRaw = buffer) - Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead); + Buffer.MemoryCopy(pbuffer.Pointer, bufferRaw, numBytesToRead, numBytesToRead); pbuffer.Return(); } @@ -481,7 +481,7 @@ protected unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size) { if (bufferPool == null) - bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize); + bufferPool = new SectorAlignedMemoryPool(recordSize: 1, (int)device.SectorSize); long numBytesToWrite = size; numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); @@ -489,10 +489,10 @@ protected unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, in var pbuffer = bufferPool.Get((int)numBytesToWrite); fixed (byte* bufferRaw = buffer) { - Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size); + Buffer.MemoryCopy(bufferRaw, pbuffer.Pointer, size, size); } - device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, IOCallback, null); + device.WriteAsync((IntPtr)pbuffer.Pointer, address, (uint)numBytesToWrite, IOCallback, null); semaphore.Wait(); pbuffer.Return(); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/DeltaLog.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/DeltaLog.cs index 9890d791ad2..eb101d9192b 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/DeltaLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/DeltaLog.cs @@ -58,7 +58,7 @@ public sealed class DeltaLog : ScanIteratorBase, IDisposable bool disposed = false; // Fields to support writes - SectorAlignedBufferPool memory; + SectorAlignedMemoryPool memory; long tailAddress; long flushedUntilAddress; @@ -117,14 +117,14 @@ public override void Dispose() } } - internal override void AsyncReadPagesFromDeviceToFrame(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null) + internal override unsafe void AsyncReadPagesFromDeviceToFrame(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null) { IDevice usedDevice = deltaLogDevice; completed = new CountdownEvent(numPages); for (long readPage = readPageStart; readPage < (readPageStart + numPages); readPage++) { int pageIndex = (int)(readPage % frame.frameSize); - if (frame.frame[pageIndex] == null) + if (!frame.IsAllocated(pageIndex)) { frame.Allocate(pageIndex); } @@ -300,7 +300,7 @@ private static unsafe void SetBlockHeader(int length, DeltaLogEntryType type, by /// Initialize for writes /// /// - public void InitializeForWrites(SectorAlignedBufferPool memory) + public void InitializeForWrites(SectorAlignedMemoryPool memory) { this.memory = memory; buffer = memory.Get(PageSize); @@ -317,7 +317,7 @@ public unsafe void Allocate(out int maxEntryLength, out long physicalAddress) long dataStartAddress = tailAddress + HeaderSize; maxEntryLength = (int)(pageEndAddress - dataStartAddress); int offset = (int)(dataStartAddress & PageSizeMask); - physicalAddress = (long)buffer.aligned_pointer + offset; + physicalAddress = (long)buffer.Pointer + offset; } /// @@ -330,7 +330,7 @@ public unsafe void Seal(int entryLength, DeltaLogEntryType type = DeltaLogEntryT if (entryLength > 0) { int offset = (int)(tailAddress & PageSizeMask); - SetBlockHeader(entryLength, type, buffer.aligned_pointer + offset); + SetBlockHeader(entryLength, type, buffer.Pointer + offset); long oldTailAddress = tailAddress; tailAddress += HeaderSize + entryLength; @@ -364,7 +364,7 @@ private unsafe void FlushPage() var asyncResult = new PageAsyncFlushResult { count = 1, freeBuffer1 = buffer }; var alignedBlockSize = Align(tailAddress - pageStartAddress); Interlocked.Increment(ref issuedFlush); - deltaLogDevice.WriteAsync((IntPtr)buffer.aligned_pointer + startOffset, + deltaLogDevice.WriteAsync((IntPtr)buffer.Pointer + startOffset, (ulong)pageStartAddress, (uint)alignedBlockSize, AsyncFlushPageToDeviceCallback, asyncResult); flushedUntilAddress = tailAddress; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs index ea0400d606a..4c3e2962daf 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs @@ -118,22 +118,22 @@ void FlushRunner() } else { - result.mem = new SectorAlignedMemory((int)chunkSize, (int)device.SectorSize); + result.mem = SectorAlignedMemory.Allocate((int)chunkSize, device.SectorSize); bool prot = false; if (!epoch.ThisInstanceProtected()) { prot = true; epoch.Resume(); } - Buffer.MemoryCopy((void*)chunkStartBucket, result.mem.aligned_pointer, chunkSize, chunkSize); + Buffer.MemoryCopy((void*)chunkStartBucket, result.mem.Pointer, chunkSize, chunkSize); for (int j = 0; j < chunkSize; j += sizeof(HashBucket)) { - skipReadCache((HashBucket*)(result.mem.aligned_pointer + j)); + skipReadCache((HashBucket*)(result.mem.Pointer + j)); } if (prot) epoch.Suspend(); - device.WriteAsync((IntPtr)result.mem.aligned_pointer, numBytesWritten, chunkSize, AsyncPageFlushCallback, result); + device.WriteAsync((IntPtr)result.mem.Pointer, numBytesWritten, chunkSize, AsyncPageFlushCallback, result); } if (throttleCheckpointFlushDelayMs >= 0) { diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index 10798bcf527..603d067627a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -1283,7 +1283,7 @@ internal unsafe void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numB if (result.freeBuffer1 != null) { - _wrapper.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + _wrapper.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.RequiredBytes, result.page); result.freeBuffer1.Return(); } int pageIndex = GetPageIndexForPage(result.page); diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index 9cacbed79a9..4c3506d54a4 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -671,16 +671,16 @@ public unsafe bool TryEnqueue(T entry, out long logicalAddress) where T : ILo logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) - if (logicalAddress == 0) - { - epoch.Suspend(); - if (cannedException != null) throw cannedException; - return false; - } + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + entry.SerializeTo(new Span(headerSize + physicalAddress, length)); + SetHeader(length, physicalAddress); - var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); - entry.SerializeTo(new Span((void*)(headerSize + physicalAddress), length)); - SetHeader(length, (byte*)physicalAddress); safeTailRefreshEntryEnqueued?.Signal(); epoch.Suspend(); if (AutoCommit) Commit(); @@ -753,17 +753,16 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) - if (logicalAddress == 0) - { - epoch.Suspend(); - if (cannedException != null) throw cannedException; - return false; - } + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } - var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); - fixed (byte* bp = entry) - Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length); - SetHeader(length, (byte*)physicalAddress); + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + fixed (byte* entryPtr = entry) + Buffer.MemoryCopy(entryPtr, headerSize + physicalAddress, length, length); + SetHeader(length, physicalAddress); safeTailRefreshEntryEnqueued?.Signal(); epoch.Suspend(); if (AutoCommit) Commit(); @@ -834,9 +833,9 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) return false; } - var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); - fixed (byte* bp = &entry.GetPinnableReference()) - Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length); + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + fixed (byte* bp = entry) + Buffer.MemoryCopy(bp, headerSize + physicalAddress, length, length); SetHeader(length, (byte*)physicalAddress); safeTailRefreshEntryEnqueued?.Signal(); epoch.Suspend(); @@ -2282,12 +2281,12 @@ private unsafe bool TryEnqueueCommitRecord(ref TsavoriteLogRecoveryInfo info) info.BeginAddress = BeginAddress; info.UntilAddress = logicalAddress + allocatedLength; - var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); var entryBody = info.ToByteArray(); fixed (byte* bp = entryBody) - Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), entryBody.Length, entryBody.Length); - SetCommitRecordHeader(entryBody.Length, (byte*)physicalAddress); + Buffer.MemoryCopy(bp, headerSize + physicalAddress, entryBody.Length, entryBody.Length); + SetCommitRecordHeader(entryBody.Length, physicalAddress); safeTailRefreshEntryEnqueued?.Signal(); epoch.Suspend(); // Return the commit tail @@ -2794,7 +2793,7 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje else { int requiredBytes = headerSize + length; - if (ctx.record.available_bytes >= requiredBytes) + if (ctx.record.AvailableBytes >= requiredBytes) { ctx.completedRead.Release(); } @@ -2820,7 +2819,7 @@ private void AsyncGetHeaderOnlyFromDiskCallback(uint errorCode, uint numBytes, o } else { - if (ctx.record.available_bytes < headerSize) + if (ctx.record.AvailableBytes < headerSize) { logger?.LogDebug("No record header present at address: {address}", ctx.logicalAddress); ctx.record.Return(); diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogIterator.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogIterator.cs index a04f1d28749..1dc55154c12 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogIterator.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogIterator.cs @@ -665,7 +665,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object if (result.freeBuffer1 != null) { if (errorCode == 0) - allocator._wrapper.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + allocator._wrapper.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.RequiredBytes, result.page); result.freeBuffer1.Return(); result.freeBuffer1 = null; } diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/BufferPool.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/BufferPool.cs deleted file mode 100644 index 8461949ea3b..00000000000 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/BufferPool.cs +++ /dev/null @@ -1,332 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -#if DEBUG -#define CHECK_FREE // disabled by default in Release due to overhead -#endif -// #define CHECK_FOR_LEAKS // disabled by default due to overhead - -using System; -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Numerics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; - -namespace Tsavorite.core -{ - /// - /// Sector aligned memory allocator - /// - public sealed unsafe class SectorAlignedMemory - { - // Byte #31 is used to denote free (1) or in-use (0) page - const int kFreeBitMask = 1 << 31; - - /// - /// Actual buffer - /// - public byte[] buffer; - - /// - /// Handle - /// - internal GCHandle handle; - - /// - /// Offset - /// - public int offset; - - /// - /// Aligned pointer - /// - public byte* aligned_pointer; - - /// - /// Valid offset - /// - public int valid_offset; - - /// - /// Required bytes - /// - public int required_bytes; - - /// - /// Available bytes - /// - public int available_bytes; - - private int level; - internal int Level => level -#if CHECK_FREE - & ~kFreeBitMask -#endif - ; - - internal SectorAlignedBufferPool pool; - -#if CHECK_FREE - internal bool Free - { - get => (level & kFreeBitMask) != 0; - set - { - if (value) - { - if (Free) - throw new TsavoriteException("Attempting to return an already-free block"); - this.level |= kFreeBitMask; - } - else - { - if (!Free) - throw new TsavoriteException("Attempting to allocate an already-allocated block"); - this.level &= ~kFreeBitMask; - } - } - } -#endif // CHECK_FREE - - /// - /// Default constructor - /// - public SectorAlignedMemory(int level = default) - { - this.level = level; - // Assume ctor is called for allocation and leave Free unset - } - - /// - /// Create new instance of SectorAlignedMemory - /// - /// - /// - public SectorAlignedMemory(int numRecords, int sectorSize) - { - int recordSize = 1; - int requiredSize = sectorSize + (((numRecords) * recordSize + (sectorSize - 1)) & ~(sectorSize - 1)); - - buffer = GC.AllocateArray(requiredSize, true); - long bufferAddr = (long)Unsafe.AsPointer(ref buffer[0]); - aligned_pointer = (byte*)((bufferAddr + (sectorSize - 1)) & ~((long)sectorSize - 1)); - offset = (int)((long)aligned_pointer - bufferAddr); - // Assume ctor is called for allocation and leave Free unset - } - - /// - /// Dispose - /// - public void Dispose() - { - buffer = null; -#if CHECK_FREE - this.Free = true; -#endif - } - - /// - /// Return - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Return() - { - pool?.Return(this); - } - - /// - /// Get the total aligned memory capacity of the buffer - /// - public int AlignedTotalCapacity => buffer.Length - offset; - - /// - /// Get valid pointer - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public byte* GetValidPointer() - { - return aligned_pointer + valid_offset; - } - - /// - /// ToString - /// - /// - public override string ToString() - { - return string.Format($"{(long)aligned_pointer} {offset} {valid_offset} {required_bytes} {available_bytes}" -#if CHECK_FREE - + $" {this.Free}" -#endif - ); - } - } - - /// - /// SectorAlignedBufferPool is a pool of memory. - /// Internally, it is organized as an array of concurrent queues where each concurrent - /// queue represents a memory of size in particular range. queue[i] contains memory - /// segments each of size (2^i * sectorSize). - /// - public sealed class SectorAlignedBufferPool - { - /// - /// Disable buffer pool. - /// This static option should be enabled on program entry, and not modified once Tsavorite is instantiated. - /// - public static bool Disabled; - - /// - /// Unpin objects when they are returned to the pool, so that we do not hold pinned objects long term. - /// If set, we will unpin when objects are returned and re-pin when objects are returned from the pool. - /// This static option should be enabled on program entry, and not modified once Tsavorite is instantiated. - /// - public static bool UnpinOnReturn; - - private const int levels = 32; - private readonly int recordSize; - private readonly int sectorSize; - private readonly ConcurrentQueue[] queue; -#if CHECK_FOR_LEAKS - static int totalGets, totalReturns; -#endif - - /// - /// Constructor - /// - /// Record size. May be 1 if allocations of different lengths will be made - /// Sector size, e.g. from log device - public SectorAlignedBufferPool(int recordSize, int sectorSize) - { - queue = new ConcurrentQueue[levels]; - this.recordSize = recordSize; - this.sectorSize = sectorSize; - } - - /// - /// Return - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Return(SectorAlignedMemory page) - { -#if CHECK_FOR_LEAKS - Interlocked.Increment(ref totalReturns); -#endif - -#if CHECK_FREE - page.Free = true; -#endif // CHECK_FREE - - Debug.Assert(queue[page.Level] != null); - page.available_bytes = 0; - page.required_bytes = 0; - page.valid_offset = 0; - Array.Clear(page.buffer, 0, page.buffer.Length); - if (!Disabled) - { - if (UnpinOnReturn) - { - page.handle.Free(); - page.handle = default; - } - queue[page.Level].Enqueue(page); - } - else - { - if (UnpinOnReturn) - page.handle.Free(); - page.buffer = null; - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int Position(int v) - { - if (v == 1) return 0; - return BitOperations.Log2((uint)v - 1) + 1; - } - - /// - /// Get buffer - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public unsafe SectorAlignedMemory Get(int numRecords) - { -#if CHECK_FOR_LEAKS - Interlocked.Increment(ref totalGets); -#endif - - int requiredSize = sectorSize + (((numRecords) * recordSize + (sectorSize - 1)) & ~(sectorSize - 1)); - int index = Position(requiredSize / sectorSize); - if (queue[index] == null) - { - var localPool = new ConcurrentQueue(); - Interlocked.CompareExchange(ref queue[index], localPool, null); - } - - if (!Disabled && queue[index].TryDequeue(out SectorAlignedMemory page)) - { -#if CHECK_FREE - page.Free = false; -#endif // CHECK_FREE - if (UnpinOnReturn) - { - page.handle = GCHandle.Alloc(page.buffer, GCHandleType.Pinned); - page.aligned_pointer = (byte*)(((long)page.handle.AddrOfPinnedObject() + (sectorSize - 1)) & ~((long)sectorSize - 1)); - page.offset = (int)((long)page.aligned_pointer - (long)page.handle.AddrOfPinnedObject()); - } - return page; - } - - page = new SectorAlignedMemory(level: index) - { - buffer = GC.AllocateArray(sectorSize * (1 << index), !UnpinOnReturn) - }; - if (UnpinOnReturn) - page.handle = GCHandle.Alloc(page.buffer, GCHandleType.Pinned); - long pageAddr = (long)Unsafe.AsPointer(ref page.buffer[0]); - page.aligned_pointer = (byte*)((pageAddr + (sectorSize - 1)) & ~((long)sectorSize - 1)); - page.offset = (int)((long)page.aligned_pointer - pageAddr); - page.pool = this; - return page; - } - - /// - /// Free buffer - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Free() - { -#if CHECK_FOR_LEAKS - Debug.Assert(totalGets == totalReturns); -#endif - for (int i = 0; i < levels; i++) - { - if (queue[i] == null) continue; - while (queue[i].TryDequeue(out SectorAlignedMemory result)) - result.buffer = null; - } - } - - /// - /// Print pool contents - /// - public void Print() - { - for (int i = 0; i < levels; i++) - { - if (queue[i] == null) continue; - foreach (var item in queue[i]) - { - Console.WriteLine(" " + item.ToString()); - } - } - } - } -} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/OverflowPool.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/OverflowPool.cs index 4d4764e30bc..f257ea20ddd 100644 --- a/libs/storage/Tsavorite/cs/src/core/Utilities/OverflowPool.cs +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/OverflowPool.cs @@ -32,7 +32,7 @@ public OverflowPool(int size, Action disposer = null) { this.size = size; itemQueue = new ConcurrentQueue(); - this.disposer = disposer ?? (e => { }); + this.disposer = disposer; } /// @@ -58,7 +58,7 @@ public bool TryAdd(T item) } else { - disposer(item); + disposer?.Invoke(item); return false; } } @@ -70,7 +70,7 @@ public void Dispose() { disposed = true; while (itemQueue.TryDequeue(out var item)) - disposer(item); + disposer?.Invoke(item); } } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/SafeAlignedNativeMemoryHandle.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/SafeAlignedNativeMemoryHandle.cs new file mode 100644 index 00000000000..1e3070fb7a8 --- /dev/null +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/SafeAlignedNativeMemoryHandle.cs @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Runtime.InteropServices; +using Microsoft.Win32.SafeHandles; + +namespace Tsavorite.core.Utilities +{ + /// + /// A SafeHandle wrapper around unmanaged _aligned_ memory. + /// + internal sealed unsafe class SafeNativeMemoryHandle : SafeHandleZeroOrMinusOneIsInvalid + { + /// + /// Gets the length of the allocated memory. + /// + public int Length { get; } + + /// + /// Returns a raw pointer to the unmanaged memory. + /// + public unsafe byte* Pointer => (byte*)handle.ToPointer(); + + /// + /// Initializes a new instance of . + /// + /// The pointer to the unmanaged memory. + /// The length of the memory block. + public SafeNativeMemoryHandle(IntPtr address, int length) + : base(ownsHandle: true) + { + SetHandle(address); + Length = length; + } + + /// + /// Releases the unmanaged memory when the handle is disposed or finalized. + /// + /// True if the handle was released successfully. + protected override bool ReleaseHandle() + { + if (!IsInvalid) + { + NativeMemory.AlignedFree((void*)handle); + GC.RemoveMemoryPressure(Length); + } + return true; + } + + /// + /// Allocates aligned unmanaged memory. + /// + /// The total number of bytes to allocate. + /// The alignment in bytes. This must be power of 2. + /// A that wraps the allocated memory. + public static SafeNativeMemoryHandle Allocate(int byteCount, uint alignment) + { + var totalLength = (uint)byteCount + alignment; // TODO: Over allocation for temporarily, fix. + var ptr = NativeMemory.AlignedAlloc(totalLength, alignment); + NativeMemory.Clear(ptr, totalLength); + GC.AddMemoryPressure(byteCount); + return new SafeNativeMemoryHandle((IntPtr)ptr, byteCount); + } + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Utilities/SectorAlignedMemoryPool.cs b/libs/storage/Tsavorite/cs/src/core/Utilities/SectorAlignedMemoryPool.cs new file mode 100644 index 00000000000..55eb6079735 --- /dev/null +++ b/libs/storage/Tsavorite/cs/src/core/Utilities/SectorAlignedMemoryPool.cs @@ -0,0 +1,311 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#if DEBUG +#define CHECK_FREE // disabled by default in Release due to overhead +#endif +// #define CHECK_FOR_LEAKS // disabled by default due to overhead + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Numerics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using Tsavorite.core.Utilities; + +namespace Tsavorite.core +{ + /// + /// Represents a sector-aligned unmanaged memory block, either owned by the or allocated by the caller. + /// + public sealed unsafe class SectorAlignedMemory : IDisposable + { + // Byte #31 is used to denote free (1) or in-use (0) page + private const int FreeBitMask = 1 << 31; + + private readonly SafeNativeMemoryHandle handle; + + /// + /// Pointer to the memory. + /// + public byte* Pointer => handle.Pointer; + + /// + /// Length of the memory. + /// + public int Length { get; set; } + + /// + /// Valid offset + /// + public int ValidOffset { get; set; } + + /// + /// Required bytes + /// + public int RequiredBytes { get; set; } + + /// + /// Available bytes + /// + public int AvailableBytes { get; set; } + + private int levelAndFlags; + + /// + /// The level of this memory in the buffer pool. + /// + /// + /// If the memory is managed by a pool, this represents the level + /// corresponds to a segment with size of sectorSize * 2^level + /// + /// + /// If the memory is not managed by a pool, this returns zero. + /// + /// + /// + private int Level => levelAndFlags & ~FreeBitMask; + + /// + /// If the memory is managed by a pool, the memory pool this buffer belongs to. Otherwise, . + /// + internal SectorAlignedMemoryPool pool; + + /// + /// Indicates whether a block is free or in-use by the pool. + /// + internal bool Free + { + get => (levelAndFlags & FreeBitMask) != 0; + set + { + if (value) + { +#if CHECK_FREE + if (Free) + throw new TsavoriteException("Attempting to return an already-free block"); +#endif // CHECK_FREE + levelAndFlags |= FreeBitMask; + } + else + { +#if CHECK_FREE + if (!Free) + throw new TsavoriteException("Attempting to allocate an already-allocated block"); +#endif // CHECK_FREE + levelAndFlags &= ~FreeBitMask; + } + } + } + + /// + /// Create new instance of SectorAlignedMemory + /// + private SectorAlignedMemory(SafeNativeMemoryHandle handle, int length) + { + this.handle = handle; + Length = length; + // Assume ctor is called for allocation and leave Free unset + } + + /// + private SectorAlignedMemory(SafeNativeMemoryHandle handle, int length, SectorAlignedMemoryPool pool, int level) + : this(handle, length) + { + this.pool = pool; + this.levelAndFlags = level; + // Assume ctor is called for allocation and leave Free unset + } + + /// + /// Frees the underlying unmanaged memory + /// + public void Dispose() => handle?.Dispose(); + + /// + /// Clear the underlying buffer. + /// + public void Clear() => NativeMemory.Clear(Pointer, (nuint)Length); + + /// + /// Return to the pool + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Return() => pool?.Return(Level, this); + + /// + /// Get valid pointer + /// + public byte* GetValidPointer() => Pointer + ValidOffset; + + /// + /// Gets a span over the allocated memory. + /// + public Span AsSpan() => new(Pointer, Length); + + /// + /// Gets a valid span which is calculated using + /// + public Span AsValidSpan() => new(Pointer + ValidOffset, Length - ValidOffset); + + /// + public override string ToString() => $"{(nuint)Pointer} {ValidOffset} {RequiredBytes} {AvailableBytes} {Free}"; + + /// + /// Allocates unmanaged aligned memory. + /// + /// The total number of bytes to allocate. + /// The alignment. + /// Returns a instance wrapping the memory. + public static SectorAlignedMemory Allocate(int byteCount, uint alignment) + { + var handle = SafeNativeMemoryHandle.Allocate(byteCount, alignment); + return new SectorAlignedMemory(handle, byteCount); + } + + /// + /// Allocates unmanaged aligned memory segment for the pool. + /// + /// The level of the memory within the . + /// The alignment size. + /// The memory pool from which owns this memory. + /// Returns a pool owned instance wrapping the memory. + internal static SectorAlignedMemory Allocate(int level, uint alignment, SectorAlignedMemoryPool pool) + { + var byteCount = checked((int)alignment * (1 << level)); + var handle = SafeNativeMemoryHandle.Allocate(byteCount, alignment); + return new SectorAlignedMemory(handle, byteCount, pool, level); + } + } + + /// + /// Represents a pool of memory. + /// + /// Internally, it is organized as an array of concurrent queues where each concurrent + /// queue represents a memory of size in particular range. queue[level] contains memory + /// segments each of size (sectorSize * 2^level). + /// + public sealed unsafe class SectorAlignedMemoryPool : IDisposable + { + private const int Levels = 32; + + private readonly int recordSize; + private readonly int sectorSize; + private readonly ConcurrentQueue[] queue; +#if CHECK_FOR_LEAKS + static int totalGets, totalReturns; +#endif + + /// + /// Initializes a new sector-aligned memory pool. + /// + /// Record size. May be 1 if allocations of different lengths will be made + /// Sector size for memory alignment, e.g. from log device + public SectorAlignedMemoryPool(int recordSize, int sectorSize) + { + queue = new ConcurrentQueue[Levels]; + this.recordSize = recordSize; + this.sectorSize = sectorSize; + } + + /// + /// Returns a memory to the pool. + /// + /// The level of the page in the pool. + /// The memory instance. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Return(int level, SectorAlignedMemory memory) + { +#if CHECK_FOR_LEAKS + Interlocked.Increment(ref totalReturns); +#endif + + memory.AvailableBytes = 0; + memory.RequiredBytes = 0; + memory.ValidOffset = 0; + + memory.Clear(); + memory.Free = true; + + Debug.Assert(queue[level] != null); + Debug.Assert(memory.pool == this); + queue[level].Enqueue(memory); + } + + /// + /// Calculates the level of memory based on required sectors. + /// + /// Number of sectors required. + /// The corresponding level for the memory. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int Level(int sectors) + { + return sectors == 1 ? 0 : BitOperations.Log2((uint)sectors - 1) + 1; + } + + /// + /// Allocates or retrieves a sector-aligned memory from the pool. + /// + /// + /// is equivalent to requesting one record, returning smallest possible segment from the pool. + /// + /// The number of records required. + /// + /// A segment which length is the requested memory rounded up to nearest multiple of sectorSize × 2^level + /// for some level between 0 and 32. + /// + /// + /// + /// using var pool = new SectorAlignedMemory(recordSize: 1, sectorSize: 512); + /// var buffer = pool.Get(1000); + /// Debug.Assert(buffer.Length == 1024); + /// buffer.Return(); + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public unsafe SectorAlignedMemory Get(int numRecords) + { +#if CHECK_FOR_LEAKS + Interlocked.Increment(ref totalGets); +#endif + numRecords = Math.Max(numRecords, 1); + + // How many sectors do we need? + var sectorsRequired = (numRecords * recordSize + (sectorSize - 1)) / sectorSize; + var level = Level(sectorsRequired); + if (queue[level] == null) + { + var localPool = new ConcurrentQueue(); + Interlocked.CompareExchange(ref queue[level], localPool, null); + } + + if (queue[level].TryDequeue(out var page)) + { + page.Free = false; + return page; + } + + return SectorAlignedMemory.Allocate(level, (uint)sectorSize, pool: this); + } + + /// + /// Free all the unmanaged memory owned by the pool. + /// + public void Dispose() + { +#if CHECK_FOR_LEAKS + Debug.Assert(totalGets == totalReturns); +#endif + for (var i = 0; i < queue.Length; i++) + { + if (queue[i] == null) continue; + while (queue[i].TryDequeue(out var result)) + { + result.Dispose(); + } + } + } + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/VarLen/SpanByteHeapContainer.cs b/libs/storage/Tsavorite/cs/src/core/VarLen/SpanByteHeapContainer.cs index 9220411a89c..9b95fc68f0d 100644 --- a/libs/storage/Tsavorite/cs/src/core/VarLen/SpanByteHeapContainer.cs +++ b/libs/storage/Tsavorite/cs/src/core/VarLen/SpanByteHeapContainer.cs @@ -12,7 +12,7 @@ internal sealed class SpanByteHeapContainer : IHeapContainer { readonly SectorAlignedMemory mem; - public unsafe SpanByteHeapContainer(ref SpanByte obj, SectorAlignedBufferPool pool) + public unsafe SpanByteHeapContainer(ref SpanByte obj, SectorAlignedMemoryPool pool) { mem = pool.Get(obj.TotalSize); obj.CopyTo(mem.GetValidPointer()); diff --git a/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs b/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs index 304691e2e8f..2526ae466c4 100644 --- a/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs +++ b/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs @@ -49,7 +49,7 @@ public void DeltaLogTest1([Values] TestUtils.DeviceType deviceType) Random r = new(20); int i; - SectorAlignedBufferPool bufferPool = new(1, (int)device.SectorSize); + SectorAlignedMemoryPool bufferPool = new(recordSize: 1, (int)device.SectorSize); deltaLog.InitializeForWrites(bufferPool); for (i = 0; i < TotalCount; i++) { @@ -82,7 +82,7 @@ public void DeltaLogTest1([Values] TestUtils.DeviceType deviceType) } } ClassicAssert.AreEqual(TotalCount, i, $"i={i} and TotalCount={TotalCount}"); - bufferPool.Free(); + bufferPool.Dispose(); } } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/test/DeviceTests.cs b/libs/storage/Tsavorite/cs/test/DeviceTests.cs index 302fbd99a56..1d7cb7dd19a 100644 --- a/libs/storage/Tsavorite/cs/test/DeviceTests.cs +++ b/libs/storage/Tsavorite/cs/test/DeviceTests.cs @@ -16,7 +16,7 @@ namespace Tsavorite.test public class DeviceTests { const int entryLength = 1024; - SectorAlignedBufferPool bufferPool; + SectorAlignedMemoryPool bufferPool; readonly byte[] entry = new byte[entryLength]; SemaphoreSlim semaphore; @@ -30,7 +30,7 @@ public void Setup() for (int i = 0; i < entry.Length; i++) entry[i] = (byte)i; - bufferPool = new SectorAlignedBufferPool(1, 512); + bufferPool = new SectorAlignedMemoryPool(recordSize: 1, sectorSize: 512); semaphore = new SemaphoreSlim(0); } @@ -38,7 +38,7 @@ public void Setup() public void TearDown() { semaphore.Dispose(); - bufferPool.Free(); + bufferPool.Dispose(); // Clean up log files TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); @@ -99,10 +99,10 @@ unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size) var pbuffer = bufferPool.Get((int)numBytesToWrite); fixed (byte* bufferRaw = buffer) { - Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size); + Buffer.MemoryCopy(bufferRaw, pbuffer.Pointer, size, size); } - device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, IOCallback, null); + device.WriteAsync((IntPtr)pbuffer.Pointer, address, (uint)numBytesToWrite, IOCallback, null); semaphore.Wait(); pbuffer.Return(); @@ -114,12 +114,12 @@ unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size) numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1)); var pbuffer = bufferPool.Get((int)numBytesToRead); - device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer, + device.ReadAsync(address, (IntPtr)pbuffer.Pointer, (uint)numBytesToRead, IOCallback, null); semaphore.Wait(); buffer = new byte[numBytesToRead]; fixed (byte* bufferRaw = buffer) - Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead); + Buffer.MemoryCopy(pbuffer.Pointer, bufferRaw, numBytesToRead, numBytesToRead); pbuffer.Return(); } diff --git a/libs/storage/Tsavorite/cs/test/LogTests.cs b/libs/storage/Tsavorite/cs/test/LogTests.cs index 40c6eea9418..a76d1d499ca 100644 --- a/libs/storage/Tsavorite/cs/test/LogTests.cs +++ b/libs/storage/Tsavorite/cs/test/LogTests.cs @@ -89,14 +89,14 @@ protected void BaseSetup(bool deleteOnClose = true) protected void BaseTearDown() { + device?.Dispose(); + device = null; log?.Dispose(); log = null; if (!deleteOnClose) manager.RemoveAllCommits(); manager?.Dispose(); manager = null; - device?.Dispose(); - device = null; TestUtils.DeleteDirectory(TestUtils.MethodTestDir); } @@ -1035,8 +1035,6 @@ public async ValueTask RefreshUncommittedAsyncTest([Values] IteratorType iterato await AssertGetNext(asyncByteVectorIter, asyncMemoryOwnerIter, iter, data1, verifyAtEnd: true); } - - log.Dispose(); } } diff --git a/libs/storage/Tsavorite/cs/test/MiscTests.cs b/libs/storage/Tsavorite/cs/test/MiscTests.cs index e7db813fbeb..f9c1e2781e0 100644 --- a/libs/storage/Tsavorite/cs/test/MiscTests.cs +++ b/libs/storage/Tsavorite/cs/test/MiscTests.cs @@ -198,6 +198,7 @@ public void ForceRCUAndRecover([Values(UpdateOp.Upsert, UpdateOp.Delete)] Update _ = store.Recover(token); session = store.NewSession(copyOnWrite); + bContext = session.BasicContext; using (var iterator = store.Log.Scan(store.Log.BeginAddress, store.Log.TailAddress)) { diff --git a/playground/Bitmap/BitCount.cs b/playground/Bitmap/BitCount.cs index c7cb4be15c5..a5dbad1ed5c 100644 --- a/playground/Bitmap/BitCount.cs +++ b/playground/Bitmap/BitCount.cs @@ -16,7 +16,7 @@ public static unsafe class BitCount private static byte* alloc_aligned(int numRecords, int sectorSize) { - alignedMemory = new SectorAlignedMemory(numRecords, sectorSize); + alignedMemory = SectorAlignedMemory.Allocate(numRecords, (uint)sectorSize); return alignedMemory.GetValidPointer(); } diff --git a/playground/Bitmap/BitOp.cs b/playground/Bitmap/BitOp.cs index 0d8878f0849..f0dd76d134e 100644 --- a/playground/Bitmap/BitOp.cs +++ b/playground/Bitmap/BitOp.cs @@ -24,7 +24,7 @@ public static unsafe class BitOp private static byte* alloc_aligned(int numRecords, int sectorSize) { - alignedMemoryPool.AddFirst(new SectorAlignedMemory(numRecords, sectorSize)); + alignedMemoryPool.AddFirst(SectorAlignedMemory.Allocate(numRecords, (uint)sectorSize)); return alignedMemoryPool.First.Value.GetValidPointer(); } @@ -55,7 +55,7 @@ private static void InitializeInputAlignedMemoryPool(int batchSize, int bitmapLe for (int i = 0; i < batchSize; i++) { r.NextBytes(buffer); - sectorAlignedMemoryBuffers.Add(new SectorAlignedMemory(bitmapLen, alignment)); + sectorAlignedMemoryBuffers.Add(SectorAlignedMemory.Allocate(bitmapLen, (uint)alignment)); fixed (byte* b = buffer) {