Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions src/TidesDB/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public sealed class Config
/// </summary>
public ulong UnifiedMemtableSyncIntervalUs { get; init; } = 0;

/// <summary>
/// Object store behavior configuration (null = object store disabled).
/// Setting this automatically enables unified memtable mode.
/// </summary>
public ObjectStoreConfig? ObjectStoreConfig { get; init; }

/// <summary>
/// Creates a default configuration with the specified database path.
/// </summary>
Expand Down Expand Up @@ -235,3 +241,163 @@ public sealed class ColumnFamilyConfig
/// </summary>
public static ColumnFamilyConfig Default => new();
}

/// <summary>
/// Configuration for object store mode behavior.
/// </summary>
public sealed class ObjectStoreConfig
{
/// <summary>
/// Object store connector type (default: Filesystem).
/// </summary>
public ObjectStoreConnectorType ConnectorType { get; init; } = ObjectStoreConnectorType.Filesystem;

/// <summary>
/// Root directory for the filesystem connector.
/// Required when ConnectorType is Filesystem.
/// </summary>
public string? FsRootDir { get; init; }

/// <summary>
/// S3 endpoint (e.g., "s3.amazonaws.com" or "localhost:9000" for MinIO).
/// Required when ConnectorType is S3.
/// </summary>
public string? S3Endpoint { get; init; }

/// <summary>
/// S3 bucket name. Required when ConnectorType is S3.
/// </summary>
public string? S3Bucket { get; init; }

/// <summary>
/// S3 key prefix (optional, e.g., "production/db1/").
/// </summary>
public string? S3KeyPrefix { get; init; }

/// <summary>
/// S3 access key. Required when ConnectorType is S3.
/// </summary>
public string? S3AccessKey { get; init; }

/// <summary>
/// S3 secret key. Required when ConnectorType is S3.
/// </summary>
public string? S3SecretKey { get; init; }

/// <summary>
/// S3 region (e.g., "us-east-1"). Can be null for MinIO.
/// </summary>
public string? S3Region { get; init; }

/// <summary>
/// Use SSL (HTTPS) for S3 connections (default: true).
/// </summary>
public bool S3UseSsl { get; init; } = true;

/// <summary>
/// Use path-style URLs for S3 (default: false for AWS, set true for MinIO).
/// </summary>
public bool S3UsePathStyle { get; init; } = false;

/// <summary>
/// Local directory for cached SSTable files (null = use db_path).
/// </summary>
public string? LocalCachePath { get; init; }

/// <summary>
/// Maximum local cache size in bytes (default: 0 = unlimited).
/// </summary>
public ulong LocalCacheMaxBytes { get; init; } = 0;

/// <summary>
/// Cache downloaded files locally (default: true).
/// </summary>
public bool CacheOnRead { get; init; } = true;

/// <summary>
/// Keep local copy after upload (default: true).
/// </summary>
public bool CacheOnWrite { get; init; } = true;

/// <summary>
/// Number of parallel upload threads (default: 4).
/// </summary>
public int MaxConcurrentUploads { get; init; } = 4;

/// <summary>
/// Number of parallel download threads (default: 8).
/// </summary>
public int MaxConcurrentDownloads { get; init; } = 8;

/// <summary>
/// Use multipart upload above this size in bytes (default: 64MB).
/// </summary>
public ulong MultipartThreshold { get; init; } = 64 * 1024 * 1024;

/// <summary>
/// Chunk size for multipart uploads in bytes (default: 8MB).
/// </summary>
public ulong MultipartPartSize { get; init; } = 8 * 1024 * 1024;

/// <summary>
/// Upload MANIFEST after each compaction (default: true).
/// </summary>
public bool SyncManifestToObject { get; init; } = true;

/// <summary>
/// Upload closed WAL segments for replication (default: true).
/// </summary>
public bool ReplicateWal { get; init; } = true;

/// <summary>
/// Block flush until WAL is uploaded (default: false = background upload).
/// </summary>
public bool WalUploadSync { get; init; } = false;

/// <summary>
/// Sync active WAL when it grows by this many bytes (default: 1MB, 0 = off).
/// </summary>
public ulong WalSyncThresholdBytes { get; init; } = 1024 * 1024;

/// <summary>
/// Upload WAL after every txn commit for RPO=0 replication (default: false).
/// </summary>
public bool WalSyncOnCommit { get; init; } = false;

/// <summary>
/// Enable read-only replica mode (default: false).
/// </summary>
public bool ReplicaMode { get; init; } = false;

/// <summary>
/// MANIFEST poll interval for replica sync in microseconds (default: 5000000 = 5s).
/// </summary>
public ulong ReplicaSyncIntervalUs { get; init; } = 5_000_000;

/// <summary>
/// Replay WAL from object store for near-real-time reads on replicas (default: true).
/// </summary>
public bool ReplicaReplayWal { get; init; } = true;

/// <summary>
/// Creates a default object store configuration.
/// </summary>
public static ObjectStoreConfig Default => new();
}

/// <summary>
/// Object store connector types.
/// </summary>
public enum ObjectStoreConnectorType
{
/// <summary>
/// Filesystem-backed object store (for testing and local replication).
/// </summary>
Filesystem = 0,

/// <summary>
/// S3-compatible object store (AWS S3, MinIO, GCS with S3 compatibility).
/// Requires TidesDB built with -DTIDESDB_WITH_S3=ON.
/// </summary>
S3 = 1
}
11 changes: 11 additions & 0 deletions src/TidesDB/Native/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,15 @@ internal static partial class NativeMethods
// Memory operations
[LibraryImport(LibraryName, EntryPoint = "tidesdb_free")]
internal static partial void tidesdb_free(nint ptr);

// Object store operations
[LibraryImport(LibraryName, EntryPoint = "tidesdb_objstore_fs_create", StringMarshalling = StringMarshalling.Utf8)]
internal static partial nint tidesdb_objstore_fs_create(string rootDir);

[LibraryImport(LibraryName, EntryPoint = "tidesdb_objstore_default_config")]
internal static partial NativeObjStoreConfig tidesdb_objstore_default_config();

// Replica promotion
[LibraryImport(LibraryName, EntryPoint = "tidesdb_promote_to_primary")]
internal static partial int tidesdb_promote_to_primary(nint db);
}
21 changes: 21 additions & 0 deletions src/TidesDB/Native/NativeStructs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ internal struct NativeConfig
public nint ObjectStoreConfig;
}

[StructLayout(LayoutKind.Sequential)]
internal struct NativeObjStoreConfig
{
public nint LocalCachePath;
public nuint LocalCacheMaxBytes;
public int CacheOnRead;
public int CacheOnWrite;
public int MaxConcurrentUploads;
public int MaxConcurrentDownloads;
public nuint MultipartThreshold;
public nuint MultipartPartSize;
public int SyncManifestToObject;
public int ReplicateWal;
public int WalUploadSync;
public nuint WalSyncThresholdBytes;
public int WalSyncOnCommit;
public int ReplicaMode;
public ulong ReplicaSyncIntervalUs;
public int ReplicaReplayWal;
}

[StructLayout(LayoutKind.Sequential)]
internal unsafe struct NativeColumnFamilyConfig
{
Expand Down
134 changes: 108 additions & 26 deletions src/TidesDB/TidesDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ public sealed class TidesDb : IDisposable
private nint _handle;
private bool _disposed;
private nint _dbPathPtr;
private nint _objStorePtr;
private nint _objStoreConfigPtr;
private nint _localCachePathPtr;

private TidesDb(nint handle, nint dbPathPtr)
private TidesDb(nint handle, nint dbPathPtr, nint objStorePtr, nint objStoreConfigPtr, nint localCachePathPtr)
{
_handle = handle;
_dbPathPtr = dbPathPtr;
_objStorePtr = objStorePtr;
_objStoreConfigPtr = objStoreConfigPtr;
_localCachePathPtr = localCachePathPtr;
}

/// <summary>
Expand All @@ -43,36 +49,89 @@ private TidesDb(nint handle, nint dbPathPtr)
public static TidesDb Open(Config config)
{
var dbPathPtr = Marshal.StringToHGlobalAnsi(config.DbPath);

var nativeConfig = new NativeConfig
var objStorePtr = nint.Zero;
var objStoreConfigPtr = nint.Zero;
var localCachePathPtr = nint.Zero;

try
{
DbPath = dbPathPtr,
NumFlushThreads = config.NumFlushThreads,
NumCompactionThreads = config.NumCompactionThreads,
LogLevel = (int)config.LogLevel,
BlockCacheSize = (nuint)config.BlockCacheSize,
MaxOpenSstables = (nuint)config.MaxOpenSstables,
LogToFile = config.LogToFile ? 1 : 0,
LogTruncationAt = (nuint)config.LogTruncationAt,
MaxMemoryUsage = (nuint)config.MaxMemoryUsage,
UnifiedMemtable = config.UnifiedMemtable ? 1 : 0,
UnifiedMemtableWriteBufferSize = (nuint)config.UnifiedMemtableWriteBufferSize,
UnifiedMemtableSkipListMaxLevel = config.UnifiedMemtableSkipListMaxLevel,
UnifiedMemtableSkipListProbability = config.UnifiedMemtableSkipListProbability,
UnifiedMemtableSyncMode = (int)config.UnifiedMemtableSyncMode,
UnifiedMemtableSyncIntervalUs = config.UnifiedMemtableSyncIntervalUs,
ObjectStore = nint.Zero,
ObjectStoreConfig = nint.Zero
};
if (config.ObjectStoreConfig is { } osCfg)
{
objStorePtr = osCfg.ConnectorType switch
{
ObjectStoreConnectorType.Filesystem =>
NativeMethods.tidesdb_objstore_fs_create(
osCfg.FsRootDir ?? throw new ArgumentException("FsRootDir is required for filesystem connector")),
ObjectStoreConnectorType.S3 =>
throw new NotSupportedException("S3 connector requires native tidesdb_objstore_s3_create which is not yet exposed in the C# binding. Use the C API directly or contribute S3 support."),
_ => throw new ArgumentException($"Unknown connector type: {osCfg.ConnectorType}")
};

if (objStorePtr == nint.Zero)
throw new TidesDBException(-1, "failed to create object store connector");

var nativeOsCfg = NativeMethods.tidesdb_objstore_default_config();

if (osCfg.LocalCachePath != null)
{
localCachePathPtr = Marshal.StringToHGlobalAnsi(osCfg.LocalCachePath);
nativeOsCfg.LocalCachePath = localCachePathPtr;
}

nativeOsCfg.LocalCacheMaxBytes = (nuint)osCfg.LocalCacheMaxBytes;
nativeOsCfg.CacheOnRead = osCfg.CacheOnRead ? 1 : 0;
nativeOsCfg.CacheOnWrite = osCfg.CacheOnWrite ? 1 : 0;
nativeOsCfg.MaxConcurrentUploads = osCfg.MaxConcurrentUploads;
nativeOsCfg.MaxConcurrentDownloads = osCfg.MaxConcurrentDownloads;
nativeOsCfg.MultipartThreshold = (nuint)osCfg.MultipartThreshold;
nativeOsCfg.MultipartPartSize = (nuint)osCfg.MultipartPartSize;
nativeOsCfg.SyncManifestToObject = osCfg.SyncManifestToObject ? 1 : 0;
nativeOsCfg.ReplicateWal = osCfg.ReplicateWal ? 1 : 0;
nativeOsCfg.WalUploadSync = osCfg.WalUploadSync ? 1 : 0;
nativeOsCfg.WalSyncThresholdBytes = (nuint)osCfg.WalSyncThresholdBytes;
nativeOsCfg.WalSyncOnCommit = osCfg.WalSyncOnCommit ? 1 : 0;
nativeOsCfg.ReplicaMode = osCfg.ReplicaMode ? 1 : 0;
nativeOsCfg.ReplicaSyncIntervalUs = osCfg.ReplicaSyncIntervalUs;
nativeOsCfg.ReplicaReplayWal = osCfg.ReplicaReplayWal ? 1 : 0;

objStoreConfigPtr = Marshal.AllocHGlobal(Marshal.SizeOf<NativeObjStoreConfig>());
Marshal.StructureToPtr(nativeOsCfg, objStoreConfigPtr, false);
}

var result = NativeMethods.tidesdb_open(ref nativeConfig, out var dbHandle);
if (result != 0)
var nativeConfig = new NativeConfig
{
DbPath = dbPathPtr,
NumFlushThreads = config.NumFlushThreads,
NumCompactionThreads = config.NumCompactionThreads,
LogLevel = (int)config.LogLevel,
BlockCacheSize = (nuint)config.BlockCacheSize,
MaxOpenSstables = (nuint)config.MaxOpenSstables,
LogToFile = config.LogToFile ? 1 : 0,
LogTruncationAt = (nuint)config.LogTruncationAt,
MaxMemoryUsage = (nuint)config.MaxMemoryUsage,
UnifiedMemtable = config.UnifiedMemtable ? 1 : 0,
UnifiedMemtableWriteBufferSize = (nuint)config.UnifiedMemtableWriteBufferSize,
UnifiedMemtableSkipListMaxLevel = config.UnifiedMemtableSkipListMaxLevel,
UnifiedMemtableSkipListProbability = config.UnifiedMemtableSkipListProbability,
UnifiedMemtableSyncMode = (int)config.UnifiedMemtableSyncMode,
UnifiedMemtableSyncIntervalUs = config.UnifiedMemtableSyncIntervalUs,
ObjectStore = objStorePtr,
ObjectStoreConfig = objStoreConfigPtr
};

var result = NativeMethods.tidesdb_open(ref nativeConfig, out var dbHandle);
if (result != 0)
throw new TidesDBException(result, "failed to open database");

return new TidesDb(dbHandle, dbPathPtr, objStorePtr, objStoreConfigPtr, localCachePathPtr);
}
catch
{
Marshal.FreeHGlobal(dbPathPtr);
throw new TidesDBException(result, "failed to open database");
if (localCachePathPtr != nint.Zero) Marshal.FreeHGlobal(localCachePathPtr);
if (objStoreConfigPtr != nint.Zero) Marshal.FreeHGlobal(objStoreConfigPtr);
throw;
}

return new TidesDb(dbHandle, dbPathPtr);
}

/// <summary>
Expand Down Expand Up @@ -248,6 +307,17 @@ public bool GetComparator(string name)
return result == 0;
}

/// <summary>
/// Promotes a read-only replica to primary mode, enabling writes.
/// Performs a final MANIFEST sync and WAL replay before switching.
/// </summary>
public void PromoteToPrimary()
{
ThrowIfDisposed();
var result = NativeMethods.tidesdb_promote_to_primary(_handle);
TidesDBException.ThrowIfError(result, "failed to promote to primary");
}

/// <summary>
/// Forces a synchronous flush and aggressive compaction for all column families,
/// then drains both the global flush and compaction queues. Blocks until complete.
Expand Down Expand Up @@ -401,5 +471,17 @@ public void Dispose()
Marshal.FreeHGlobal(_dbPathPtr);
_dbPathPtr = nint.Zero;
}

if (_localCachePathPtr != nint.Zero)
{
Marshal.FreeHGlobal(_localCachePathPtr);
_localCachePathPtr = nint.Zero;
}

if (_objStoreConfigPtr != nint.Zero)
{
Marshal.FreeHGlobal(_objStoreConfigPtr);
_objStoreConfigPtr = nint.Zero;
}
}
}
2 changes: 1 addition & 1 deletion src/TidesDB/TidesDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<!-- Package metadata -->
<PackageId>TidesDB</PackageId>
<Version>0.4.6</Version>
<Version>0.4.7</Version>
<Authors>TidesDB</Authors>
<Company>TidesDB</Company>
<Description>Official C# bindings for TidesDB - A high-performance embedded key-value storage engine</Description>
Expand Down
Loading
Loading