2727import jakarta .enterprise .inject .Instance ;
2828import jakarta .inject .Inject ;
2929import jakarta .inject .Named ;
30- import org .apache .hadoop .conf .Configuration ;
31- import org .apache .iceberg .CatalogUtil ;
32- import org .apache .iceberg .PartitionSpec ;
33- import org .apache .iceberg .Schema ;
34- import org .apache .iceberg .SortOrder ;
35- import org .apache .iceberg .Table ;
36- import org .apache .iceberg .catalog .Catalog ;
37- import org .apache .iceberg .catalog .TableIdentifier ;
38- import org .slf4j .Logger ;
39- import org .slf4j .LoggerFactory ;
40-
4130import java .time .Duration ;
4231import java .time .Instant ;
4332import java .util .ArrayList ;
5342import java .util .concurrent .Semaphore ;
5443import java .util .concurrent .TimeUnit ;
5544import java .util .stream .Collectors ;
45+ import org .apache .hadoop .conf .Configuration ;
46+ import org .apache .iceberg .CatalogUtil ;
47+ import org .apache .iceberg .PartitionSpec ;
48+ import org .apache .iceberg .Schema ;
49+ import org .apache .iceberg .SortOrder ;
50+ import org .apache .iceberg .Table ;
51+ import org .apache .iceberg .catalog .Catalog ;
52+ import org .apache .iceberg .catalog .TableIdentifier ;
53+ import org .slf4j .Logger ;
54+ import org .slf4j .LoggerFactory ;
5655
5756/**
5857 * Implementation of the consumer that delivers the messages to iceberg tables.
6160 */
6261@ Named ("iceberg" )
6362@ Dependent
64- public class IcebergChangeConsumer implements DebeziumEngine .ChangeConsumer <EmbeddedEngineChangeEvent > {
63+ public class IcebergChangeConsumer
64+ implements DebeziumEngine .ChangeConsumer <EmbeddedEngineChangeEvent > {
6565
6666 protected static final Duration LOG_INTERVAL = Duration .ofMinutes (15 );
6767 private static final Logger LOGGER = LoggerFactory .getLogger (IcebergChangeConsumer .class );
@@ -72,19 +72,13 @@ public class IcebergChangeConsumer implements DebeziumEngine.ChangeConsumer<Embe
7272 protected Threads .Timer logTimer = Threads .timer (clock , LOG_INTERVAL );
7373 protected static String keyValueChangeEventFormat ;
7474
75- @ Inject
76- @ Any
77- Instance <BatchSizeWait > batchSizeWaitInstances ;
75+ @ Inject @ Any Instance <BatchSizeWait > batchSizeWaitInstances ;
7876 BatchSizeWait batchSizeWait ;
7977 Catalog icebergCatalog ;
80- @ Inject
81- IcebergTableOperator icebergTableOperator ;
82- @ Inject
83- GlobalConfig config ;
84-
85- @ Inject
86- @ Any
87- Instance <IcebergTableMapper > tableMappers ;
78+ @ Inject IcebergTableOperator icebergTableOperator ;
79+ @ Inject GlobalConfig config ;
80+
81+ @ Inject @ Any Instance <IcebergTableMapper > tableMappers ;
8882 IcebergTableMapper tableMapper ;
8983 private int numConcurrentUploads ;
9084 private int concurrentUploadsTimeoutMinutes ;
@@ -97,12 +91,17 @@ void connect() {
9791
9892 JsonEventConverter .initializeStaticSerdes ();
9993 keyValueChangeEventFormat = config .debezium ().keyValueChangeEventFormat ();
100- LOGGER .info ("IcebergChangeConsumer is configured to use the '{}' format for processing events." , keyValueChangeEventFormat );
94+ LOGGER .info (
95+ "IcebergChangeConsumer is configured to use the '{}' format for processing events." ,
96+ keyValueChangeEventFormat );
10197 // pass iceberg properties to iceberg and hadoop
10298 config .iceberg ().icebergConfigs ().forEach (this .hadoopConf ::set );
10399
104- icebergCatalog = CatalogUtil .buildIcebergCatalog (config .iceberg ().catalogName (), config .iceberg ().icebergConfigs (), hadoopConf );
105- batchSizeWait = IcebergUtil .selectInstance (batchSizeWaitInstances , config .batch ().batchSizeWaitName ());
100+ icebergCatalog =
101+ CatalogUtil .buildIcebergCatalog (
102+ config .iceberg ().catalogName (), config .iceberg ().icebergConfigs (), hadoopConf );
103+ batchSizeWait =
104+ IcebergUtil .selectInstance (batchSizeWaitInstances , config .batch ().batchSizeWaitName ());
106105 batchSizeWait .initizalize ();
107106 tableMapper = IcebergUtil .selectInstance (tableMappers , config .iceberg ().tableMapper ());
108107
@@ -116,7 +115,6 @@ void connect() {
116115 }
117116 }
118117
119-
120118 @ PreDestroy
121119 void close () {
122120 try {
@@ -134,21 +132,26 @@ void close() {
134132 }
135133
136134 @ Override
137- public void handleBatch (List <EmbeddedEngineChangeEvent > records ,
138- DebeziumEngine .RecordCommitter <EmbeddedEngineChangeEvent > committer )
135+ public void handleBatch (
136+ List <EmbeddedEngineChangeEvent > records ,
137+ DebeziumEngine .RecordCommitter <EmbeddedEngineChangeEvent > committer )
139138 throws InterruptedException {
140139 Instant start = Instant .now ();
141140
142141 // group events by destination (per iceberg table)
143- Map <String , List <EventConverter >> result = records .stream ()
144- .map ((EmbeddedEngineChangeEvent e ) -> {
145- return switch (keyValueChangeEventFormat ) {
146- case "json" -> new JsonEventConverter (e , config );
147- case "connect" -> new StructEventConverter (e , config );
148- default -> throw new DebeziumException ("Unsupported format:" + keyValueChangeEventFormat );
149- };
150- })
151- .collect (Collectors .groupingBy (EventConverter ::destination ));
142+ Map <String , List <EventConverter >> result =
143+ records .stream ()
144+ .map (
145+ (EmbeddedEngineChangeEvent e ) -> {
146+ return switch (keyValueChangeEventFormat ) {
147+ case "json" -> new JsonEventConverter (e , config );
148+ case "connect" -> new StructEventConverter (e , config );
149+ default ->
150+ throw new DebeziumException (
151+ "Unsupported format:" + keyValueChangeEventFormat );
152+ };
153+ })
154+ .collect (Collectors .groupingBy (EventConverter ::destination ));
152155
153156 // consume list of events for each destination table
154157 if (numConcurrentUploads > 1 ) {
@@ -174,84 +177,127 @@ public void handleBatch(List<EmbeddedEngineChangeEvent> records,
174177 /**
175178 * Processes events for each destination table sequentially in a single thread.
176179 *
177- * @param eventsByDestination A map where keys are destination table names and
178- * values are lists of events for that table.
180+ * @param eventsByDestination A map where keys are destination table names and values are lists of
181+ * events for that table.
179182 */
180183 private void processTablesSequentially (Map <String , List <EventConverter >> eventsByDestination ) {
181184 for (Map .Entry <String , List <EventConverter >> tableEvents : eventsByDestination .entrySet ()) {
182185
// Heartbeat destinations are skipped entirely when the config asks to skip consuming them.
183- if (config .debezium ().isHeartbeatTopic (tableEvents .getKey ()) && config .debezium ().topicHeartbeatSkipConsuming ()) {
186+ if (config .debezium ().isHeartbeatTopic (tableEvents .getKey ())
187+ && config .debezium ().topicHeartbeatSkipConsuming ()) {
184188 continue ;
185189 }
186190
// The first event of the group is handed to loadIcebergTable as the sample event;
// the whole list of events for this destination is then passed to addToTable.
187- Table icebergTable = this .loadIcebergTable (mapDestination (tableEvents .getKey ()), tableEvents .getValue ().get (0 ));
191+ Table icebergTable =
192+ this .loadIcebergTable (
193+ mapDestination (tableEvents .getKey ()), tableEvents .getValue ().get (0 ));
188194 icebergTableOperator .addToTable (icebergTable , tableEvents .getValue ());
189195 }
190196 }
191197
192198 // In IcebergChangeConsumer.java
193199
194200 /**
195- * Processes events for each destination table in parallel using a virtual
196- * thread pool.
201+ * Processes events for each destination table in parallel using a virtual thread pool.
197202 *
198- * @param eventsByDestination A map where keys are destination table names and
199- * values are lists of events for that table.
203+ * @param eventsByDestination A map where keys are destination table names and values are lists of
204+ * events for that table.
200205 */
201206 private void processTablesInParallel (Map <String , List <EventConverter >> eventsByDestination ) {
202207 List <Callable <Void >> tasks = new ArrayList <>();
203208 for (Map .Entry <String , List <EventConverter >> tableEvents : eventsByDestination .entrySet ()) {
204209
205- if (config .debezium ().isHeartbeatTopic (tableEvents .getKey ()) && config .debezium ().topicHeartbeatSkipConsuming ()) {
210+ if (config .debezium ().isHeartbeatTopic (tableEvents .getKey ())
211+ && config .debezium ().topicHeartbeatSkipConsuming ()) {
206212 continue ;
207213 }
208214
209- tasks .add (() -> {
210- try {
211- // Acquire a permit from the Semaphore to enforce the concurrency limit.
212- LOGGER .trace ("Task for destination '{}' waiting for permit. Available: {}" , tableEvents .getKey (),
213- concurrencyLimiter .availablePermits ());
214- concurrencyLimiter .acquire ();
215- LOGGER .debug ("Task for destination '{}' acquired permit. Starting processing." , tableEvents .getKey ());
216-
217- Table icebergTable = this .loadIcebergTable (mapDestination (tableEvents .getKey ()),
218- tableEvents .getValue ().get (0 ));
219- icebergTableOperator .addToTable (icebergTable , tableEvents .getValue ());
220- return null ; // Callable must return a value
221- } catch (InterruptedException e ) {
222- Thread .currentThread ().interrupt (); // Restore interrupted status
223- LOGGER .warn ("Task for destination '{}' was interrupted." , tableEvents .getKey (), e );
224- return null ;
225- } catch (Exception e ) {
226- // Log and rethrow. This will be wrapped in an ExecutionException by the Future.
227- LOGGER .error ("Task for destination '{}' failed." , tableEvents .getKey (), e );
228- throw e ;
229- } finally {
230- // Always release the permit, even if an exception occurred.
231- concurrencyLimiter .release ();
232- LOGGER .trace ("Task for destination '{}' released permit. Available: {}" , tableEvents .getKey (),
233- concurrencyLimiter .availablePermits ());
234- }
235- });
215+ tasks .add (
216+ () -> {
217+ try {
218+ // Acquire a permit from the Semaphore to enforce the concurrency limit.
219+ LOGGER .trace (
220+ "Task for destination '{}' waiting for permit. Available: {}" ,
221+ tableEvents .getKey (),
222+ concurrencyLimiter .availablePermits ());
223+ concurrencyLimiter .acquire ();
224+ LOGGER .debug (
225+ "Task for destination '{}' acquired permit. Starting processing." ,
226+ tableEvents .getKey ());
227+
228+ Table icebergTable =
229+ this .loadIcebergTable (
230+ mapDestination (tableEvents .getKey ()), tableEvents .getValue ().get (0 ));
231+ icebergTableOperator .addToTable (icebergTable , tableEvents .getValue ());
232+ return null ; // Callable must return a value
233+ } catch (InterruptedException e ) {
234+ Thread .currentThread ().interrupt (); // Restore interrupted status
235+ LOGGER .warn ("Task for destination '{}' was interrupted." , tableEvents .getKey (), e );
236+ return null ;
237+ } catch (Exception e ) {
238+ // Log and rethrow. This will be wrapped in an ExecutionException by the Future.
239+ LOGGER .error ("Task for destination '{}' failed." , tableEvents .getKey (), e );
240+ throw e ;
241+ } finally {
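243+ // Always release the permit, even if an exception occurred. NOTE(review): this finally block also runs when acquire() itself is interrupted, so a permit that was never acquired is released and the effective concurrency limit grows; consider calling acquire() before entering the try.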
243+ concurrencyLimiter .release ();
244+ LOGGER .trace (
245+ "Task for destination '{}' released permit. Available: {}" ,
246+ tableEvents .getKey (),
247+ concurrencyLimiter .availablePermits ());
248+ }
249+ });
236250 }
237251
238252 LOGGER .debug ("Invoking {} parallel tasks and waiting for completion..." , tasks .size ());
239253 try {
240254 // Invoke all tasks and wait for them to complete, with a timeout.
241- List <Future <Void >> futures = executor .invokeAll (tasks , concurrentUploadsTimeoutMinutes , TimeUnit .MINUTES );
242-
243- // Check the status of each task to log any exceptions
255+ List <Future <Void >> futures =
256+ executor .invokeAll (tasks , concurrentUploadsTimeoutMinutes , TimeUnit .MINUTES );
257+
258+ // Track failures and re-throw to prevent silent data loss.
259+ // Without this, a failed parallel task is only logged: the loop exits normally,
260+ // Debezium considers the batch successful, and commits the source offset.
261+ // The data for the failed table is permanently lost.
262+ boolean hasFailures = false ;
263+ DebeziumException firstException = null ;
264+ int failureCount = 0 ;
265+
266+ // Check the status of each task
244267 for (Future <Void > future : futures ) {
245268 try {
246269 // future.get() will throw an exception if the task failed or was cancelled.
247270 future .get ();
248271 } catch (CancellationException e ) {
272+ hasFailures = true ;
273+ failureCount ++;
249274 LOGGER .error ("A task was cancelled, likely due to timeout." , e );
275+ if (firstException == null ) {
276+ firstException = new DebeziumException ("Task cancelled due to timeout" , e );
277+ }
250278 } catch (ExecutionException e ) {
279+ hasFailures = true ;
280+ failureCount ++;
251281 // The original exception from the Callable is wrapped in ExecutionException
252- LOGGER .error ("A task failed with an exception: {}" , e .getCause ().getMessage (), e .getCause ());
282+ LOGGER .error (
283+ "A task failed with an exception: {}" , e .getCause ().getMessage (), e .getCause ());
284+ if (firstException == null ) {
285+ firstException =
286+ new DebeziumException ("Parallel table processing failed" , e .getCause ());
287+ }
253288 }
254289 }
290+
291+ // Re-throw if any task failed: prevents Debezium from committing the source offset
292+ // when at least one table write did not succeed. The whole batch is retried, which
293+ // is the correct behavior since offsets are committed at batch granularity.
294+ if (hasFailures ) {
295+ LOGGER .error (
296+ "CRITICAL: {} out of {} parallel tasks failed. Aborting batch to prevent data loss." ,
297+ failureCount ,
298+ tasks .size ());
299+ throw firstException ;
300+ }
255301 LOGGER .debug ("All parallel tasks have been processed." );
256302
257303 } catch (InterruptedException e ) {
@@ -261,11 +307,11 @@ private void processTablesInParallel(Map<String, List<EventConverter>> eventsByD
261307 }
262308
263309 /**
264- * @param tableId iceberg table identifier
265- * @param sampleEvent sample debezium event. event schema used to create iceberg
266- * table when table not found
267- * @return iceberg table, throws RuntimeException when table not found, and it's
268- * not possible to create it
310+ * @param tableId iceberg table identifier
311+ * @param sampleEvent sample debezium event. event schema used to create iceberg table when table
312+ * not found
313+ * @return iceberg table, throws RuntimeException when table not found, and it's not possible to
314+ * create it
269315 */
270316 public Table loadIcebergTable (TableIdentifier tableId , EventConverter sampleEvent ) {
271317 return IcebergUtil .loadIcebergTable (icebergCatalog , tableId )
@@ -275,14 +321,18 @@ public Table loadIcebergTable(TableIdentifier tableId, EventConverter sampleEven
275321 private Table createIcebergTable (TableIdentifier tableId , EventConverter sampleEvent ) {
276322 if (!config .debezium ().eventSchemaEnabled ()
277323 && !Objects .equals (config .debezium ().keyValueChangeEventFormat (), "connect" )) {
278- throw new RuntimeException ("Table '" + tableId + "' not found! "
279- + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!" );
324+ throw new RuntimeException (
325+ "Table '"
326+ + tableId
327+ + "' not found! "
328+ + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!" );
280329 }
281330 try {
282331 final Schema schema ;
283332 final SortOrder sortOrder ;
284333 if (!config .iceberg ().createIdentifierFields ()) {
285- LOGGER .warn ("Creating identifier fields is disabled, creating schema without identifier fields!" );
334+ LOGGER .warn (
335+ "Creating identifier fields is disabled, creating schema without identifier fields!" );
286336 schema = sampleEvent .icebergSchema (false );
287337 sortOrder = SortOrder .unsorted ();
288338 } else if (config .iceberg ().nestedAsVariant ()) {
@@ -296,7 +346,8 @@ private Table createIcebergTable(TableIdentifier tableId, EventConverter sampleE
296346 // and "tableChanges" fields.
297347 // "schema change topic"
298348 // https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
299- LOGGER .warn ("Creating schema change topic/table without identifier fields for append-only mode." );
349+ LOGGER .warn (
350+ "Creating schema change topic/table without identifier fields for append-only mode." );
300351 schema = sampleEvent .icebergSchema (false );
301352 sortOrder = SortOrder .unsorted ();
302353 } else if (config .debezium ().isHeartbeatTopic (sampleEvent .destination ())) {
@@ -307,18 +358,29 @@ private Table createIcebergTable(TableIdentifier tableId, EventConverter sampleE
307358 sortOrder = sampleEvent .sortOrder (schema );
308359 }
309360
310- final List <String > partitionByOptions = config .iceberg ().partitionByForTable (sampleEvent .destination ());
361+ final List <String > partitionByOptions =
362+ config .iceberg ().partitionByForTable (sampleEvent .destination ());
311363 PartitionSpec spec = IcebergUtil .createPartitionSpec (schema , partitionByOptions );
312364
313365 // Table format version "2" is kept for backward compatibility; it is to be removed and set
314366 // to "3" in one of the next releases.
315367 // Format version "3" is already used when the variant data type is enabled (nestedAsVariant).
316368 final String tableFormatVersion = config .iceberg ().nestedAsVariant () ? "3" : "2" ;
317- return IcebergUtil .createIcebergTable (icebergCatalog , tableId , schema , spec , sortOrder ,
318- config .iceberg ().writeFormat (), tableFormatVersion );
369+ return IcebergUtil .createIcebergTable (
370+ icebergCatalog ,
371+ tableId ,
372+ schema ,
373+ spec ,
374+ sortOrder ,
375+ config .iceberg ().writeFormat (),
376+ tableFormatVersion );
319377 } catch (Exception e ) {
320378 throw new DebeziumException (
321- "Failed to create table from debezium event table:" + tableId + " Error:" + e .getMessage (), e );
379+ "Failed to create table from debezium event table:"
380+ + tableId
381+ + " Error:"
382+ + e .getMessage (),
383+ e );
322384 }
323385 }
324386
@@ -330,7 +392,9 @@ private Table createIcebergTable(TableIdentifier tableId, EventConverter sampleE
330392 protected void logConsumerProgress (long numUploadedEvents ) {
331393 numConsumedEvents += numUploadedEvents ;
332394 if (logTimer .expired ()) {
333- LOGGER .info ("Consumed {} records after {}" , numConsumedEvents ,
395+ LOGGER .info (
396+ "Consumed {} records after {}" ,
397+ numConsumedEvents ,
334398 Strings .duration (clock .currentTimeInMillis () - consumerStart ));
335399 numConsumedEvents = 0 ;
336400 consumerStart = clock .currentTimeInMillis ();
0 commit comments