Skip to content

Commit 3323ad0

Browse files
badrishc and Copilot committed
LightEpoch: remove instance limit with pointer-based per-thread entries
Replace the fixed-size InstanceIndexBuffer [ThreadStatic] struct (previously capped at a fixed MaxInstances, which is 1024 in libs/client/LightEpoch.cs) with a dynamically-growing GC-pinned int[] accessed via a [ThreadStatic] int* pointer. This removes the hard cap on concurrent LightEpoch instances entirely.

Design:
- Per-thread entries array allocated as pinned int[] on first Resume() per thread, starting at capacity 16 and doubling as needed
- Hot-path access is uniform pointer arithmetic: *(entriesPtr + instanceId), no branches
- Instance IDs allocated via atomic counter with ConcurrentQueue recycling of disposed IDs to keep per-thread arrays compact
- Double-dispose guarded via Interlocked.Exchange
- ThisInstanceProtected() includes null + bounds checks for pre-Resume() calls

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d1a9289 commit 3323ad0

2 files changed

Lines changed: 187 additions & 165 deletions

File tree

libs/client/LightEpoch.cs

Lines changed: 95 additions & 86 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license.
33

44
using System;
5+
using System.Collections.Concurrent;
56
using System.Diagnostics;
67
using System.Runtime.CompilerServices;
78
using System.Runtime.InteropServices;
@@ -15,37 +16,9 @@ namespace Garnet.client
1516
public sealed unsafe class LightEpoch
1617
{
1718
/// <summary>
18-
/// Buffer to track information for LightEpoch instances. This is used:
19-
/// (1) in AssignInstance, to assign a unique instanceId to each LightEpoch instance, and
20-
/// (2) in Metadata, to track per-thread epoch table entries for each LightEpoch instance.
21-
/// </summary>
22-
[StructLayout(LayoutKind.Explicit, Size = MaxInstances * sizeof(int))]
23-
private struct InstanceIndexBuffer
24-
{
25-
/// <summary>
26-
/// Maximum number of concurrent instances of LightEpoch supported.
27-
/// </summary>
28-
internal const int MaxInstances = 1024;
29-
30-
/// <summary>
31-
/// Anchor field for the buffer.
32-
/// </summary>
33-
[FieldOffset(0)]
34-
int field0;
35-
36-
/// <summary>
37-
/// Reference to the entry for the given instance ID.
38-
/// </summary>
39-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
40-
internal ref int GetRef(int instanceId)
41-
{
42-
Debug.Assert(instanceId >= 0 && instanceId < MaxInstances);
43-
return ref Unsafe.AsRef<int>((int*)Unsafe.AsPointer(ref field0) + instanceId);
44-
}
45-
}
46-
47-
/// <summary>
48-
/// Store for thread-static metadata.
19+
/// Store for thread-static metadata. Each thread lazily allocates a GC-pinned int[]
20+
/// that maps instanceId to epoch table entry index. The pinned pointer enables
21+
/// uniform access via pointer arithmetic with no branches on the hot path.
4922
/// </summary>
5023
private class Metadata
5124
{
@@ -62,24 +35,31 @@ private class Metadata
6235
internal static ushort startOffset1;
6336

6437
/// <summary>
65-
/// Alternate start offset to reserve entry in the epoch table (to reduce probing if <see cref="startOffset1"/> slot is already filled)
38+
/// Alternate start offset to reserve entry in the epoch table
6639
/// </summary>
6740
[ThreadStatic]
6841
internal static ushort startOffset2;
6942

7043
/// <summary>
71-
/// This is the thread-static index for fast access to the tableAligned index
72-
/// that is obtained when each LightEpoch instance calls ReserveEntry.
73-
/// The instanceId of the LightEpoch instance (assigned to the instance
74-
/// at constructor time using InstanceTracker) is the lookup offset into
75-
/// Entries.
76-
///
77-
/// Note that Entries effectively gives us ThreadLocal{T} semantics of
78-
/// (instance, thread)-specific metadata, without the overhead of
79-
/// ThreadLocal{T}.
44+
/// Pointer to the per-thread entries array. Each LightEpoch instance's instanceId
45+
/// is the lookup offset: <c>*(entriesPtr + instanceId)</c> gives the epoch table
46+
/// entry index for this thread. Targets a GC-pinned int[] on the Pinned Object Heap.
47+
/// Null until the thread's first <see cref="LightEpoch.Acquire"/> call.
48+
/// </summary>
49+
[ThreadStatic]
50+
internal static int* entriesPtr;
51+
52+
/// <summary>
53+
/// Keeps the pinned entries array rooted so GC does not collect it.
54+
/// </summary>
55+
[ThreadStatic]
56+
internal static int[] entriesArray;
57+
58+
/// <summary>
59+
/// Current capacity of <see cref="entriesArray"/>. Zero until initialized.
8060
/// </summary>
8161
[ThreadStatic]
82-
internal static InstanceIndexBuffer Entries;
62+
internal static int entriesCapacity;
8363
}
8464

8565
/// <summary>
@@ -102,6 +82,11 @@ private class Metadata
10282
/// </summary>
10383
const int kDrainListSize = 16;
10484

85+
/// <summary>
86+
/// Initial per-thread entries array capacity. Grows by doubling as needed.
87+
/// </summary>
88+
const int kInitialEntriesCapacity = 16;
89+
10590
/// <summary>
10691
/// Thread protection status entries.
10792
/// </summary>
@@ -152,12 +137,25 @@ private class Metadata
152137
readonly int instanceId;
153138

154139
/// <summary>
155-
/// This is the LightEpoch-level static buffer (array) of available instance slots.
156-
/// On LightEpoch instance creation, it is used by SelectInstance() to find an
157-
/// available slot in this array; this becomes the LightEpoch instance's instanceId,
158-
/// which is the lookup index into the thread-static Metadata.Entries.
140+
/// Set to 1 on first Dispose call to prevent double-dispose.
141+
/// </summary>
142+
int disposed;
143+
144+
/// <summary>
145+
/// Next instance ID to allocate. Monotonically increasing.
146+
/// </summary>
147+
static int nextInstanceId;
148+
149+
/// <summary>
150+
/// Pool of recycled instance IDs for reuse. Keeps per-thread arrays compact
151+
/// when instances are frequently created and disposed.
152+
/// </summary>
153+
static readonly ConcurrentQueue<int> freeInstanceIds = new();
154+
155+
/// <summary>
156+
/// Number of currently active LightEpoch instances.
159157
/// </summary>
160-
static InstanceIndexBuffer InstanceTracker;
158+
static int activeInstanceCount;
161159

162160
/// <summary>
163161
/// Instantiate the epoch table
@@ -186,14 +184,10 @@ public LightEpoch()
186184

187185
int SelectInstance()
188186
{
189-
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
190-
{
191-
ref var entry = ref InstanceTracker.GetRef(i);
192-
// Try to claim this instance ID (indicated as 1 in the entry)
193-
if (kInvalidIndex == Interlocked.CompareExchange(ref entry, 1, kInvalidIndex))
194-
return i;
195-
}
196-
throw new InvalidOperationException($"Exceeded maximum number of active LightEpoch instances {ActiveInstanceCount()} {InstanceIndexBuffer.MaxInstances}");
187+
Interlocked.Increment(ref activeInstanceCount);
188+
if (freeInstanceIds.TryDequeue(out var recycledId))
189+
return recycledId;
190+
return Interlocked.Increment(ref nextInstanceId) - 1;
197191
}
198192

199193
/// <summary>
@@ -202,31 +196,28 @@ int SelectInstance()
202196
/// <returns></returns>
203197
public static int ActiveInstanceCount()
204198
{
205-
int count = 0;
206-
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
207-
{
208-
if (kInvalidIndex != InstanceTracker.GetRef(i))
209-
count++;
210-
}
211-
return count;
199+
return Volatile.Read(ref activeInstanceCount);
212200
}
213201

214202
/// <summary>
215203
/// Reset all instances. Used for testing to reset static LightEpoch state for all instances.
216204
/// </summary>
217205
public static void ResetAllInstances()
218206
{
219-
for (var i = 0; i < InstanceIndexBuffer.MaxInstances; i++)
220-
{
221-
InstanceTracker.GetRef(i) = kInvalidIndex;
222-
}
207+
nextInstanceId = 0;
208+
activeInstanceCount = 0;
209+
while (freeInstanceIds.TryDequeue(out _)) { }
223210
}
224211

225212
/// <summary>
226213
/// Clean up epoch table
227214
/// </summary>
228215
public void Dispose()
229216
{
217+
// Guard against double-dispose
218+
if (Interlocked.Exchange(ref disposed, 1) != 0)
219+
return;
220+
230221
// Cancel any threads currently waiting on the semaphore so they
231222
// unwind and decrement waiterCount.
232223
cts.Cancel();
@@ -241,8 +232,8 @@ public void Dispose()
241232

242233
CurrentEpoch = 1;
243234
SafeToReclaimEpoch = 0;
244-
// Mark this instance ID as available
245-
InstanceTracker.GetRef(instanceId) = kInvalidIndex;
235+
freeInstanceIds.Enqueue(instanceId);
236+
Interlocked.Decrement(ref activeInstanceCount);
246237

247238
cts.Dispose();
248239
waiterSemaphore.Dispose();
@@ -252,15 +243,14 @@ public void Dispose()
252243
/// Check whether current epoch instance is protected on this thread
253244
/// </summary>
254245
/// <returns>Result of the check</returns>
246+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
255247
public bool ThisInstanceProtected()
256248
{
257-
ref var entry = ref Metadata.Entries.GetRef(instanceId);
258-
if (kInvalidIndex != entry)
259-
{
260-
if ((*(tableAligned + entry)).threadId == Metadata.threadId)
261-
return true;
262-
}
263-
return false;
249+
var ptr = Metadata.entriesPtr;
250+
if (ptr == null || (uint)instanceId >= (uint)Metadata.entriesCapacity)
251+
return false;
252+
var entry = *(ptr + instanceId);
253+
return kInvalidIndex != entry && (*(tableAligned + entry)).threadId == Metadata.threadId;
264254
}
265255

266256
/// <summary>
@@ -285,7 +275,8 @@ public bool TrySuspend()
285275
[MethodImpl(MethodImplOptions.AggressiveInlining)]
286276
public void ProtectAndDrain()
287277
{
288-
ref var entry = ref Metadata.Entries.GetRef(instanceId);
278+
Debug.Assert(Metadata.entriesPtr != null, "ProtectAndDrain called before Resume on this thread");
279+
ref var entry = ref *(Metadata.entriesPtr + instanceId);
289280

290281
Debug.Assert(entry > 0, "Trying to refresh unacquired epoch");
291282
Debug.Assert((*(tableAligned + entry)).threadId > 0, "Epoch table entry missing threadId");
@@ -501,12 +492,15 @@ void Drain(long nextEpoch)
501492
[MethodImpl(MethodImplOptions.AggressiveInlining)]
502493
void Acquire()
503494
{
504-
ref var entry = ref Metadata.Entries.GetRef(instanceId);
495+
if (instanceId >= Metadata.entriesCapacity)
496+
EnsureThreadInitialized();
497+
498+
ref var entry = ref *(Metadata.entriesPtr + instanceId);
505499
Debug.Assert(entry == kInvalidIndex,
506500
"Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");
507501

508502
// Reserve an entry in the epoch table for this thread
509-
ReserveEntryForThread(ref entry);
503+
ReserveEntry(ref entry);
510504

511505
Debug.Assert((*(tableAligned + entry)).localCurrentEpoch == 0,
512506
"Trying to acquire protected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");
@@ -529,7 +523,8 @@ void Acquire()
529523
[MethodImpl(MethodImplOptions.AggressiveInlining)]
530524
void Release()
531525
{
532-
ref var entry = ref Metadata.Entries.GetRef(instanceId);
526+
Debug.Assert(Metadata.entriesPtr != null, "Release called before Resume on this thread");
527+
ref var entry = ref *(Metadata.entriesPtr + instanceId);
533528

534529
Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0,
535530
"Trying to release unprotected epoch. Make sure you do not re-enter Tsavorite from callbacks or IDevice implementations. If using tasks, use TaskCreationOptions.RunContinuationsAsynchronously.");
@@ -654,20 +649,34 @@ void ReserveEntryWait(ref int entry)
654649
}
655650

656651
/// <summary>
657-
/// Allocate a new entry in epoch table
652+
/// Initialize per-thread metadata and/or grow the entries array. Called when
653+
/// <c>instanceId >= Metadata.entriesCapacity</c>, which covers both first-time
654+
/// init (capacity is 0) and growth (new instance with higher ID). This method
655+
/// is NoInlining to keep the fast path in Acquire compact.
658656
/// </summary>
659-
/// <returns>Reserved entry</returns>
660-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
661-
void ReserveEntryForThread(ref int entry)
657+
[MethodImpl(MethodImplOptions.NoInlining)]
658+
void EnsureThreadInitialized()
662659
{
663-
if (Metadata.threadId == 0) // run once per thread for performance
660+
if (Metadata.threadId == 0)
664661
{
665662
Metadata.threadId = Environment.CurrentManagedThreadId;
666-
uint code = (uint)Utility.Murmur3(Metadata.threadId);
663+
var code = (uint)Utility.Murmur3(Metadata.threadId);
667664
Metadata.startOffset1 = (ushort)(1 + (code % kTableSize));
668665
Metadata.startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize));
669666
}
670-
ReserveEntry(ref entry);
667+
668+
// Compute required capacity (round up to power of 2)
669+
var requiredCapacity = instanceId + 1;
670+
var newCapacity = Math.Max(kInitialEntriesCapacity, Metadata.entriesCapacity);
671+
while (newCapacity < requiredCapacity)
672+
newCapacity *= 2;
673+
674+
var newArray = GC.AllocateArray<int>(newCapacity, pinned: true);
675+
if (Metadata.entriesArray != null)
676+
Array.Copy(Metadata.entriesArray, newArray, Metadata.entriesArray.Length);
677+
Metadata.entriesArray = newArray;
678+
Metadata.entriesPtr = (int*)Unsafe.AsPointer(ref newArray[0]);
679+
Metadata.entriesCapacity = newCapacity;
671680
}
672681

673682
/// <inheritdoc/>

0 commit comments

Comments
 (0)