Skip to content

Commit 7873d93

Browse files
feat(broker): G21-Followup C+D — WAL-Trim + Read-Fallback
Schliesst die dritte Closing-Bedingung aus #11 und ihren impliziten Read-Path-Pre-Requirement. Operatoren koennen jetzt TrimAfterFlush=true sicher aktivieren, weil getrimmte Offsets ueber DisaggregatedSegmentReader aus dem Object-Store nachgefuettert werden. Geaendert (Followup D — Read-Fallback): - DataApiHandler ctor +DisaggregatedSegmentReader? disaggregatedReader. HandleFetchAsync konsultiert vor dem LogManager.ReadBatches*-Call: wenn _disaggregatedReader != null && topicMetadata.IsDisaggregated && reader.TryReadAsync.HitManifest => Bytes aus Manifest+Remote zurueck, Skip der lokalen Read-Pfade. messageCount via batch-header-walk an Position 57. Sonst Fall-through zur existierenden Logik — Null-Default haelt Pre-G21-Verhalten 1:1. - SurgewaveRuntimeOptions +DisaggregatedReader Property. - SurgewaveRuntimeBuilder +WithDisaggregatedReader builder method. - SurgewaveRuntime passt _options.DisaggregatedReader an DataApiHandler durch. Geaendert (Followup C — Trim): - WalSealedSegment +TrimAsync optional delegate (Func<CT, Task>). - WalFlusherOptions +TrimAfterFlush flag (default false fuer Safety — Aktivieren ohne Read-Fallback wuerde getrimmte Offsets unreachable machen; Doc-Comment weist auf WithDisaggregatedReader hin). - WalFlusher.FlushOneAsync ruft TrimAsync NACH erfolgreichem AppendObjectAsync. Trim-Exception wird geloggt + geswallowed (Manifest = Source-of-Truth, Stray-Local-File ist kostenlos bis zur naechsten Retention-Sweep). - PartitionLogWalSegmentSource setzt TrimAsync auf File.Delete der 3 Segment-Files (.log, .index, .timeindex; idempotent ueber DeleteIfExists-Helper). 
Tests (4 neue, 57 gesamt, alle gruen): - TrimAfterFlush=true + delegate set -> trimmed=true - TrimAfterFlush=false + delegate set -> trimmed=false (knob respected) - Failed upload -> TrimAsync NICHT aufgerufen (atomic-or-nothing) - Trim-Exception nach erfolgreicher Manifest-Commit -> swallowed, Manifest bleibt 1 entry Verifikation: Solution-Build clean, Disaggregated 57/57 + Protocol.Native 153/153 + Broker baut. Followups A+B+C+D = alle 3 Closing-Bedingungen aus #11 durch — Issue kann jetzt geclosed werden.
1 parent 81f48f4 commit 7873d93

9 files changed

Lines changed: 218 additions & 9 deletions

File tree

src/Kuestenlogik.Surgewave.Broker/Handlers/DataApiHandler.cs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Kuestenlogik.Surgewave.Core.Util;
1212
using Kuestenlogik.Surgewave.Protocol.Kafka;
1313
using Kuestenlogik.Surgewave.Protocol.Kafka.Requests;
14+
using Kuestenlogik.Surgewave.Storage.Disaggregated.Read;
1415
using Kuestenlogik.Surgewave.Storage.Disaggregated.Routing;
1516
using Microsoft.Extensions.Logging;
1617

@@ -36,6 +37,7 @@ public sealed class DataApiHandler : IKafkaRequestHandler
3637
private readonly IRecordTransformPipeline? _recordTransform;
3738
private readonly ColdStartWorkloadProfiler? _coldStartProfiler;
3839
private readonly IPartitionAppender _partitionAppender;
40+
private readonly DisaggregatedSegmentReader? _disaggregatedReader;
3941
private readonly ILogger<DataApiHandler> _logger;
4042

4143
public IEnumerable<ApiKey> SupportedApiKeys =>
@@ -61,7 +63,8 @@ public DataApiHandler(
6163
SurgewaveBrokerObservability? observability = null,
6264
IRecordTransformPipeline? recordTransform = null,
6365
ColdStartWorkloadProfiler? coldStartProfiler = null,
64-
IPartitionAppender? partitionAppender = null)
66+
IPartitionAppender? partitionAppender = null,
67+
DisaggregatedSegmentReader? disaggregatedReader = null)
6568
{
6669
_config = config;
6770
_logManager = logManager;
@@ -82,7 +85,8 @@ public DataApiHandler(
8285
// enable disaggregated storage pass a RoutingPartitionAppender via
8386
// SurgewaveRuntimeBuilder.WithPartitionAppender(...).
8487
_partitionAppender = partitionAppender
85-
?? new DelegatingPartitionAppender((tp, batch, _, ct) => _logManager.AppendBatchAsync(tp, batch, ct));
88+
?? new DelegatingPartitionAppender((tp, batch, _, ct) => _logManager.AppendBatchAsync(tp, batch, ct).AsTask());
89+
_disaggregatedReader = disaggregatedReader;
8690
}
8791

8892
public async Task<KafkaResponse> HandleAsync(KafkaRequest request, RequestContext context, CancellationToken cancellationToken)
@@ -270,9 +274,9 @@ private async Task<ProduceResponse> HandleProduceAsync(ProduceRequest request, C
270274
// StatelessAgent. The record count is parsed from the batch
271275
// header (Kafka RecordBatch v2, offset 57); stateless mode
272276
// needs it for offset assignment.
273-
var recordCount = RecordHeaderParser.ParseBatchHeader(recordsToAppend.Span).RecordCount;
277+
var produceRecordCount = RecordHeaderParser.ParseBatchHeader(recordsToAppend.Span).RecordCount;
274278
var baseOffset = await _partitionAppender.AppendBatchAsync(
275-
topicPartition, recordsToAppend, recordCount, cancellationToken);
279+
topicPartition, recordsToAppend, produceRecordCount, cancellationToken);
276280

277281
// Register hash after successful write (deduplication)
278282
_deduplicationManager?.Register(topicPartition, recordsToAppend.Span, baseOffset);
@@ -483,6 +487,52 @@ private async Task<FetchResponse> HandleFetchAsync(FetchRequest request, Connect
483487
byte[] recordSet;
484488
int messageCount;
485489

490+
// Disaggregated read fallback: when the topic uses
491+
// disaggregated storage and the requested offset has
492+
// already been flushed to the object store (so the
493+
// local WAL may already have trimmed it), serve from the
494+
// manifest. The reader returns HitManifest=false for
495+
// offsets past the manifest tail — those still live in
496+
// the local WAL and the normal read path below picks
497+
// them up. Skip when no reader is wired (default) or
498+
// when the topic isn't disaggregated.
499+
var fetchTopicMetadata = _logManager.GetTopicMetadata(topic);
500+
if (_disaggregatedReader is not null && fetchTopicMetadata?.IsDisaggregated == true)
501+
{
502+
var disagRead = await _disaggregatedReader.TryReadAsync(
503+
topicPartition,
504+
partitionData.FetchOffset,
505+
partitionData.MaxBytes,
506+
cancellationToken).ConfigureAwait(false);
507+
if (disagRead.HitManifest)
508+
{
509+
recordSet = disagRead.LogBytes.ToArray();
510+
messageCount = 0;
511+
// Same record-count tallying pattern as the
512+
// contiguous fast path: walk the concatenated
513+
// batches and read the count field at offset 57.
514+
var span = disagRead.LogBytes.Span;
515+
var cursor = 0;
516+
while (cursor + 61 <= span.Length)
517+
{
518+
var batchLen = System.Buffers.Binary.BinaryPrimitives.ReadInt32BigEndian(span.Slice(cursor + 8, 4));
519+
var batchTotal = 12 + batchLen; // baseOffset(8) + batchLength(4) + body
520+
if (cursor + 57 + 4 <= span.Length)
521+
messageCount += System.Buffers.Binary.BinaryPrimitives.ReadInt32BigEndian(span.Slice(cursor + 57, 4));
522+
cursor += batchTotal;
523+
}
524+
525+
partitionResponses.Add(new FetchResponse.PartitionResponse
526+
{
527+
Partition = partitionData.Partition,
528+
ErrorCode = ErrorCode.None,
529+
HighWatermark = highWatermark,
530+
RecordSet = recordSet,
531+
});
532+
continue;
533+
}
534+
}
535+
486536
if (!needsFiltering)
487537
{
488538
// Fast path: contiguous read — single allocation for all batches.

src/Kuestenlogik.Surgewave.Runtime/SurgewaveRuntime.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ private async Task StartInternalAsync(CancellationToken cancellationToken)
290290
_metadataApiHandler = new MetadataApiHandler(config, _logManager, metadataApiLogger);
291291
IKafkaRequestHandler[] handlers =
292292
[
293-
new DataApiHandler(config, _logManager, transactionCoordinator, _quotaManager, recordBatchSerializer, aclAuthorizer: null, deduplicationManager: null, delayIndex: null, ttlIndex: null, _metrics, dataApiLogger, partitionAppender: _options.PartitionAppender),
293+
new DataApiHandler(config, _logManager, transactionCoordinator, _quotaManager, recordBatchSerializer, aclAuthorizer: null, deduplicationManager: null, delayIndex: null, ttlIndex: null, _metrics, dataApiLogger, partitionAppender: _options.PartitionAppender, disaggregatedReader: _options.DisaggregatedReader),
294294
_metadataApiHandler,
295295
_topicAdminHandler,
296296
new ConfigApiHandler(config, dynamicBrokerConfig, _logManager),

src/Kuestenlogik.Surgewave.Runtime/SurgewaveRuntimeBuilder.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Kuestenlogik.Surgewave.Broker;
22
using Kuestenlogik.Surgewave.Core.Storage;
3+
using Kuestenlogik.Surgewave.Storage.Disaggregated.Read;
34
using Kuestenlogik.Surgewave.Storage.Disaggregated.Routing;
45
using Microsoft.Extensions.Logging;
56

@@ -46,6 +47,7 @@ public sealed class SurgewaveRuntimeBuilder
4647
private bool _cleanupOnDispose = true;
4748
private ILoggerFactory? _loggerFactory;
4849
private IPartitionAppender? _partitionAppender;
50+
private DisaggregatedSegmentReader? _disaggregatedReader;
4951

5052
internal SurgewaveRuntimeBuilder() { }
5153

@@ -363,6 +365,20 @@ public SurgewaveRuntimeBuilder WithPartitionAppender(IPartitionAppender partitio
363365
return this;
364366
}
365367

368+
/// <summary>
369+
/// Installs a disaggregated read fallback for the broker's Fetch path
370+
/// (ADR-014, G21). When set, fetches against disaggregated topics
371+
/// consult the partition manifest first and serve trimmed offsets from
372+
/// the object store before falling back to the local WAL. Required
373+
/// alongside <c>WalFlusherOptions.TrimAfterFlush</c> — without the
374+
/// reader, trimmed offsets would be unreachable.
375+
/// </summary>
376+
public SurgewaveRuntimeBuilder WithDisaggregatedReader(DisaggregatedSegmentReader reader)
377+
{
378+
_disaggregatedReader = reader;
379+
return this;
380+
}
381+
366382
// ==================== Configuration ====================
367383

368384
/// <summary>
@@ -424,5 +440,6 @@ public SurgewaveRuntimeBuilder ConfigureFrom(BrokerConfig config)
424440
CleanupOnDispose = _cleanupOnDispose,
425441
LoggerFactory = _loggerFactory,
426442
PartitionAppender = _partitionAppender,
443+
DisaggregatedReader = _disaggregatedReader,
427444
};
428445
}

src/Kuestenlogik.Surgewave.Runtime/SurgewaveRuntimeOptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Kuestenlogik.Surgewave.Broker;
22
using Kuestenlogik.Surgewave.Core.Storage;
3+
using Kuestenlogik.Surgewave.Storage.Disaggregated.Read;
34
using Kuestenlogik.Surgewave.Storage.Disaggregated.Routing;
45
using Microsoft.Extensions.Logging;
56

@@ -198,6 +199,16 @@ public sealed record SurgewaveRuntimeOptions
198199
/// </summary>
199200
public IPartitionAppender? PartitionAppender { get; init; }
200201

202+
/// <summary>
203+
/// Optional <see cref="DisaggregatedSegmentReader"/> that intercepts the
204+
/// broker's Fetch read path. When the requested offset has been flushed
205+
/// to the object store and the manifest knows the stream object key,
206+
/// the reader hands back the bytes from S3 instead of hitting the local
207+
/// WAL. Required for safe <c>WalFlusherOptions.TrimAfterFlush</c> usage.
208+
/// Null = direct LogManager read (pre-G21 behaviour).
209+
/// </summary>
210+
public DisaggregatedSegmentReader? DisaggregatedReader { get; init; }
211+
201212
// ==================== Internal ====================
202213

203214
/// <summary>

src/Kuestenlogik.Surgewave.Storage.Disaggregated/Wal/PartitionLogWalSegmentSource.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,32 @@ public ValueTask<IReadOnlyList<WalSealedSegment>> ListSealedAsync(
7070
var lastOffset = s.CurrentOffset - 1;
7171
if (lastOffset < baseOffset) continue; // empty segment, nothing to flush
7272

73+
var logPath = Path.Combine(dir, $"{baseOffset:D20}.log");
74+
var indexPath = Path.Combine(dir, $"{baseOffset:D20}.index");
75+
var timeIndexPath = Path.Combine(dir, $"{baseOffset:D20}.timeindex");
7376
sealedSegments.Add(new WalSealedSegment(
7477
Partition: partition,
7578
BaseOffset: baseOffset,
7679
LastOffset: lastOffset,
7780
SizeBytes: s.Size,
7881
CreatedAt: s.CreatedAt,
79-
ReadLogBytesAsync: ct => ReadFileOrEmptyAsync(Path.Combine(dir, $"{baseOffset:D20}.log"), ct),
80-
ReadIndexBytesAsync: ct => ReadFileOrEmptyAsync(Path.Combine(dir, $"{baseOffset:D20}.index"), ct),
81-
ReadTimeIndexBytesAsync: ct => ReadFileOrEmptyAsync(Path.Combine(dir, $"{baseOffset:D20}.timeindex"), ct)));
82+
ReadLogBytesAsync: ct => ReadFileOrEmptyAsync(logPath, ct),
83+
ReadIndexBytesAsync: ct => ReadFileOrEmptyAsync(indexPath, ct),
84+
ReadTimeIndexBytesAsync: ct => ReadFileOrEmptyAsync(timeIndexPath, ct))
85+
{
86+
TrimAsync = _ =>
87+
{
88+
// Delete the .log together with its .index and .timeindex
89+
// siblings so the trim stays atomic-ish from the
90+
// observer's POV. Missing files are tolerated — DeleteIfExists
91+
// skips them. Errors propagate to the flusher's
92+
// try/catch which logs + swallows.
93+
DeleteIfExists(logPath);
94+
DeleteIfExists(indexPath);
95+
DeleteIfExists(timeIndexPath);
96+
return Task.CompletedTask;
97+
},
98+
});
8299
}
83100

84101
return ValueTask.FromResult<IReadOnlyList<WalSealedSegment>>(sealedSegments);
@@ -93,4 +110,9 @@ private static async Task<byte[]> ReadFileOrEmptyAsync(string path, Cancellation
93110
if (!File.Exists(path)) return [];
94111
return await File.ReadAllBytesAsync(path, ct).ConfigureAwait(false);
95112
}
113+
114+
private static void DeleteIfExists(string path)
115+
{
116+
if (File.Exists(path)) File.Delete(path);
117+
}
96118
}

src/Kuestenlogik.Surgewave.Storage.Disaggregated/Wal/WalFlusher.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,24 @@ await _remote.UploadSegmentAsync(
142142
CreatedAt: segment.CreatedAt);
143143

144144
await _manifests.AppendObjectAsync(segment.Partition, manifestRef, cancellationToken).ConfigureAwait(false);
145+
146+
// Trim runs strictly AFTER the manifest commit succeeds. A failed
147+
// upload or commit above throws and we never reach here, so the
148+
// local segment file is preserved for the next scan to retry.
149+
// Trim itself is allowed to fail (logged + swallowed): the
150+
// manifest is the source of truth; a stray local file just costs
151+
// disk until the retention sweeper picks it up.
152+
if (_options.TrimAfterFlush && segment.TrimAsync is not null)
153+
{
154+
try
155+
{
156+
await segment.TrimAsync(cancellationToken).ConfigureAwait(false);
157+
}
158+
catch (Exception ex)
159+
{
160+
_logger.LogWarning(ex, "WAL-trim failed for {Topic}/{Partition} offset {Offset}; segment stays local",
161+
segment.Partition.Topic, segment.Partition.Partition, segment.BaseOffset);
162+
}
163+
}
145164
}
146165
}

src/Kuestenlogik.Surgewave.Storage.Disaggregated/Wal/WalFlusherOptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,15 @@ public sealed record WalFlusherOptions
2222
/// lot of catch-up to do. Default: 16.
2323
/// </summary>
2424
public int MaxSegmentsPerScan { get; init; } = 16;
25+
26+
/// <summary>
27+
/// Whether to delete the local segment files after a successful flush
28+
/// + manifest-commit. Off by default for safety: enabling this without
29+
/// also wiring the disaggregated Fetch fallback
30+
/// (<c>SurgewaveRuntimeBuilder.WithDisaggregatedReader</c>) would
31+
/// make trimmed offsets unreachable. Turn on once the read fallback
32+
/// is in place and you want the WAL footprint capped near zero
33+
/// instead of following <c>retention.ms</c>.
34+
/// </summary>
35+
public bool TrimAfterFlush { get; init; }
2536
}

src/Kuestenlogik.Surgewave.Storage.Disaggregated/Wal/WalSealedSegment.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,23 @@ public sealed record WalSealedSegment(
1717
DateTime CreatedAt,
1818
Func<CancellationToken, Task<byte[]>> ReadLogBytesAsync,
1919
Func<CancellationToken, Task<byte[]>> ReadIndexBytesAsync,
20-
Func<CancellationToken, Task<byte[]>> ReadTimeIndexBytesAsync);
20+
Func<CancellationToken, Task<byte[]>> ReadTimeIndexBytesAsync)
21+
{
22+
/// <summary>
23+
/// Optional delegate that deletes the local segment files after a
24+
/// successful flush + manifest-commit. When
25+
/// <see cref="WalFlusherOptions.TrimAfterFlush"/> is on and this
26+
/// delegate is set, the flusher invokes it as the final step of
27+
/// <c>FlushOneAsync</c>. Null = keep the local copy (default; the
28+
/// segment falls under the topic's normal <c>retention.ms</c> until
29+
/// it expires).
30+
///
31+
/// Safety note: the broker's Fetch path must consult the partition
32+
/// manifest via <c>DisaggregatedSegmentReader</c> before reading
33+
/// from the local log, otherwise a trimmed offset becomes
34+
/// unreachable. The runtime wires this together when both
35+
/// <c>WithPartitionAppender</c> and <c>WithDisaggregatedReader</c>
36+
/// are set.
37+
/// </summary>
38+
public Func<CancellationToken, Task>? TrimAsync { get; init; }
39+
}

tests/Kuestenlogik.Surgewave.Storage.Disaggregated.Tests/Wal/WalFlusherTests.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,66 @@ public void StreamObjectKeyConvention_uses_D20_padding()
116116
Assert.Equal("topics/orders/0/stream-00000000000000000042.so", key);
117117
}
118118

119+
[Fact]
120+
public async Task TrimAfterFlush_calls_TrimAsync_when_option_enabled()
121+
{
122+
var source = new FakeSource();
123+
var trimmed = false;
124+
source.Add(Segment(0, 99, 1024) with { TrimAsync = _ => { trimmed = true; return Task.CompletedTask; } });
125+
var sut = NewFlusher(source, out _, out _,
126+
options: new WalFlusherOptions { TrimAfterFlush = true });
127+
128+
await sut.RunOnceAsync(Partition);
129+
130+
Assert.True(trimmed);
131+
}
132+
133+
[Fact]
134+
public async Task TrimAfterFlush_off_skips_TrimAsync_even_when_delegate_set()
135+
{
136+
var source = new FakeSource();
137+
var trimmed = false;
138+
source.Add(Segment(0, 99, 1024) with { TrimAsync = _ => { trimmed = true; return Task.CompletedTask; } });
139+
var sut = NewFlusher(source, out _, out _,
140+
options: new WalFlusherOptions { TrimAfterFlush = false });
141+
142+
await sut.RunOnceAsync(Partition);
143+
144+
Assert.False(trimmed);
145+
}
146+
147+
[Fact]
148+
public async Task Failed_upload_skips_trim()
149+
{
150+
var source = new FakeSource();
151+
var trimmed = false;
152+
source.Add(Segment(0, 99, 1024) with { TrimAsync = _ => { trimmed = true; return Task.CompletedTask; } });
153+
var sut = new WalFlusher(source, new InMemoryPartitionManifestStore(), new FailingRemote(),
154+
options: new WalFlusherOptions { TrimAfterFlush = true });
155+
156+
await Assert.ThrowsAsync<IOException>(() => sut.RunOnceAsync(Partition));
157+
158+
Assert.False(trimmed);
159+
}
160+
161+
[Fact]
162+
public async Task Trim_exception_is_swallowed_after_successful_manifest_commit()
163+
{
164+
// A trim error after the manifest is already committed must not
165+
// surface — the manifest is the source of truth, the leftover
166+
// local file is a future janitor's problem.
167+
var source = new FakeSource();
168+
source.Add(Segment(0, 99, 1024) with { TrimAsync = _ => throw new IOException("disk full") });
169+
var sut = NewFlusher(source, out _, out var manifests,
170+
options: new WalFlusherOptions { TrimAfterFlush = true });
171+
172+
var n = await sut.RunOnceAsync(Partition);
173+
174+
Assert.Equal(1, n);
175+
var manifest = await manifests.GetAsync(Partition);
176+
Assert.Single(manifest.Objects);
177+
}
178+
119179
// ── fakes ────────────────────────────────────────────────────────────
120180

121181
private static WalFlusher NewFlusher(

0 commit comments

Comments
 (0)