Commit 6d82233

feat(storage): G21 P1 — storage.mode Topic-Config + Manifest-Skeleton
ADR-014 moves from Proposed to Accepted. P1 delivers the config plumbing and the pure type library; NO write-path change (that comes in P2/P3).

New components:
- src/Kuestenlogik.Surgewave.Storage.Disaggregated/ (new project): StorageMode enum (Replicated/DisaggregatedWal/DisaggregatedStateless), StorageModeKeys wire-string constants plus Parse/ToWireString, StreamObjectRef record (ObjectKey + offset range + Bytes + CreatedAt), PartitionManifest record with an append-only invariant and binary-search Locate, StorageModeValidator with the RF=1 constraint and the embedded-mode restriction.
- TopicMetadata gains the StorageModeRaw and IsDisaggregated computed properties (string-level, so that Core does not take a dependency on the Disaggregated project).
- Native CreateTopic validator: rejects the request with a concrete ADR rationale when the storage.mode value is unknown or when RF>1 is requested for a disaggregated mode.
- TopicCreate.razor in Control: MudSelect dropdown for storage.mode with helper text, default replicated; the value goes into the Configs dictionary. TopicDetail renders the entry automatically (existing generic Configs loop).
- Slnx + Broker.csproj reference the new project.

Verification: solution build is clean (0 warnings, 0 errors).
1 parent fa13377 commit 6d82233

13 files changed

Lines changed: 342 additions & 2 deletions

Kuestenlogik.Surgewave.slnx

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@
     <Project Path="src/Kuestenlogik.Surgewave.Storage.Engine.S3/Kuestenlogik.Surgewave.Storage.Engine.S3.csproj" />
     <Project Path="src/Kuestenlogik.Surgewave.Storage.Engine.Sqlite/Kuestenlogik.Surgewave.Storage.Engine.Sqlite.csproj" />
     <Project Path="src/Kuestenlogik.Surgewave.Storage.Tiering/Kuestenlogik.Surgewave.Storage.Tiering.csproj" />
+    <Project Path="src/Kuestenlogik.Surgewave.Storage.Disaggregated/Kuestenlogik.Surgewave.Storage.Disaggregated.csproj" />
     <Project Path="src/Kuestenlogik.Surgewave.Clustering/Kuestenlogik.Surgewave.Clustering.csproj" />
     <Project Path="src/Kuestenlogik.Surgewave.Broker/Kuestenlogik.Surgewave.Broker.csproj" />
     <Project Path="src/Kuestenlogik.Surgewave.Runtime/Kuestenlogik.Surgewave.Runtime.csproj" />

docs/adr/014-disaggregated-storage.md

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@

 | Status   | Date       |
 |----------|------------|
-| Proposed | 2026-06-11 |
+| Accepted | 2026-06-11 |

 ## Context

docs/adr/README.md

Lines changed: 1 addition & 1 deletion
@@ -21,4 +21,4 @@ ADRs document important architectural decisions, their context, and consequences
 | [ADR-011](011-community-enterprise-split.md) | Community/Enterprise Repository Split | Accepted | 2026-04 |
 | [ADR-012](012-zero-copy-high-performance.md) | Zero-Copy & High-Performance Patterns | Accepted | 2026-04 |
 | [ADR-013](013-control-ui-plugin-first.md) | Control UI Plugin-First Architecture | Accepted | 2026-05 |
-| [ADR-014](014-disaggregated-storage.md) | Disaggregated Compute/Storage — Two Modes | Proposed | 2026-06 |
+| [ADR-014](014-disaggregated-storage.md) | Disaggregated Compute/Storage — Two Modes | Accepted | 2026-06 |

src/Kuestenlogik.Surgewave.Broker/Kuestenlogik.Surgewave.Broker.csproj

Lines changed: 1 addition & 0 deletions
@@ -122,6 +122,7 @@
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Storage.Engine.FileSystem\Kuestenlogik.Surgewave.Storage.Engine.FileSystem.csproj" />
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Storage.Engine.Memory\Kuestenlogik.Surgewave.Storage.Engine.Memory.csproj" />
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Storage.Tiering\Kuestenlogik.Surgewave.Storage.Tiering.csproj" />
+    <ProjectReference Include="..\Kuestenlogik.Surgewave.Storage.Disaggregated\Kuestenlogik.Surgewave.Storage.Disaggregated.csproj" />
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Clustering\Kuestenlogik.Surgewave.Clustering.csproj" />
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Transport\Kuestenlogik.Surgewave.Transport.csproj" />
     <ProjectReference Include="..\Kuestenlogik.Surgewave.Transport.Tcp\Kuestenlogik.Surgewave.Transport.Tcp.csproj" />

src/Kuestenlogik.Surgewave.Broker/Native/Operations/Topics/TopicOperations.cs

Lines changed: 38 additions & 0 deletions
@@ -4,6 +4,7 @@
 using Kuestenlogik.Surgewave.Protocol.Native;
 using Kuestenlogik.Surgewave.Protocol.Native.Payloads;
 using Kuestenlogik.Surgewave.Protocol.Native.Payloads.Topics;
+using Kuestenlogik.Surgewave.Storage.Disaggregated;

 namespace Kuestenlogik.Surgewave.Broker.Native.Operations.Topics;

@@ -25,6 +26,43 @@ public void ValidateRequest(in CreateTopicRequestPayload request)
     {
         if (string.IsNullOrEmpty(request.Name))
             throw new SurgewaveOperationException(SurgewaveErrorCode.InvalidRequest, "Topic name required");
+
+        // ADR-014: validate storage.mode + the replication-factor invariant.
+        // We resolve the mode here (string-level) before LogManager touches it
+        // so the error reaches the client with a useful diagnostic instead of
+        // a generic InvalidOperationException.
+        if (request.Configs is { Length: > 0 })
+        {
+            foreach (var c in request.Configs)
+            {
+                if (c.Key != StorageModeKeys.ConfigKey) continue;
+                StorageMode mode;
+                try
+                {
+                    mode = StorageModeKeys.Parse(c.Value);
+                }
+                catch (ArgumentException ex)
+                {
+                    throw new SurgewaveOperationException(SurgewaveErrorCode.InvalidRequest, ex.Message);
+                }
+                try
+                {
+                    // objectStoreConfigured + isEmbeddedRuntime stay conservatively
+                    // true/false here in P1 (no cluster-level introspection yet).
+                    // P2/P3 wire these to the actual broker config so the rejection
+                    // can also catch "disaggregated requested but no bucket configured".
+                    StorageModeValidator.Validate(
+                        mode,
+                        request.ReplicationFactor,
+                        objectStoreConfigured: true,
+                        isEmbeddedRuntime: false);
+                }
+                catch (StorageModeValidationException ex)
+                {
+                    throw new SurgewaveOperationException(SurgewaveErrorCode.InvalidRequest, ex.Message);
+                }
+            }
+        }
     }

     public Task ExecuteAsync(CreateTopicRequestPayload request, CancellationToken cancellationToken)
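
The body of `StorageModeValidator.Validate` is not part of this view of the diff. As a rough sketch only, the rules named in the commit message (RF=1 for disaggregated modes, no `disaggregated-stateless` in embedded mode) could be enforced as below. The parameter names come from the call site above; the `int` type of `replicationFactor`, the error messages, the object-store check and the `StorageModeValidatorSketch` class name are assumptions made for illustration.

```csharp
using System;

// Mirrors the enum added in this commit.
public enum StorageMode { Replicated, DisaggregatedWal, DisaggregatedStateless }

// Exception type named at the call site; the constructor shape is assumed.
public sealed class StorageModeValidationException : Exception
{
    public StorageModeValidationException(string message) : base(message) { }
}

// Hypothetical stand-in for StorageModeValidator, not the committed code.
public static class StorageModeValidatorSketch
{
    public static void Validate(
        StorageMode mode,
        int replicationFactor,
        bool objectStoreConfigured,
        bool isEmbeddedRuntime)
    {
        // The replicated path has no extra constraints from ADR-014.
        if (mode == StorageMode.Replicated) return;

        // Both disaggregated modes rely on the object store for durability.
        if (replicationFactor != 1)
            throw new StorageModeValidationException(
                $"storage.mode requires replication.factor=1 (got {replicationFactor}), see ADR-014.");

        if (mode == StorageMode.DisaggregatedStateless)
        {
            // The stateless mode has no local WAL to fall back on.
            if (isEmbeddedRuntime)
                throw new StorageModeValidationException(
                    "disaggregated-stateless is not supported in embedded mode.");
            if (!objectStoreConfigured)
                throw new StorageModeValidationException(
                    "disaggregated-stateless needs a configured object store.");
        }
    }
}
```

With the P1 call-site values (`objectStoreConfigured: true`, `isEmbeddedRuntime: false`), only the replication-factor rule in this sketch can fire, which matches the comment in the hunk that the remaining checks become effective once P2/P3 pass real broker configuration.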

src/Kuestenlogik.Surgewave.Control/Components/Pages/Topics/TopicCreate.razor

Lines changed: 11 additions & 0 deletions
@@ -72,6 +72,14 @@
                     <MudTextField @bind-Value="_minInsyncReplicas" Label="min.insync.replicas" Variant="Variant.Outlined"
                                   HelperText="Minimum number of in-sync replicas" />
                 </MudItem>
+                <MudItem xs="12" md="6">
+                    <MudSelect @bind-Value="_storageMode" Label="storage.mode" Variant="Variant.Outlined"
+                               HelperText="Disaggregated modes bypass ISR; requires replication.factor=1 (ADR-014).">
+                        <MudSelectItem Value="@("replicated")">replicated (default — local segments + ISR)</MudSelectItem>
+                        <MudSelectItem Value="@("disaggregated-wal")">disaggregated-wal (sub-10 ms, WAL + S3-offload)</MudSelectItem>
+                        <MudSelectItem Value="@("disaggregated-stateless")">disaggregated-stateless (S3-direct, batch-cheap)</MudSelectItem>
+                    </MudSelect>
+                </MudItem>
             </MudGrid>
         </MudExpansionPanel>
     </MudExpansionPanels>
@@ -110,6 +118,7 @@
     private string _compressionType = "producer";
     private string _segmentBytes = "";
     private string _minInsyncReplicas = "";
+    private string _storageMode = "replicated";

     private readonly List<BreadcrumbItem> _breadcrumbs =
     [
@@ -158,6 +167,8 @@
             configs["segment.bytes"] = _segmentBytes;
         if (!string.IsNullOrWhiteSpace(_minInsyncReplicas))
             configs["min.insync.replicas"] = _minInsyncReplicas;
+        if (_storageMode != "replicated")
+            configs["storage.mode"] = _storageMode;

         var request = new CreateTopicRequest(
             _name,
src/Kuestenlogik.Surgewave.Core/Models/TopicMetadata.cs

Lines changed: 19 additions & 0 deletions
@@ -42,6 +42,25 @@ public sealed record TopicMetadata
             ? ParseCleanupPolicy(policy)
             : CleanupPolicy.Delete;

+    /// <summary>
+    /// Raw <c>storage.mode</c> string from config, or null if unset.
+    /// The full enum-typed accessor lives in
+    /// <c>Kuestenlogik.Surgewave.Storage.Disaggregated.StorageModeValidator.ResolveFromConfig</c>
+    /// (kept out of Core to avoid pulling the disaggregated types into
+    /// every consumer of <see cref="TopicMetadata"/>).
+    /// </summary>
+    public string? StorageModeRaw =>
+        Config.TryGetValue("storage.mode", out var mode) ? mode : null;
+
+    /// <summary>
+    /// Whether this topic opts into one of the disaggregated storage modes
+    /// (<c>disaggregated-wal</c> or <c>disaggregated-stateless</c>). Used
+    /// by code that must skip the ISR replication path without taking a
+    /// dependency on the Disaggregated project.
+    /// </summary>
+    public bool IsDisaggregated =>
+        StorageModeRaw is "disaggregated-wal" or "disaggregated-stateless";
+
     private static CleanupPolicy ParseCleanupPolicy(string policy) =>
         policy.ToLowerInvariant() switch
         {
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <Description>Surgewave Disaggregated Storage — pure types and validation for the disaggregated-wal (AutoMQ-style) and disaggregated-stateless (WarpStream-style) topic modes. Hosts the StorageMode enum, the StreamObjectRef / PartitionManifest records, and the topic-config validator. The actual write paths live in the per-mode engine projects (P2, P3 in ADR-014).</Description>
+    <PackageTags>surgewave;storage;disaggregated;s3</PackageTags>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\Kuestenlogik.Surgewave.Core\Kuestenlogik.Surgewave.Core.csproj" />
+  </ItemGroup>
+
+</Project>
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+using Kuestenlogik.Surgewave.Core.Models;
+
+namespace Kuestenlogik.Surgewave.Storage.Disaggregated;
+
+/// <summary>
+/// Ordered list of <see cref="StreamObjectRef"/> for one partition,
+/// plus the partition identity. Persisted in the cluster metadata
+/// store (KRaft-backed today, see ADR-014). The list is kept sorted
+/// by <see cref="StreamObjectRef.FirstOffset"/> so a range query is
+/// a binary search.
+/// </summary>
+public sealed record PartitionManifest
+{
+    public required TopicPartition Partition { get; init; }
+
+    /// <summary>Stream objects in increasing-offset order.</summary>
+    public required IReadOnlyList<StreamObjectRef> Objects { get; init; }
+
+    /// <summary>
+    /// Monotonic version counter — bumped on every manifest mutation.
+    /// Used by the offset-commit protocol for optimistic concurrency
+    /// (a stale committer is rejected so two parallel commits don't
+    /// silently overwrite each other's refs).
+    /// </summary>
+    public required long Version { get; init; }
+
+    public static PartitionManifest Empty(TopicPartition partition) => new()
+    {
+        Partition = partition,
+        Objects = [],
+        Version = 0,
+    };
+
+    /// <summary>
+    /// Smallest offset still in the manifest, or null if the manifest is
+    /// empty. Reads below this offset fall back to the WAL (for
+    /// <c>disaggregated-wal</c>) or return <c>OFFSET_OUT_OF_RANGE</c>
+    /// (for <c>disaggregated-stateless</c>).
+    /// </summary>
+    public long? FirstOffset => Objects.Count == 0 ? null : Objects[0].FirstOffset;
+
+    /// <summary>Largest offset stored remotely.</summary>
+    public long? LastOffset => Objects.Count == 0 ? null : Objects[^1].LastOffset;
+
+    /// <summary>Find the stream object containing the given offset, or null.</summary>
+    public StreamObjectRef? Locate(long offset)
+    {
+        var lo = 0;
+        var hi = Objects.Count - 1;
+        while (lo <= hi)
+        {
+            var mid = (lo + hi) >>> 1;
+            var obj = Objects[mid];
+            if (obj.Contains(offset)) return obj;
+            if (offset < obj.FirstOffset) hi = mid - 1;
+            else lo = mid + 1;
+        }
+        return null;
+    }
+
+    /// <summary>
+    /// Return a manifest with <paramref name="newObject"/> appended and
+    /// <see cref="Version"/> bumped. Throws when the new ref's
+    /// <see cref="StreamObjectRef.FirstOffset"/> overlaps the existing
+    /// tail — disaggregated commits must be strictly monotonic.
+    /// </summary>
+    public PartitionManifest AppendObject(StreamObjectRef newObject)
+    {
+        if (Objects.Count > 0 && newObject.FirstOffset <= Objects[^1].LastOffset)
+        {
+            throw new InvalidOperationException(
+                $"Stream object offset range [{newObject.FirstOffset}, {newObject.LastOffset}] "
+                + $"overlaps the manifest's existing tail (ends at {Objects[^1].LastOffset}). "
+                + "Disaggregated commits must be strictly monotonic.");
+        }
+        var next = new List<StreamObjectRef>(Objects.Count + 1);
+        next.AddRange(Objects);
+        next.Add(newObject);
+        return this with { Objects = next, Version = Version + 1 };
+    }
+}
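
To make the append-only invariant and the binary-search lookup easier to follow in isolation, here is a reduced, self-contained sketch. `StreamObjectRef` itself is not visible in this part of the diff, so `StreamObjectRefSketch` only assumes the fields listed in the commit message (ObjectKey, offset range, Bytes, CreatedAt) and an inclusive `Contains`; `ManifestSketch` is a hypothetical helper that works on a plain list rather than on the real `PartitionManifest` record.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Assumed shape of a stream object reference; offsets are treated as inclusive.
public sealed record StreamObjectRefSketch(
    string ObjectKey, long FirstOffset, long LastOffset, long Bytes, DateTimeOffset CreatedAt)
{
    public bool Contains(long offset) => offset >= FirstOffset && offset <= LastOffset;
}

public static class ManifestSketch
{
    // Returns a new list with 'next' appended; rejects ranges that overlap the tail.
    public static IReadOnlyList<StreamObjectRefSketch> Append(
        IReadOnlyList<StreamObjectRefSketch> objects, StreamObjectRefSketch next)
    {
        if (objects.Count > 0 && next.FirstOffset <= objects[objects.Count - 1].LastOffset)
            throw new InvalidOperationException("New range overlaps the existing tail.");

        var copy = new List<StreamObjectRefSketch>(objects.Count + 1);
        copy.AddRange(objects);
        copy.Add(next);
        return copy;
    }

    // Binary search over refs sorted by FirstOffset; null when no ref covers the offset.
    public static StreamObjectRefSketch? Locate(
        IReadOnlyList<StreamObjectRefSketch> objects, long offset)
    {
        int lo = 0, hi = objects.Count - 1;
        while (lo <= hi)
        {
            var mid = lo + (hi - lo) / 2;
            var obj = objects[mid];
            if (obj.Contains(offset)) return obj;
            if (offset < obj.FirstOffset) hi = mid - 1;
            else lo = mid + 1;
        }
        return null;
    }
}
```

Because every append must start past the current tail, the list stays sorted without an explicit sort step, which is what allows `Locate` to use a binary search. The sketch leaves out the `Version` counter that the committed record bumps on each append.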
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+namespace Kuestenlogik.Surgewave.Storage.Disaggregated;
+
+/// <summary>
+/// Per-topic storage-mode selector introduced by ADR-014. Default is
+/// <see cref="Replicated"/> (the existing local-segment + ISR path).
+/// The two disaggregated modes opt the topic into an object-store-
+/// durable write path; the broker enforces <c>replication.factor=1</c>
+/// for those — S3 is the durability layer, replicating again would
+/// only burn money.
+/// </summary>
+public enum StorageMode
+{
+    /// <summary>Existing replicated path (local segments + ISR). Default.</summary>
+    Replicated,
+
+    /// <summary>
+    /// AutoMQ-style: broker keeps a local WAL on EBS/NVMe so produce-ack
+    /// stays sub-10 ms, then a background flusher packs sealed segments
+    /// into S3 stream objects and appends them to the partition manifest.
+    /// Embedded-friendly — the WAL works locally even without an object
+    /// store configured.
+    /// </summary>
+    DisaggregatedWal,
+
+    /// <summary>
+    /// WarpStream-style: incoming batches buffer in RAM on a stateless
+    /// agent, the agent PUTs to S3 on size/time threshold, then commits
+    /// the offset range to the manifest. No WAL. Produce-P99 is
+    /// dominated by the S3 PUT (~400-600 ms). Not supported in embedded
+    /// mode — needs an object store reachable at startup.
+    /// </summary>
+    DisaggregatedStateless,
+}
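
The commit message also mentions `StorageModeKeys` with wire-string constants and `Parse`/`ToWireString`, but that file is not shown in this view. A minimal sketch of such a mapping, assuming the wire strings used in TopicCreate.razor and the `storage.mode` config key, might look like this. The class name `StorageModeKeysSketch`, the constant names other than `ConfigKey`, the case-sensitive matching and the error texts are all assumptions; only the fact that `Parse` throws `ArgumentException` on unknown input is taken from the call site in TopicOperations.cs.

```csharp
using System;

// Mirrors the enum added in this commit.
public enum StorageMode { Replicated, DisaggregatedWal, DisaggregatedStateless }

// Hypothetical stand-in for StorageModeKeys, not the committed code.
public static class StorageModeKeysSketch
{
    public const string ConfigKey = "storage.mode";

    // Wire strings as they appear in the Control UI dropdown.
    public const string ReplicatedWire = "replicated";
    public const string DisaggregatedWalWire = "disaggregated-wal";
    public const string DisaggregatedStatelessWire = "disaggregated-stateless";

    // Maps a wire string to the enum; unknown (or null) input is rejected.
    public static StorageMode Parse(string value) => value switch
    {
        ReplicatedWire => StorageMode.Replicated,
        DisaggregatedWalWire => StorageMode.DisaggregatedWal,
        DisaggregatedStatelessWire => StorageMode.DisaggregatedStateless,
        _ => throw new ArgumentException(
            $"Unknown {ConfigKey} value '{value}'.", nameof(value)),
    };

    // Maps the enum back to its wire string.
    public static string ToWireString(StorageMode mode) => mode switch
    {
        StorageMode.Replicated => ReplicatedWire,
        StorageMode.DisaggregatedWal => DisaggregatedWalWire,
        StorageMode.DisaggregatedStateless => DisaggregatedStatelessWire,
        _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Unknown storage mode."),
    };
}
```

Keeping the wire strings as constants in one place means the string-level check in `TopicMetadata.IsDisaggregated` and the enum-level parsing can be kept in agreement by inspection, even though Core deliberately does not reference this project.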
