Skip to content

Commit 88b64e6

Browse files
author
ivan.senyk
committed
feat: streaming snapshot flush with persistent writer, adaptive file splitting, and data loss fix
Streaming Flush Architecture: - Persistent IcebergWriter per table throughout incremental snapshot lifecycle - Chunked writes via SnapshotTableCompletionHandler SPI from debezium-core - Adaptive file splitting calibrated to actual data size and available memory - BatchCommitCoordinator for coordinated Iceberg commits across parallel workers Performance: - Throughput: ~14K -> ~80-120K rows/min - Peak memory: ~1.5GB -> ~200-300MB per worker - Parallel multi-table snapshot processing Critical Bug Fix: - processTablesInParallel() was catching exceptions without re-throwing, allowing Debezium to commit offsets despite failed writes, causing permanent silent data loss Schema Evolution Fixes: - Handle optional PK fields from CDC schema in applyFieldAddition - requireColumn for identifier fields widened to optional by unionByNameWith - Treat READ operations as direct INSERT in BaseDeltaTaskWriter - Evolve required fields to optional to prevent Parquet NPE on NULL values Additional: - Upgrade Debezium dependency from 3.3.1.Final to 3.6.0-SNAPSHOT - Support nested namespaces with dot separator for Iceberg catalog - Apply SMT chain to snapshot events for type consistency with CDC path - Consumer done flag for external report watcher lifecycle - OpenLineage output dataset emission integration - Quarkus management interface enabled at build time Depends-on: debezium/debezium#7362 (SPI classes for multi-threaded snapshot) Tested: PostgreSQL 16, 116 tables, ~128M rows, zero data loss
1 parent d5d8a8f commit 88b64e6

19 files changed

Lines changed: 1818 additions & 33 deletions

debezium-server-iceberg-dist/pom.xml

100644100755
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,12 @@
125125
<groupId>io.debezium</groupId>
126126
<artifactId>debezium-connector-oracle</artifactId>
127127
</dependency>
128+
<!-- DB2 connector not available in Debezium 3.6.0-SNAPSHOT
128129
<dependency>
129130
<groupId>io.debezium</groupId>
130131
<artifactId>debezium-connector-db2</artifactId>
131132
</dependency>
133+
-->
132134
<dependency>
133135
<groupId>io.debezium</groupId>
134136
<artifactId>debezium-scripting</artifactId>

debezium-server-iceberg-sink/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
<groupId>io.debezium</groupId>
2727
<artifactId>debezium-server-core</artifactId>
2828
</dependency>
29+
<dependency>
30+
<groupId>io.debezium</groupId>
31+
<artifactId>debezium-openlineage-api</artifactId>
32+
<version>${version.debezium}</version>
33+
</dependency>
2934
<dependency>
3035
<groupId>org.apache.kafka</groupId>
3136
<artifactId>kafka-clients</artifactId>
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
*
3+
* * Copyright memiiso Authors.
4+
* *
5+
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
*/
8+
9+
package io.debezium.server.iceberg;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.ScheduledThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.locks.Lock;
17+
import java.util.concurrent.locks.ReentrantLock;
18+
19+
import org.apache.kafka.connect.source.SourceRecord;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
/**
24+
* Coordinates batching of snapshot table completions to optimize Iceberg file sizes.
25+
*
26+
* <p>Instead of committing each table individually (which creates many small files),
27+
* this coordinator accumulates tables until reaching the target batch size (~512MB)
28+
* and then commits them together in a single transaction.
29+
*
30+
* <p>This approach:
31+
* - Reduces S3/GCS PUT operations (cost optimization)
32+
* - Creates 512MB Parquet files (optimal for Trino queries)
33+
* - Reduces Iceberg manifest overhead
34+
* - Maintains low memory usage (batches instead of full buffering)
35+
*
36+
* @author Debezium Community
37+
*/
38+
public class BatchCommitCoordinator {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(BatchCommitCoordinator.class);
41+
42+
/**
43+
* Represents a completed table ready for batching.
44+
*/
45+
public static class CompletedTable {
46+
private final String tableName;
47+
private final List<SourceRecord> records;
48+
private final long estimatedBytes;
49+
50+
public CompletedTable(String tableName, List<SourceRecord> records) {
51+
this.tableName = tableName;
52+
this.records = records;
53+
// Estimate ~200 bytes per record (conservative)
54+
this.estimatedBytes = records.size() * 200L;
55+
}
56+
57+
public String getTableName() {
58+
return tableName;
59+
}
60+
61+
public List<SourceRecord> getRecords() {
62+
return records;
63+
}
64+
65+
public long getEstimatedBytes() {
66+
return estimatedBytes;
67+
}
68+
69+
public int getRecordCount() {
70+
return records.size();
71+
}
72+
}
73+
74+
private final IcebergChangeConsumer consumer;
75+
private final long targetBatchBytes;
76+
private final int maxTablesPerBatch;
77+
private final long timeoutMs;
78+
79+
private final List<CompletedTable> stagingBatch = new ArrayList<>();
80+
private final Lock batchLock = new ReentrantLock();
81+
private final ScheduledExecutorService timeoutExecutor;
82+
83+
private long lastFlushTime = System.currentTimeMillis();
84+
private long currentBatchBytes = 0;
85+
86+
/**
87+
* Creates a new batch commit coordinator.
88+
*
89+
* @param consumer The Iceberg consumer to flush batches to
90+
* @param targetBatchBytes Target batch size in bytes (e.g., 512MB = 536870912)
91+
* @param maxTablesPerBatch Maximum tables per batch (safety limit)
92+
* @param timeoutMs Maximum time to wait before flushing batch
93+
*/
94+
public BatchCommitCoordinator(
95+
IcebergChangeConsumer consumer,
96+
long targetBatchBytes,
97+
int maxTablesPerBatch,
98+
long timeoutMs) {
99+
100+
this.consumer = consumer;
101+
this.targetBatchBytes = targetBatchBytes;
102+
this.maxTablesPerBatch = maxTablesPerBatch;
103+
this.timeoutMs = timeoutMs;
104+
105+
// Timeout executor for periodic flush
106+
this.timeoutExecutor = new ScheduledThreadPoolExecutor(1, r -> {
107+
Thread t = new Thread(r, "batch-commit-timeout");
108+
t.setDaemon(true);
109+
return t;
110+
});
111+
112+
// Check for timeout every 5 seconds
113+
this.timeoutExecutor.scheduleAtFixedRate(
114+
this::checkTimeoutAndFlush,
115+
5000,
116+
5000,
117+
TimeUnit.MILLISECONDS
118+
);
119+
120+
LOGGER.info("BatchCommitCoordinator initialized: targetBytes={} MB, maxTables={}, timeout={}ms",
121+
targetBatchBytes / 1024 / 1024, maxTablesPerBatch, timeoutMs);
122+
}
123+
124+
/**
125+
* Called when a snapshot worker completes a table.
126+
*
127+
* @param tableName The completed table name
128+
* @param records Pre-transformed SourceRecords from the snapshot worker
129+
*/
130+
public void onTableCompleted(String tableName, List<SourceRecord> records) {
131+
if (records == null || records.isEmpty()) {
132+
LOGGER.debug("Skipping empty table '{}'", tableName);
133+
return;
134+
}
135+
136+
CompletedTable completedTable = new CompletedTable(tableName, records);
137+
138+
batchLock.lock();
139+
try {
140+
stagingBatch.add(completedTable);
141+
currentBatchBytes += completedTable.getEstimatedBytes();
142+
143+
LOGGER.info("[{}] Added table '{}' to staging batch ({} records, ~{} MB). Batch: {}/{} tables, ~{} MB",
144+
Thread.currentThread().getName(),
145+
tableName,
146+
completedTable.getRecordCount(),
147+
completedTable.getEstimatedBytes() / 1024 / 1024,
148+
stagingBatch.size(),
149+
maxTablesPerBatch,
150+
currentBatchBytes / 1024 / 1024);
151+
152+
// Check if batch is ready to commit
153+
boolean shouldFlush = currentBatchBytes >= targetBatchBytes ||
154+
stagingBatch.size() >= maxTablesPerBatch;
155+
156+
if (shouldFlush) {
157+
LOGGER.info("Batch ready: {} tables, ~{} MB (target: {} MB). Flushing now.",
158+
stagingBatch.size(),
159+
currentBatchBytes / 1024 / 1024,
160+
targetBatchBytes / 1024 / 1024);
161+
flushBatch();
162+
}
163+
}
164+
finally {
165+
batchLock.unlock();
166+
}
167+
}
168+
169+
/**
170+
* Periodic check for timeout-based flush.
171+
*/
172+
private void checkTimeoutAndFlush() {
173+
batchLock.lock();
174+
try {
175+
if (stagingBatch.isEmpty()) {
176+
return;
177+
}
178+
179+
long elapsed = System.currentTimeMillis() - lastFlushTime;
180+
if (elapsed >= timeoutMs) {
181+
LOGGER.info("Batch timeout reached ({} ms). Flushing {} tables, ~{} MB",
182+
elapsed,
183+
stagingBatch.size(),
184+
currentBatchBytes / 1024 / 1024);
185+
flushBatch();
186+
}
187+
}
188+
finally {
189+
batchLock.unlock();
190+
}
191+
}
192+
193+
/**
194+
* Flush the current batch to Iceberg (must hold lock).
195+
*/
196+
private void flushBatch() {
197+
if (stagingBatch.isEmpty()) {
198+
return;
199+
}
200+
201+
List<CompletedTable> batchToFlush = new ArrayList<>(stagingBatch);
202+
long batchBytes = currentBatchBytes;
203+
204+
// Clear staging for next batch
205+
stagingBatch.clear();
206+
currentBatchBytes = 0;
207+
lastFlushTime = System.currentTimeMillis();
208+
209+
// Release lock before expensive I/O
210+
batchLock.unlock();
211+
try {
212+
// Delegate to consumer
213+
consumer.flushSnapshotBatch(batchToFlush);
214+
215+
LOGGER.info("✅ Batch commit completed: {} tables, {} total rows, ~{} MB",
216+
batchToFlush.size(),
217+
batchToFlush.stream().mapToInt(CompletedTable::getRecordCount).sum(),
218+
batchBytes / 1024 / 1024);
219+
}
220+
catch (Exception e) {
221+
LOGGER.error("❌ Batch commit failed: {}", e.getMessage(), e);
222+
throw new RuntimeException("Batch commit failed", e);
223+
}
224+
finally {
225+
batchLock.lock();
226+
}
227+
}
228+
229+
/**
230+
* Flush any remaining batch (called during shutdown).
231+
*/
232+
public void shutdown() {
233+
LOGGER.info("Shutting down BatchCommitCoordinator. Flushing remaining batch...");
234+
235+
batchLock.lock();
236+
try {
237+
if (!stagingBatch.isEmpty()) {
238+
LOGGER.info("Flushing final batch: {} tables", stagingBatch.size());
239+
flushBatch();
240+
}
241+
}
242+
finally {
243+
batchLock.unlock();
244+
}
245+
246+
timeoutExecutor.shutdown();
247+
try {
248+
if (!timeoutExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
249+
timeoutExecutor.shutdownNow();
250+
}
251+
}
252+
catch (InterruptedException e) {
253+
timeoutExecutor.shutdownNow();
254+
Thread.currentThread().interrupt();
255+
}
256+
257+
LOGGER.info("BatchCommitCoordinator shutdown complete");
258+
}
259+
}

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/BatchConfig.java

100644100755
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,32 @@ public interface BatchConfig {
3737
@WithDefault("60")
3838
int concurrentUploadsTimeoutMinutes();
3939

40+
@WithName("debezium.sink.batch.buffer-per-table.enabled")
41+
@WithDefault("false")
42+
Boolean bufferPerTableEnabled();
43+
44+
@WithName("debezium.sink.batch.buffer-per-table.flush-interval-ms")
45+
@WithDefault("30000")
46+
Long bufferPerTableFlushIntervalMs();
47+
48+
@WithName("debezium.sink.batch.buffer-per-table.threshold")
49+
@WithDefault("1000")
50+
Integer bufferPerTableThreshold();
51+
52+
@WithName("debezium.sink.batch.buffer-per-table.max-tables")
53+
@WithDefault("100")
54+
Integer bufferPerTableMaxTables();
55+
56+
@WithName("debezium.sink.batch.buffer-per-table.retry.max-attempts")
57+
@WithDefault("3")
58+
Integer bufferPerTableRetryMaxAttempts();
59+
60+
@WithName("debezium.sink.batch.buffer-per-table.retry.initial-delay-ms")
61+
@WithDefault("1000")
62+
Long bufferPerTableRetryInitialDelayMs();
63+
64+
@WithName("debezium.sink.batch.buffer-per-table.retry.max-delay-ms")
65+
@WithDefault("60000")
66+
Long bufferPerTableRetryMaxDelayMs();
4067

4168
}

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ public interface DebeziumConfig {
8989
@WithName("debezium.transforms")
9090
Map<String, String> transformsConfigs();
9191

92+
@WithName("debezium.predicates")
93+
@WithDefault("")
94+
String predicates();
95+
96+
@WithName("debezium.predicates")
97+
Map<String, String> predicatesConfigs();
98+
99+
@WithName("debezium.source.topic.prefix")
100+
@WithDefault("cdc")
101+
String topicPrefix();
102+
92103
@WithName("debezium.source.topic.heartbeat.prefix")
93104
@WithDefault("__debezium-heartbeat")
94105
String topicHeartbeatPrefix();

0 commit comments

Comments
 (0)