Skip to content

Commit c122caa

Browse files
feat(storage): G21 P2a — PartitionManifestStore + 16 Tests
Erste konkrete P2-Bauteil: persistente Manifest-Layer fuer disaggregated-Topics. KEIN Write-Path-Change, KEIN WAL-Flusher; das ist die Datenhaltung auf der P2b/c aufbauen. Neue Bestandteile: - IPartitionManifestStore: GetAsync/AppendObjectAsync/ListPartitionsAsync. Optimistic-Concurrency via PartitionManifest.Version, Overlap-Check via PartitionManifest.AppendObject. - InMemoryPartitionManifestStore: Dictionary + per-Partition-Semaphore. Verwendung in Tests + embedded Broker. - FilePartitionManifestStore: JSON-Datei pro Partition unter data/disaggregated/manifests/<topic>__<partition>.json. Writes atomar via temp+rename. Hydration on-first-use, re-validiert Overlap-Invariante beim Boot. Test-Coverage (16 Tests, alle passing): - PartitionManifest: empty-state, append+version-bump, overlap-throw, contiguous-allowed, binary-search-Locate, RecordCount-inclusive. - InMemory: round-trip, overlap-rejection, ListPartitions, concurrent- appends-to-different-partitions parallelisierbar. - File: write-creates-file, persistence-across-recreation, no-tmp- residue, hydrate-from-disk, FileNameFor-convention. InternalsVisibleTo fuer den Tests-Assembly, sonst Layer-Reinheit bleibt — Disaggregated haengt nur an Core, keine Cycle Richtung Broker.
1 parent 6d82233 commit c122caa

9 files changed

Lines changed: 512 additions & 0 deletions

Kuestenlogik.Surgewave.slnx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<Project Path="src/Kuestenlogik.Surgewave.Storage.Engine.Sqlite/Kuestenlogik.Surgewave.Storage.Engine.Sqlite.csproj" />
3131
<Project Path="src/Kuestenlogik.Surgewave.Storage.Tiering/Kuestenlogik.Surgewave.Storage.Tiering.csproj" />
3232
<Project Path="src/Kuestenlogik.Surgewave.Storage.Disaggregated/Kuestenlogik.Surgewave.Storage.Disaggregated.csproj" />
33+
<Project Path="tests/Kuestenlogik.Surgewave.Storage.Disaggregated.Tests/Kuestenlogik.Surgewave.Storage.Disaggregated.Tests.csproj" />
3334
<Project Path="src/Kuestenlogik.Surgewave.Clustering/Kuestenlogik.Surgewave.Clustering.csproj" />
3435
<Project Path="src/Kuestenlogik.Surgewave.Broker/Kuestenlogik.Surgewave.Broker.csproj" />
3536
<Project Path="src/Kuestenlogik.Surgewave.Runtime/Kuestenlogik.Surgewave.Runtime.csproj" />
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
using System.Collections.Concurrent;
2+
using System.Text.Json;
3+
using System.Text.Json.Serialization;
4+
using Kuestenlogik.Surgewave.Core.Models;
5+
6+
namespace Kuestenlogik.Surgewave.Storage.Disaggregated;
7+
8+
/// <summary>
9+
/// Manifest store that persists one JSON file per partition under a
10+
/// configured data directory. Writes are atomic (temp file + rename) so
11+
/// a crash mid-write never corrupts the manifest on disk — the broker
12+
/// either sees the old version or the new one, never a half-written
13+
/// file.
14+
///
15+
/// File layout under <c>&lt;dataDir&gt;/disaggregated/manifests/</c>:
16+
/// <code>
17+
/// manifests/
18+
/// orders__0.json
19+
/// orders__1.json
20+
/// events__0.json
21+
/// </code>
22+
/// The double-underscore is deliberate — Kafka topic names allow dots
23+
/// but not underscores in our convention, so <c>__</c> can never appear
24+
/// in a topic name and is therefore a safe partition separator.
25+
///
26+
/// All reads go through an in-memory cache so hot-path reads of the
27+
/// manifest don't touch disk; writes go to disk first, then update the
28+
/// cache.
29+
/// </summary>
30+
public sealed class FilePartitionManifestStore : IPartitionManifestStore
31+
{
32+
private readonly string _root;
33+
private readonly InMemoryPartitionManifestStore _cache = new();
34+
private readonly ConcurrentDictionary<TopicPartition, SemaphoreSlim> _writeLocks = new();
35+
private bool _hydrated;
36+
private readonly SemaphoreSlim _hydrateLock = new(1, 1);
37+
38+
private static readonly JsonSerializerOptions JsonOptions = new()
39+
{
40+
WriteIndented = true,
41+
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
42+
};
43+
44+
public FilePartitionManifestStore(string dataDirectory)
45+
{
46+
ArgumentException.ThrowIfNullOrWhiteSpace(dataDirectory);
47+
_root = Path.Combine(Path.GetFullPath(dataDirectory), "disaggregated", "manifests");
48+
Directory.CreateDirectory(_root);
49+
}
50+
51+
public async ValueTask<PartitionManifest> GetAsync(TopicPartition partition, CancellationToken cancellationToken = default)
52+
{
53+
await EnsureHydratedAsync(cancellationToken).ConfigureAwait(false);
54+
return await _cache.GetAsync(partition, cancellationToken).ConfigureAwait(false);
55+
}
56+
57+
public async ValueTask<PartitionManifest> AppendObjectAsync(
58+
TopicPartition partition,
59+
StreamObjectRef newObject,
60+
CancellationToken cancellationToken = default)
61+
{
62+
await EnsureHydratedAsync(cancellationToken).ConfigureAwait(false);
63+
64+
var gate = _writeLocks.GetOrAdd(partition, _ => new SemaphoreSlim(1, 1));
65+
await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
66+
try
67+
{
68+
// Cache-Append performs the overlap check + version bump; that
69+
// result is the manifest we serialise to disk. Disk is the
70+
// durable side, but cache stays in lock-step so concurrent
71+
// readers don't see a manifest that exists on disk but not
72+
// in cache.
73+
var next = await _cache.AppendObjectAsync(partition, newObject, cancellationToken).ConfigureAwait(false);
74+
await WriteAtomicAsync(partition, next, cancellationToken).ConfigureAwait(false);
75+
return next;
76+
}
77+
finally
78+
{
79+
gate.Release();
80+
}
81+
}
82+
83+
public async ValueTask<IReadOnlyList<TopicPartition>> ListPartitionsAsync(CancellationToken cancellationToken = default)
84+
{
85+
await EnsureHydratedAsync(cancellationToken).ConfigureAwait(false);
86+
return await _cache.ListPartitionsAsync(cancellationToken).ConfigureAwait(false);
87+
}
88+
89+
private async ValueTask EnsureHydratedAsync(CancellationToken cancellationToken)
90+
{
91+
if (Volatile.Read(ref _hydrated)) return;
92+
await _hydrateLock.WaitAsync(cancellationToken).ConfigureAwait(false);
93+
try
94+
{
95+
if (_hydrated) return;
96+
foreach (var file in Directory.EnumerateFiles(_root, "*.json"))
97+
{
98+
var manifest = await ReadManifestAsync(file, cancellationToken).ConfigureAwait(false);
99+
if (manifest is null) continue;
100+
101+
// Replay each ref through the in-memory store's Append so
102+
// the overlap-check invariants are re-validated on every
103+
// boot. A corrupted on-disk manifest fails loudly here
104+
// instead of silently returning bad data on later reads.
105+
foreach (var obj in manifest.Objects)
106+
{
107+
await _cache.AppendObjectAsync(manifest.Partition, obj, cancellationToken).ConfigureAwait(false);
108+
}
109+
}
110+
Volatile.Write(ref _hydrated, true);
111+
}
112+
finally
113+
{
114+
_hydrateLock.Release();
115+
}
116+
}
117+
118+
private static async Task<PartitionManifest?> ReadManifestAsync(string path, CancellationToken cancellationToken)
119+
{
120+
await using var stream = File.OpenRead(path);
121+
return await JsonSerializer.DeserializeAsync<PartitionManifest>(stream, JsonOptions, cancellationToken).ConfigureAwait(false);
122+
}
123+
124+
private async Task WriteAtomicAsync(TopicPartition partition, PartitionManifest manifest, CancellationToken cancellationToken)
125+
{
126+
var finalPath = Path.Combine(_root, FileNameFor(partition));
127+
var tempPath = finalPath + ".tmp";
128+
await using (var stream = File.Create(tempPath))
129+
{
130+
await JsonSerializer.SerializeAsync(stream, manifest, JsonOptions, cancellationToken).ConfigureAwait(false);
131+
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
132+
}
133+
File.Move(tempPath, finalPath, overwrite: true);
134+
}
135+
136+
internal static string FileNameFor(TopicPartition partition) =>
137+
$"{partition.Topic}__{partition.Partition}.json";
138+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using Kuestenlogik.Surgewave.Core.Models;
2+
3+
namespace Kuestenlogik.Surgewave.Storage.Disaggregated;
4+
5+
/// <summary>
6+
/// Persistence layer for <see cref="PartitionManifest"/>. One manifest
7+
/// per disaggregated topic-partition; the store is the single source of
8+
/// truth for "what offset ranges live in the object store right now".
9+
///
10+
/// Two implementations ship in P2a:
11+
/// - <see cref="InMemoryPartitionManifestStore"/> for tests + embedded broker.
12+
/// - <see cref="FilePartitionManifestStore"/> for standalone brokers
13+
/// (atomic JSON write per partition). KRaft-replicated variant is a
14+
/// later iteration, see ADR-014 §"Per-partition manifest".
15+
///
16+
/// Implementations must be safe for concurrent calls across different
17+
/// partitions; same-partition mutations are serialised by the store so
18+
/// callers get optimistic-concurrency semantics via
19+
/// <see cref="PartitionManifest.Version"/> instead of needing external
20+
/// locks.
21+
/// </summary>
22+
public interface IPartitionManifestStore
23+
{
24+
/// <summary>
25+
/// Read the current manifest for <paramref name="partition"/>. Returns
26+
/// an empty manifest (no stream objects, version 0) when the partition
27+
/// has never been written to.
28+
/// </summary>
29+
ValueTask<PartitionManifest> GetAsync(TopicPartition partition, CancellationToken cancellationToken = default);
30+
31+
/// <summary>
32+
/// Atomically append <paramref name="newObject"/> to the manifest. Bumps
33+
/// <see cref="PartitionManifest.Version"/> on success and returns the
34+
/// updated manifest. Throws <see cref="InvalidOperationException"/> if
35+
/// the new object's offset range overlaps the manifest tail
36+
/// (disaggregated commits must be strictly monotonic).
37+
/// </summary>
38+
ValueTask<PartitionManifest> AppendObjectAsync(
39+
TopicPartition partition,
40+
StreamObjectRef newObject,
41+
CancellationToken cancellationToken = default);
42+
43+
/// <summary>
44+
/// List every partition that has a manifest on this store. Used at
45+
/// broker startup to rehydrate the in-memory manifest cache (and by
46+
/// the cost-reporting endpoint to enumerate disaggregated storage
47+
/// usage).
48+
/// </summary>
49+
ValueTask<IReadOnlyList<TopicPartition>> ListPartitionsAsync(CancellationToken cancellationToken = default);
50+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System.Collections.Concurrent;
2+
using Kuestenlogik.Surgewave.Core.Models;
3+
4+
namespace Kuestenlogik.Surgewave.Storage.Disaggregated;
5+
6+
/// <summary>
7+
/// In-process manifest store. Used in tests, in the embedded broker, and
8+
/// as the cache layer behind <see cref="FilePartitionManifestStore"/>. A
9+
/// <see cref="SemaphoreSlim"/> per partition serialises append calls
10+
/// without blocking unrelated partitions.
11+
/// </summary>
12+
public sealed class InMemoryPartitionManifestStore : IPartitionManifestStore
13+
{
14+
private readonly ConcurrentDictionary<TopicPartition, PartitionManifest> _manifests = new();
15+
private readonly ConcurrentDictionary<TopicPartition, SemaphoreSlim> _locks = new();
16+
17+
public ValueTask<PartitionManifest> GetAsync(TopicPartition partition, CancellationToken cancellationToken = default)
18+
{
19+
cancellationToken.ThrowIfCancellationRequested();
20+
var manifest = _manifests.TryGetValue(partition, out var existing)
21+
? existing
22+
: PartitionManifest.Empty(partition);
23+
return ValueTask.FromResult(manifest);
24+
}
25+
26+
public async ValueTask<PartitionManifest> AppendObjectAsync(
27+
TopicPartition partition,
28+
StreamObjectRef newObject,
29+
CancellationToken cancellationToken = default)
30+
{
31+
var gate = _locks.GetOrAdd(partition, _ => new SemaphoreSlim(1, 1));
32+
await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
33+
try
34+
{
35+
var current = _manifests.TryGetValue(partition, out var existing)
36+
? existing
37+
: PartitionManifest.Empty(partition);
38+
var next = current.AppendObject(newObject);
39+
_manifests[partition] = next;
40+
return next;
41+
}
42+
finally
43+
{
44+
gate.Release();
45+
}
46+
}
47+
48+
public ValueTask<IReadOnlyList<TopicPartition>> ListPartitionsAsync(CancellationToken cancellationToken = default)
49+
{
50+
cancellationToken.ThrowIfCancellationRequested();
51+
IReadOnlyList<TopicPartition> snapshot = _manifests.Keys.ToArray();
52+
return ValueTask.FromResult(snapshot);
53+
}
54+
}

src/Kuestenlogik.Surgewave.Storage.Disaggregated/Kuestenlogik.Surgewave.Storage.Disaggregated.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@
99
<ProjectReference Include="..\Kuestenlogik.Surgewave.Core\Kuestenlogik.Surgewave.Core.csproj" />
1010
</ItemGroup>
1111

12+
<ItemGroup>
13+
<InternalsVisibleTo Include="Kuestenlogik.Surgewave.Storage.Disaggregated.Tests" />
14+
</ItemGroup>
15+
1216
</Project>
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
using Kuestenlogik.Surgewave.Core.Models;
2+
using Kuestenlogik.Surgewave.Storage.Disaggregated;
3+
using Xunit;
4+
5+
namespace Kuestenlogik.Surgewave.Storage.Disaggregated.Tests;
6+
7+
/// <summary>
8+
/// File-backed store tests use a per-test temp dir so suites can run
9+
/// in parallel without colliding on disk.
10+
/// </summary>
11+
public sealed class FilePartitionManifestStoreTests : IDisposable
12+
{
13+
private readonly string _tempDir;
14+
15+
public FilePartitionManifestStoreTests()
16+
{
17+
_tempDir = Path.Combine(Path.GetTempPath(), "swv-mfst-tests-" + Guid.NewGuid().ToString("N"));
18+
Directory.CreateDirectory(_tempDir);
19+
}
20+
21+
public void Dispose()
22+
{
23+
if (Directory.Exists(_tempDir)) Directory.Delete(_tempDir, recursive: true);
24+
}
25+
26+
private static readonly TopicPartition P0 = new() { Topic = "orders", Partition = 0 };
27+
28+
[Fact]
29+
public async Task Append_writes_json_file_at_expected_path()
30+
{
31+
var store = new FilePartitionManifestStore(_tempDir);
32+
await store.AppendObjectAsync(P0, new StreamObjectRef("k", 0, 99, 1024, DateTime.UtcNow));
33+
34+
var expected = Path.Combine(_tempDir, "disaggregated", "manifests", "orders__0.json");
35+
Assert.True(File.Exists(expected));
36+
}
37+
38+
[Fact]
39+
public async Task Manifest_survives_store_recreation()
40+
{
41+
var s1 = new FilePartitionManifestStore(_tempDir);
42+
var first = new StreamObjectRef("k0", 0, 99, 1024, new DateTime(2026, 6, 11, 0, 0, 0, DateTimeKind.Utc));
43+
var second = new StreamObjectRef("k1", 100, 199, 1024, new DateTime(2026, 6, 11, 0, 1, 0, DateTimeKind.Utc));
44+
await s1.AppendObjectAsync(P0, first);
45+
await s1.AppendObjectAsync(P0, second);
46+
47+
// Brand-new store instance — must re-hydrate from disk on first call.
48+
var s2 = new FilePartitionManifestStore(_tempDir);
49+
var reloaded = await s2.GetAsync(P0);
50+
51+
Assert.Equal(2, reloaded.Version);
52+
Assert.Equal("k0", reloaded.Objects[0].ObjectKey);
53+
Assert.Equal("k1", reloaded.Objects[1].ObjectKey);
54+
Assert.Equal(0, reloaded.FirstOffset);
55+
Assert.Equal(199, reloaded.LastOffset);
56+
}
57+
58+
[Fact]
59+
public async Task No_temp_file_remains_after_successful_append()
60+
{
61+
var store = new FilePartitionManifestStore(_tempDir);
62+
await store.AppendObjectAsync(P0, new StreamObjectRef("k", 0, 99, 1024, DateTime.UtcNow));
63+
64+
var manifestDir = Path.Combine(_tempDir, "disaggregated", "manifests");
65+
var lingering = Directory.GetFiles(manifestDir, "*.tmp");
66+
Assert.Empty(lingering);
67+
}
68+
69+
[Fact]
70+
public async Task ListPartitions_returns_partitions_loaded_from_disk()
71+
{
72+
var s1 = new FilePartitionManifestStore(_tempDir);
73+
await s1.AppendObjectAsync(P0, new StreamObjectRef("k", 0, 99, 1024, DateTime.UtcNow));
74+
await s1.AppendObjectAsync(new TopicPartition { Topic = "orders", Partition = 1 },
75+
new StreamObjectRef("k", 0, 99, 1024, DateTime.UtcNow));
76+
77+
var s2 = new FilePartitionManifestStore(_tempDir);
78+
var partitions = await s2.ListPartitionsAsync();
79+
80+
Assert.Equal(2, partitions.Count);
81+
}
82+
83+
[Fact]
84+
public void FileNameFor_uses_double_underscore_separator()
85+
{
86+
var name = FilePartitionManifestStore.FileNameFor(new TopicPartition { Topic = "my.topic", Partition = 7 });
87+
Assert.Equal("my.topic__7.json", name);
88+
}
89+
}

0 commit comments

Comments
 (0)