Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/TidesDB/ColumnFamily.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,97 @@ public sealed class ColumnFamily
{
internal readonly nint Handle;

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate int NativeCommitHookFn(nint ops, int numOps, ulong commitSeq, nint ctx);

private static readonly NativeCommitHookFn s_hookBridge = CommitHookBridge;
private static readonly nint s_hookBridgePtr = Marshal.GetFunctionPointerForDelegate(s_hookBridge);

private GCHandle _hookGcHandle;

internal ColumnFamily(nint handle)
{
Handle = handle;
}

/// <summary>
/// Sets a commit hook callback that fires synchronously after every transaction
/// commit on this column family. The hook receives the full batch of committed
/// operations atomically. Pass null or call ClearCommitHook to disable.
/// </summary>
/// <param name="handler">The commit hook callback.</param>
public void SetCommitHook(CommitHookHandler handler)
{
ClearCommitHook();

_hookGcHandle = GCHandle.Alloc(handler);
var result = NativeMethods.tidesdb_cf_set_commit_hook(
Handle, s_hookBridgePtr, GCHandle.ToIntPtr(_hookGcHandle));

if (result != 0)
{
_hookGcHandle.Free();
_hookGcHandle = default;
TidesDBException.ThrowIfError(result, "failed to set commit hook");
}
}

/// <summary>
/// Clears the commit hook for this column family.
/// </summary>
public void ClearCommitHook()
{
var result = NativeMethods.tidesdb_cf_set_commit_hook(Handle, nint.Zero, nint.Zero);
TidesDBException.ThrowIfError(result, "failed to clear commit hook");

if (_hookGcHandle.IsAllocated)
{
_hookGcHandle.Free();
_hookGcHandle = default;
}
}

private static int CommitHookBridge(nint ops, int numOps, ulong commitSeq, nint ctx)
{
try
{
var handle = GCHandle.FromIntPtr(ctx);
var callback = (CommitHookHandler)handle.Target!;

var managedOps = new CommitOp[numOps];
var structSize = Marshal.SizeOf<NativeCommitOp>();
for (int i = 0; i < numOps; i++)
{
var nativeOp = Marshal.PtrToStructure<NativeCommitOp>(ops + i * structSize);

byte[] key = new byte[(int)nativeOp.KeySize];
Marshal.Copy(nativeOp.Key, key, 0, key.Length);

byte[]? value = null;
if (nativeOp.IsDelete == 0 && nativeOp.Value != nint.Zero)
{
value = new byte[(int)nativeOp.ValueSize];
Marshal.Copy(nativeOp.Value, value, 0, value.Length);
}

managedOps[i] = new CommitOp
{
Key = key,
Value = value,
Ttl = nativeOp.Ttl,
IsDelete = nativeOp.IsDelete != 0
};
}

callback(managedOps, commitSeq);
return 0;
}
catch
{
return -1;
}
}

/// <summary>
/// Checks if a flush operation is in progress for this column family.
/// </summary>
Expand Down
52 changes: 52 additions & 0 deletions src/TidesDB/CommitHook.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (C) TidesDB
//
// Original Author: Alex Gaetano Padula
//
// Licensed under the Mozilla Public License, v. 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.mozilla.org/en-US/MPL/2.0/
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace TidesDB;

/// <summary>
/// Represents a single operation in a committed transaction batch.
/// Passed to the commit hook callback.
/// </summary>
public sealed class CommitOp
{
/// <summary>
/// The key data.
/// </summary>
public byte[] Key { get; init; } = [];

/// <summary>
/// The value data (null for deletes).
/// </summary>
public byte[]? Value { get; init; }

/// <summary>
/// Time-to-live for the key-value pair (0 = no expiry).
/// </summary>
public long Ttl { get; init; }

/// <summary>
/// True if this is a delete operation, false for put.
/// </summary>
public bool IsDelete { get; init; }
}

/// <summary>
/// Callback invoked synchronously after a transaction commits to a column family.
/// The callback receives the full batch of operations for that CF atomically.
/// </summary>
/// <param name="ops">Array of committed operations.</param>
/// <param name="commitSeq">Monotonic commit sequence number.</param>
public delegate void CommitHookHandler(CommitOp[] ops, ulong commitSeq);
4 changes: 4 additions & 0 deletions src/TidesDB/Native/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ internal static partial class NativeMethods
[LibraryImport(LibraryName, EntryPoint = "tidesdb_register_comparator", StringMarshalling = StringMarshalling.Utf8)]
internal static partial int tidesdb_register_comparator(nint db, string name, nint fn, string? ctxStr, nint ctx);

// Commit hook operations
[LibraryImport(LibraryName, EntryPoint = "tidesdb_cf_set_commit_hook")]
internal static partial int tidesdb_cf_set_commit_hook(nint cf, nint fn, nint ctx);

// Range cost estimation
[LibraryImport(LibraryName, EntryPoint = "tidesdb_range_cost")]
internal static unsafe partial int tidesdb_range_cost(nint cf, byte* keyA, nuint keyASize, byte* keyB, nuint keyBSize, out double cost);
Expand Down
13 changes: 13 additions & 0 deletions src/TidesDB/Native/NativeStructs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ internal unsafe struct NativeColumnFamilyConfig
public int L1FileCountTrigger;
public int L0QueueStallThreshold;
public int UseBtree;
public nint CommitHookFn;
public nint CommitHookCtx;
}

[StructLayout(LayoutKind.Sequential)]
internal struct NativeCommitOp
{
public nint Key;
public nuint KeySize;
public nint Value;
public nuint ValueSize;
public long Ttl;
public int IsDelete;
}

[StructLayout(LayoutKind.Sequential)]
Expand Down
2 changes: 2 additions & 0 deletions src/TidesDB/TidesDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ private static unsafe NativeColumnFamilyConfig CreateNativeColumnFamilyConfig(Co
L1FileCountTrigger = config.L1FileCountTrigger,
L0QueueStallThreshold = config.L0QueueStallThreshold,
UseBtree = config.UseBtree ? 1 : 0,
CommitHookFn = nint.Zero,
CommitHookCtx = nint.Zero,
ComparatorFnCached = nint.Zero,
ComparatorCtxCached = nint.Zero
};
Expand Down
2 changes: 1 addition & 1 deletion src/TidesDB/TidesDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<!-- Package metadata -->
<PackageId>TidesDB</PackageId>
<Version>0.4.2</Version>
<Version>0.4.3</Version>
<Authors>TidesDB</Authors>
<Company>TidesDB</Company>
<Description>Official C# bindings for TidesDB - A high-performance embedded key-value storage engine</Description>
Expand Down
146 changes: 146 additions & 0 deletions tests/TidesDB.Tests/TidesDBTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,152 @@ public void ColumnFamily_IsCompacting_ShouldReturnBool()
Assert.False(isCompacting);
}

[Fact]
public void CommitHook_ShouldFireOnCommit()
{
using var db = OpenDatabase();
db.CreateColumnFamily("test_cf");
var cf = db.GetColumnFamily("test_cf")!;

var receivedOps = new List<CommitOp>();
ulong receivedSeq = 0;

cf.SetCommitHook((ops, seq) =>
{
receivedOps.AddRange(ops);
receivedSeq = seq;
});

using (var txn = db.BeginTransaction())
{
txn.Put(cf, Encoding.UTF8.GetBytes("key1"), Encoding.UTF8.GetBytes("value1"));
txn.Commit();
}

Assert.Single(receivedOps);
Assert.Equal("key1", Encoding.UTF8.GetString(receivedOps[0].Key));
Assert.NotNull(receivedOps[0].Value);
Assert.Equal("value1", Encoding.UTF8.GetString(receivedOps[0].Value!));
Assert.False(receivedOps[0].IsDelete);
Assert.True(receivedSeq > 0);
}

[Fact]
public void CommitHook_ShouldReceiveDeleteOps()
{
using var db = OpenDatabase();
db.CreateColumnFamily("test_cf");
var cf = db.GetColumnFamily("test_cf")!;

using (var txn = db.BeginTransaction())
{
txn.Put(cf, Encoding.UTF8.GetBytes("key1"), Encoding.UTF8.GetBytes("value1"));
txn.Commit();
}

var receivedOps = new List<CommitOp>();

cf.SetCommitHook((ops, seq) =>
{
receivedOps.AddRange(ops);
});

using (var txn = db.BeginTransaction())
{
txn.Delete(cf, Encoding.UTF8.GetBytes("key1"));
txn.Commit();
}

Assert.Single(receivedOps);
Assert.Equal("key1", Encoding.UTF8.GetString(receivedOps[0].Key));
Assert.True(receivedOps[0].IsDelete);
Assert.Null(receivedOps[0].Value);
}

[Fact]
public void CommitHook_ShouldReceiveMultipleOps()
{
using var db = OpenDatabase();
db.CreateColumnFamily("test_cf");
var cf = db.GetColumnFamily("test_cf")!;

var receivedOps = new List<CommitOp>();

cf.SetCommitHook((ops, seq) =>
{
receivedOps.AddRange(ops);
});

using (var txn = db.BeginTransaction())
{
txn.Put(cf, Encoding.UTF8.GetBytes("key1"), Encoding.UTF8.GetBytes("value1"));
txn.Put(cf, Encoding.UTF8.GetBytes("key2"), Encoding.UTF8.GetBytes("value2"));
txn.Put(cf, Encoding.UTF8.GetBytes("key3"), Encoding.UTF8.GetBytes("value3"));
txn.Commit();
}

Assert.Equal(3, receivedOps.Count);
}

[Fact]
public void CommitHook_ClearShouldStopFiring()
{
using var db = OpenDatabase();
db.CreateColumnFamily("test_cf");
var cf = db.GetColumnFamily("test_cf")!;

int callCount = 0;

cf.SetCommitHook((ops, seq) =>
{
callCount++;
});

using (var txn = db.BeginTransaction())
{
txn.Put(cf, Encoding.UTF8.GetBytes("key1"), Encoding.UTF8.GetBytes("value1"));
txn.Commit();
}

Assert.Equal(1, callCount);

cf.ClearCommitHook();

using (var txn = db.BeginTransaction())
{
txn.Put(cf, Encoding.UTF8.GetBytes("key2"), Encoding.UTF8.GetBytes("value2"));
txn.Commit();
}

Assert.Equal(1, callCount);
}

[Fact]
public void CommitHook_SequenceNumberShouldIncrease()
{
using var db = OpenDatabase();
db.CreateColumnFamily("test_cf");
var cf = db.GetColumnFamily("test_cf")!;

var seqNumbers = new List<ulong>();

cf.SetCommitHook((ops, seq) =>
{
seqNumbers.Add(seq);
});

for (int i = 0; i < 3; i++)
{
using var txn = db.BeginTransaction();
txn.Put(cf, Encoding.UTF8.GetBytes($"key{i}"), Encoding.UTF8.GetBytes($"value{i}"));
txn.Commit();
}

Assert.Equal(3, seqNumbers.Count);
Assert.True(seqNumbers[1] > seqNumbers[0]);
Assert.True(seqNumbers[2] > seqNumbers[1]);
}

[Fact]
public void RangeCost_EmptyColumnFamily_ShouldReturnZero()
{
Expand Down
Loading