
Commit 42bff6e

feat: streaming snapshot flush with persistent writer and adaptive file splitting
This commit implements the streaming snapshot flush pattern for the Iceberg sink. Combined with the parallel incremental snapshot SPI introduced in debezium/debezium#7362, it dramatically reduces commit overhead and memory pressure during snapshot of large tables.

## Streaming snapshot flush

Instead of creating a new Iceberg writer for every batch (5K-20K rows), keep a single writer open per table for the entire snapshot. The writer accumulates data across chunks and produces a single atomic commit at table completion.

Periodic file splitting kicks in when the writer reaches a calibrated row threshold, producing ~512MB Parquet files. After the first split-commit, the threshold is recalibrated from actual file size (bytes-per-row) and clamped by available heap (60% of max heap, divided by worker count, divided by an in-memory factor of ~40x for Parquet decompression).

## Components

- `IcebergSnapshotCompletionHandler`: implements the SPI from debezium-connector-common. Routes per-chunk events to the streaming writer and triggers final commit on `onTableSnapshotFinished()`.
- `BatchCommitCoordinator`: accumulates events from CDC streaming path (legacy fallback when SPI not available).
- `IcebergChangeConsumer.StreamingSnapshotContext`: per-table state holder: open writer, cached schema converter, calibrated split threshold.
- `IcebergTableOperator.writeChunkToWriter()` / `commitWriter()`: write without commit / final atomic commit + `CommitResult` for adaptive calibration.
- `IcebergTableOperator.isSafeTypeChange()`: allows compatible type evolution (timestamptz↔timestamp, decimal↔double, int↔long) for pre-existing tables with legacy schemas.
- `StructEventConverter`: cached schema converter constructor, static `fieldMappingCache` for performance.
- `EventConverter.isSnapshotEvent()`: used to skip equality-delete writes for READ ops.
- Schema evolution + identifier field protection in `IcebergTableOperator.applyFieldAddition()`: protect both new schema's and existing table's identifier fields when key schema is unavailable (e.g. `key.converter.schemas.enable=false`).

## Throughput / memory impact (production, PostgreSQL 16, 116 tables, ~128M rows)

| Metric                  | Before (per-batch writer) | After (streaming + adaptive split)  |
|-------------------------|---------------------------|-------------------------------------|
| Iceberg writers / table | ~1,500                    | 1 (with periodic file splits)       |
| Iceberg commits / table | ~1,500                    | ~6-10 (one per ~512MB Parquet file) |
| Throughput              | ~14K rows/min             | ~80-120K rows/min                   |
| Peak memory / worker    | ~1.5 GB                   | ~200-300 MB                         |

## Build alignment

Pin `kafka-clients:4.2.0` (matches `connect-runtime:4.2.0` from `debezium-bom:3.6.0-SNAPSHOT`; the `debezium-server-bom:3.5.0.Final` would otherwise pull `kafka-clients:4.1.1` which is missing `ConfigDef$ValidList.anyNonDuplicateValues`).

Pin `httpclient5:5.4.3` to avoid the 5.4.3+5.5 classpath duplication that caused HEAD-request format issues against some REST catalogs (Lakekeeper).

## Dependencies

This PR depends on debezium/debezium#7362 which introduces the `SnapshotTableCompletionHandler` SPI in `debezium-connector-common`. The CI build will fail until that PR is merged and `debezium-bom:3.6.0-SNAPSHOT` is published.

## Spinoff PRs (already extracted, mergeable independently before this one)

- #695: Support nested namespaces with dot separator
- #696: OpenLineage integration and Quarkus management interface
- #698: Snapshot READ semantics (READ as INSERT, missing __op handling)
- #699: Critical data loss fix in processTablesInParallel

When those are merged, this PR's diff will shrink to only the streaming flush changes + build alignment.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
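
The recalibration of the split threshold described under "Streaming snapshot flush" can be illustrated with a small sketch. This is one possible reading of the commit message, not the code from this commit: the class name `SplitThresholdSketch`, the method `calibrate`, and the exact order of the integer divisions are assumptions. The constants (512MB target, 60% of heap, ~40x in-memory factor) are taken from the text above.

```java
/** Hypothetical sketch of the adaptive split-threshold calibration; not the committed code. */
final class SplitThresholdSketch {

    static final long TARGET_FILE_BYTES = 512L * 1024 * 1024; // ~512MB Parquet target
    static final long HEAP_PERCENT = 60;                      // 60% of max heap
    static final long IN_MEMORY_FACTOR = 40;                  // ~40x in-memory expansion

    /**
     * Returns the number of rows after which the writer should split.
     *
     * @param fileBytes    size of the file produced by the previous split-commit
     * @param fileRows     rows written into that file
     * @param maxHeapBytes JVM max heap, e.g. Runtime.getRuntime().maxMemory()
     * @param workerCount  number of parallel snapshot workers
     */
    static long calibrate(long fileBytes, long fileRows, long maxHeapBytes, int workerCount) {
        if (fileBytes <= 0 || fileRows <= 0 || maxHeapBytes <= 0 || workerCount <= 0) {
            throw new IllegalArgumentException("all inputs must be positive");
        }
        // rows = bytes / (fileBytes / fileRows), kept in integer arithmetic
        long rowsForTargetFile = Math.multiplyExact(TARGET_FILE_BYTES, fileRows) / fileBytes;

        // Heap clamp: 60% of heap, shared by workers, shrunk by the in-memory factor
        long onDiskBudget = maxHeapBytes / 100 * HEAP_PERCENT / workerCount / IN_MEMORY_FACTOR;
        long rowsForHeap = Math.multiplyExact(onDiskBudget, fileRows) / fileBytes;

        return Math.max(1L, Math.min(rowsForTargetFile, rowsForHeap));
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        // A 64MB file holding 1M rows, 4GiB heap, 4 workers: the heap clamp wins.
        System.out.println(calibrate(64L * 1024 * 1024, 1_000_000L, 4 * gib, 4));
    }
}
```

With a small heap the heap clamp dominates and files come out smaller than 512MB; with a large heap the threshold is simply the row count that is expected to fill the target file.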
1 parent 61a8857 commit 42bff6e

15 files changed

Lines changed: 1994 additions & 282 deletions

debezium-server-iceberg-dist/pom.xml

100644 → 100755
Lines changed: 2 additions & 0 deletions
@@ -125,10 +125,12 @@
       <groupId>io.debezium</groupId>
       <artifactId>debezium-connector-oracle</artifactId>
     </dependency>
+    <!-- DB2 connector not available in Debezium 3.6.0-SNAPSHOT
     <dependency>
       <groupId>io.debezium</groupId>
       <artifactId>debezium-connector-db2</artifactId>
     </dependency>
+    -->
     <dependency>
       <groupId>io.debezium</groupId>
       <artifactId>debezium-scripting</artifactId>
Lines changed: 259 additions & 0 deletions
@@ -0,0 +1,259 @@
+/*
+ *
+ *  * Copyright memiiso Authors.
+ *  *
+ *  * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Coordinates batching of snapshot table completions to optimize Iceberg file sizes.
+ *
+ * <p>Instead of committing each table individually (which creates many small files),
+ * this coordinator accumulates tables until reaching the target batch size (~512MB)
+ * and then commits them together in a single transaction.
+ *
+ * <p>This approach:
+ * - Reduces S3/GCS PUT operations (cost optimization)
+ * - Creates 512MB Parquet files (optimal for Trino queries)
+ * - Reduces Iceberg manifest overhead
+ * - Maintains low memory usage (batches instead of full buffering)
+ *
+ * @author Debezium Community
+ */
+public class BatchCommitCoordinator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchCommitCoordinator.class);
+
+    /**
+     * Represents a completed table ready for batching.
+     */
+    public static class CompletedTable {
+        private final String tableName;
+        private final List<SourceRecord> records;
+        private final long estimatedBytes;
+
+        public CompletedTable(String tableName, List<SourceRecord> records) {
+            this.tableName = tableName;
+            this.records = records;
+            // Estimate ~200 bytes per record (conservative)
+            this.estimatedBytes = records.size() * 200L;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public List<SourceRecord> getRecords() {
+            return records;
+        }
+
+        public long getEstimatedBytes() {
+            return estimatedBytes;
+        }
+
+        public int getRecordCount() {
+            return records.size();
+        }
+    }
+
+    private final IcebergChangeConsumer consumer;
+    private final long targetBatchBytes;
+    private final int maxTablesPerBatch;
+    private final long timeoutMs;
+
+    private final List<CompletedTable> stagingBatch = new ArrayList<>();
+    private final Lock batchLock = new ReentrantLock();
+    private final ScheduledExecutorService timeoutExecutor;
+
+    private long lastFlushTime = System.currentTimeMillis();
+    private long currentBatchBytes = 0;
+
+    /**
+     * Creates a new batch commit coordinator.
+     *
+     * @param consumer The Iceberg consumer to flush batches to
+     * @param targetBatchBytes Target batch size in bytes (e.g., 512MB = 536870912)
+     * @param maxTablesPerBatch Maximum tables per batch (safety limit)
+     * @param timeoutMs Maximum time to wait before flushing batch
+     */
+    public BatchCommitCoordinator(
+            IcebergChangeConsumer consumer,
+            long targetBatchBytes,
+            int maxTablesPerBatch,
+            long timeoutMs) {
+
+        this.consumer = consumer;
+        this.targetBatchBytes = targetBatchBytes;
+        this.maxTablesPerBatch = maxTablesPerBatch;
+        this.timeoutMs = timeoutMs;
+
+        // Timeout executor for periodic flush
+        this.timeoutExecutor = new ScheduledThreadPoolExecutor(1, r -> {
+            Thread t = new Thread(r, "batch-commit-timeout");
+            t.setDaemon(true);
+            return t;
+        });
+
+        // Check for timeout every 5 seconds
+        this.timeoutExecutor.scheduleAtFixedRate(
+                this::checkTimeoutAndFlush,
+                5000,
+                5000,
+                TimeUnit.MILLISECONDS
+        );
+
+        LOGGER.info("BatchCommitCoordinator initialized: targetBytes={} MB, maxTables={}, timeout={}ms",
+                targetBatchBytes / 1024 / 1024, maxTablesPerBatch, timeoutMs);
+    }
+
+    /**
+     * Called when a snapshot worker completes a table.
+     *
+     * @param tableName The completed table name
+     * @param records Pre-transformed SourceRecords from the snapshot worker
+     */
+    public void onTableCompleted(String tableName, List<SourceRecord> records) {
+        if (records == null || records.isEmpty()) {
+            LOGGER.debug("Skipping empty table '{}'", tableName);
+            return;
+        }
+
+        CompletedTable completedTable = new CompletedTable(tableName, records);
+
+        batchLock.lock();
+        try {
+            stagingBatch.add(completedTable);
+            currentBatchBytes += completedTable.getEstimatedBytes();
+
+            LOGGER.info("[{}] Added table '{}' to staging batch ({} records, ~{} MB). Batch: {}/{} tables, ~{} MB",
+                    Thread.currentThread().getName(),
+                    tableName,
+                    completedTable.getRecordCount(),
+                    completedTable.getEstimatedBytes() / 1024 / 1024,
+                    stagingBatch.size(),
+                    maxTablesPerBatch,
+                    currentBatchBytes / 1024 / 1024);
+
+            // Check if batch is ready to commit
+            boolean shouldFlush = currentBatchBytes >= targetBatchBytes ||
+                    stagingBatch.size() >= maxTablesPerBatch;
+
+            if (shouldFlush) {
+                LOGGER.info("Batch ready: {} tables, ~{} MB (target: {} MB). Flushing now.",
+                        stagingBatch.size(),
+                        currentBatchBytes / 1024 / 1024,
+                        targetBatchBytes / 1024 / 1024);
+                flushBatch();
+            }
+        }
+        finally {
+            batchLock.unlock();
+        }
+    }
+
+    /**
+     * Periodic check for timeout-based flush.
+     */
+    private void checkTimeoutAndFlush() {
+        batchLock.lock();
+        try {
+            if (stagingBatch.isEmpty()) {
+                return;
+            }
+
+            long elapsed = System.currentTimeMillis() - lastFlushTime;
+            if (elapsed >= timeoutMs) {
+                LOGGER.info("Batch timeout reached ({} ms). Flushing {} tables, ~{} MB",
+                        elapsed,
+                        stagingBatch.size(),
+                        currentBatchBytes / 1024 / 1024);
+                flushBatch();
+            }
+        }
+        finally {
+            batchLock.unlock();
+        }
+    }
+
+    /**
+     * Flush the current batch to Iceberg (must hold lock).
+     */
+    private void flushBatch() {
+        if (stagingBatch.isEmpty()) {
+            return;
+        }
+
+        List<CompletedTable> batchToFlush = new ArrayList<>(stagingBatch);
+        long batchBytes = currentBatchBytes;
+
+        // Clear staging for next batch
+        stagingBatch.clear();
+        currentBatchBytes = 0;
+        lastFlushTime = System.currentTimeMillis();
+
+        // Release lock before expensive I/O
+        batchLock.unlock();
+        try {
+            // Delegate to consumer
+            consumer.flushSnapshotBatch(batchToFlush);
+
+            LOGGER.info("✅ Batch commit completed: {} tables, {} total rows, ~{} MB",
+                    batchToFlush.size(),
+                    batchToFlush.stream().mapToInt(CompletedTable::getRecordCount).sum(),
+                    batchBytes / 1024 / 1024);
+        }
+        catch (Exception e) {
+            LOGGER.error("❌ Batch commit failed: {}", e.getMessage(), e);
+            throw new RuntimeException("Batch commit failed", e);
+        }
+        finally {
+            batchLock.lock();
+        }
+    }
+
+    /**
+     * Flush any remaining batch (called during shutdown).
+     */
+    public void shutdown() {
+        LOGGER.info("Shutting down BatchCommitCoordinator. Flushing remaining batch...");
+
+        batchLock.lock();
+        try {
+            if (!stagingBatch.isEmpty()) {
+                LOGGER.info("Flushing final batch: {} tables", stagingBatch.size());
+                flushBatch();
+            }
+        }
+        finally {
+            batchLock.unlock();
+        }
+
+        timeoutExecutor.shutdown();
+        try {
+            if (!timeoutExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                timeoutExecutor.shutdownNow();
+            }
+        }
+        catch (InterruptedException e) {
+            timeoutExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        LOGGER.info("BatchCommitCoordinator shutdown complete");
+    }
+}
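
The flush trigger in `onTableCompleted()` (estimated bytes reaching the target, or the table count reaching the limit) can be exercised in isolation with a reduced sketch. Everything here is hypothetical scaffolding: `StagingBatchSketch` tracks only table names and record counts, takes a plain `Consumer` in place of `IcebergChangeConsumer`, and uses `synchronized` instead of the `ReentrantLock` unlock/relock around I/O that the class above performs. Only the 200-bytes-per-record estimate and the two trigger conditions are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, reduced model of the size-or-count flush trigger; not the committed class. */
final class StagingBatchSketch {

    // Same heuristic as CompletedTable: ~200 bytes per record
    static final long ESTIMATED_BYTES_PER_RECORD = 200L;

    private final long targetBatchBytes;
    private final int maxTablesPerBatch;
    private final Consumer<List<String>> sink; // stand-in for consumer.flushSnapshotBatch(...)

    private final List<String> staged = new ArrayList<>();
    private long stagedBytes = 0;

    StagingBatchSketch(long targetBatchBytes, int maxTablesPerBatch, Consumer<List<String>> sink) {
        this.targetBatchBytes = targetBatchBytes;
        this.maxTablesPerBatch = maxTablesPerBatch;
        this.sink = sink;
    }

    /** Stages a completed table and flushes when either threshold is reached. */
    synchronized void onTableCompleted(String tableName, int recordCount) {
        if (recordCount <= 0) {
            return; // empty tables are skipped, as in the original
        }
        staged.add(tableName);
        stagedBytes += recordCount * ESTIMATED_BYTES_PER_RECORD;
        if (stagedBytes >= targetBatchBytes || staged.size() >= maxTablesPerBatch) {
            flush();
        }
    }

    /** Hands the staged tables to the sink and resets the staging state. */
    synchronized void flush() {
        if (staged.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(staged);
        staged.clear();
        stagedBytes = 0;
        sink.accept(batch);
    }

    public static void main(String[] args) {
        StagingBatchSketch batcher = new StagingBatchSketch(1000L, 3, System.out::println);
        batcher.onTableCompleted("public.a", 2); // 400 bytes staged, no flush yet
        batcher.onTableCompleted("public.b", 3); // 1000 bytes staged, flush by size
        batcher.flush();                         // nothing left to flush
    }
}
```

At the 200-bytes-per-record estimate, a 512MB target (536870912 bytes) corresponds to roughly 2.68 million staged records, so for smaller tables the `maxTablesPerBatch` limit or the timeout is likely to trigger first.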

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/BatchConfig.java

100644 → 100755
Lines changed: 27 additions & 0 deletions
@@ -37,5 +37,32 @@ public interface BatchConfig {
   @WithDefault("60")
   int concurrentUploadsTimeoutMinutes();
 
+  @WithName("debezium.sink.batch.buffer-per-table.enabled")
+  @WithDefault("false")
+  Boolean bufferPerTableEnabled();
+
+  @WithName("debezium.sink.batch.buffer-per-table.flush-interval-ms")
+  @WithDefault("30000")
+  Long bufferPerTableFlushIntervalMs();
+
+  @WithName("debezium.sink.batch.buffer-per-table.threshold")
+  @WithDefault("1000")
+  Integer bufferPerTableThreshold();
+
+  @WithName("debezium.sink.batch.buffer-per-table.max-tables")
+  @WithDefault("100")
+  Integer bufferPerTableMaxTables();
+
+  @WithName("debezium.sink.batch.buffer-per-table.retry.max-attempts")
+  @WithDefault("3")
+  Integer bufferPerTableRetryMaxAttempts();
+
+  @WithName("debezium.sink.batch.buffer-per-table.retry.initial-delay-ms")
+  @WithDefault("1000")
+  Long bufferPerTableRetryInitialDelayMs();
+
+  @WithName("debezium.sink.batch.buffer-per-table.retry.max-delay-ms")
+  @WithDefault("60000")
+  Long bufferPerTableRetryMaxDelayMs();
 
 }
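
The three `retry.*` settings (max attempts, initial delay, max delay) suggest a capped backoff, although the shown hunks do not include the code that consumes them. The sketch below assumes exponential doubling, which is a guess rather than something the diff confirms; `RetryBackoffSketch` and `delayForAttempt` are hypothetical names.

```java
/** Hypothetical capped exponential backoff; the doubling policy is an assumption. */
final class RetryBackoffSketch {

    /**
     * Delay before the given retry attempt (1-based): the first retry waits
     * initialDelayMs, each later retry doubles the wait, capped at maxDelayMs.
     */
    static long delayForAttempt(int attempt, long initialDelayMs, long maxDelayMs) {
        if (attempt < 1 || initialDelayMs < 0 || maxDelayMs < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and delays non-negative");
        }
        long delay = Math.min(initialDelayMs, maxDelayMs);
        for (int i = 1; i < attempt; i++) {
            // Compare against half the cap first so that doubling cannot overflow
            delay = (delay > maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        // Defaults from BatchConfig: 3 attempts, 1000 ms initial delay, 60000 ms cap
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + " waits "
                    + delayForAttempt(attempt, 1000L, 60000L) + " ms");
        }
    }
}
```

Under this assumption the default cap of 60000 ms only takes effect if `max-attempts` is raised well beyond the default of 3.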

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumConfig.java

Lines changed: 11 additions & 0 deletions
@@ -89,6 +89,17 @@ public interface DebeziumConfig {
   @WithName("debezium.transforms")
   Map<String, String> transformsConfigs();
 
+  @WithName("debezium.predicates")
+  @WithDefault("")
+  String predicates();
+
+  @WithName("debezium.predicates")
+  Map<String, String> predicatesConfigs();
+
+  @WithName("debezium.source.topic.prefix")
+  @WithDefault("cdc")
+  String topicPrefix();
+
   @WithName("debezium.source.topic.heartbeat.prefix")
   @WithDefault("__debezium-heartbeat")
   String topicHeartbeatPrefix();
