Skip to content

debezium/dbz#1829 Add parallel incremental snapshot for relational connectors#7362

Open
MrIvv wants to merge 2 commits into
debezium:mainfrom
MrIvv:debezium-evolution-mt
Open

debezium/dbz#1829 Add parallel incremental snapshot for relational connectors#7362
MrIvv wants to merge 2 commits into
debezium:mainfrom
MrIvv:debezium-evolution-mt

Conversation

@MrIvv

@MrIvv MrIvv commented Apr 16, 2026

Copy link
Copy Markdown

Fixes

debezium/dbz#1829

Summary

This PR adds table-level parallelism to incremental snapshots. Multiple data collections in a signal are scanned concurrently within a single DBLog watermark window, bounded by snapshot.max.threads. Per-chunk deduplication semantics are preserved.

Supporting changes: a retry policy with exponential backoff for transient JDBC failures during chunk reads, and a work-queue scheduler that keeps workers busy across the whole signal.

Motivation

Today's incremental snapshot scans tables strictly sequentially. On any reasonable hardware (multi-core, bonded NICs, fast storage) the bottleneck is one slow table at a time, not the total work — leaving CPU and JDBC bandwidth idle while the connector waits.

This PR removes that bottleneck without changing the semantics of the snapshot itself.

Changes

1. Parallel table scan during incremental snapshot

AbstractIncrementalSnapshotChangeEventSource runs each round (one DBLog watermark window) across snapshot.max.threads worker threads, each holding its own TableSnapshotContext and JDBC connection. The window is opened once for the whole round and closed once at the end — workers do not contend on the watermark, they only contend on the per-table chunk position.

SignalBasedIncrementalSnapshotChangeEventSource and PostgresReadOnlyIncrementalSnapshotChangeEventSource are wired to use the parallel coordinator when snapshot.max.threads > 1 and degrade to the existing sequential path when = 1 (default).

2. Work-queue scheduling (no batch-and-wait idle)

When the signal carries N tables and the pool has K workers, all N tables go into a ConcurrentLinkedQueue. Each worker picks the next table immediately upon completing its current one — no awaitCompletion() between sub-batches.

Aspect Sequential / batch-and-wait This PR
Worker idle between tables Yes, worker waits for slowest sibling No, picks next from shared queue
JDBC connection Acquired per table Held by worker across all its tables
awaitCompletion() calls One per sub-batch One total at end of round
Tasks submitted N per sub-batch Exactly workerCount total

3. New helper: ParallelIncrementalSnapshotCoordinator

Owns the JDBC connection pool, the persistent worker executor, and the per-table window buffers. Pool lifecycle is on-demand:

  • Opens lazily on the first signal arrival, with snapshot.max.threads JDBC connections.
  • Reused across back-to-back signals (no per-signal pool churn).
  • Released after a configurable grace period (incremental.snapshot.pool.release.delay.ms, default 60_000) once no signal is in flight. A 0 value releases immediately.
  • Race-free release: an AtomicLong generation counter aborts a scheduled close if a new signal arrives mid-grace.

Pooled connections are validated on borrow via JdbcConnection.isValid() and lazily reallocated on failure. CopyOnWriteArrayList tracks live connections so concurrent eviction stays safe. The worker executor is a single Threads.newFixedThreadPool for the whole snapshot lifetime; it is shut down only when the coordinator itself terminates.

Connection threading is end-to-end: ChunkQueryBuilder.readTableChunkStatement now takes the worker's JdbcConnection as a parameter, so workers do not contend on a shared field or perform connection-swap dances mid-chunk.

The coordinator is annotated @NotThreadSafe for state changes and uses a ReentrantLock + AtomicReference pattern (consistent with BaseSourceTask) for the public lifecycle methods (ensurePoolOpen, scheduleReleaseIfNotPending, shutdown).

4. Retry policy with exponential backoff: RetryExecutor

New utility in debezium-util. Generic over Callable/Runnable, accepts any retryable predicate, exponential backoff with jitter, configurable cap. Used by the parallel chunk round to recover from transient JDBC failures (lock timeout, deadlock victim, broken connection in the pool) without aborting the whole snapshot.

Configuration

Property Type Default Description
snapshot.max.threads INT 1 Pre-existing. 1 = sequential (no behavior change). > 1 = parallel incremental + chunked snapshot
incremental.snapshot.pool.release.delay.ms LONG 60000 Grace period before the snapshot JDBC pool is released after the last signal completes. 0 releases immediately.
incremental.snapshot.retry.max.attempts INT 5 Max retries on transient failure during a chunk read
incremental.snapshot.retry.initial.delay.ms LONG 1000 Initial backoff delay
incremental.snapshot.retry.max.delay.ms LONG 60000 Max backoff delay cap
incremental.snapshot.retry.backoff.multiplier DOUBLE 2.0 Exponential growth factor

All five new properties carry DEFAULT_* public static final constants. A new Field::isPositiveDouble validator is added for the backoff multiplier; the other properties use existing validators.

Testing

  • Runtime-validated on production-configured PostgreSQL 17 deployments with the Iceberg sink (Lakekeeper REST catalog on GCS), snapshot.max.threads=2, chunk size 20000. The parallel coordinator initializes the pool on first signal arrival, reuses it across back-to-back signal bursts, and releases it cleanly once the queue drains past the grace period. Workers stay continuously busy via the shared queue — verified zero idle time between sub-batches in the live logs. No leaked connections or scheduled-release races observed across multiple recovery cycles.

Backward compatibility

  • snapshot.max.threads defaults to 1 → no behavior change unless explicitly opted in
  • Notification format unchanged (same aggregateType="Incremental Snapshot" + same per-type schema)
  • New retry properties are additive, all have safe defaults

AI usage disclosure

I used Claude (Anthropic) as a code assistant and for troubleshooting during this work. I reviewed every change, ran the test suite locally, and validated the runtime behaviour. I take full responsibility for the code submitted.

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 4ec06eb to ce4c456 Compare April 16, 2026 19:18
@github-actions

Copy link
Copy Markdown

Welcome as a new contributor to Debezium, @MrIvv. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively.

@github-actions

Copy link
Copy Markdown

Hi @MrIvv, thanks for your contribution. Please prefix the commit message(s) with the debezium/dbz#xxx GitHub issue key.

1 similar comment
@github-actions

Copy link
Copy Markdown

Hi @MrIvv, thanks for your contribution. Please prefix the commit message(s) with the debezium/dbz#xxx GitHub issue key.

@MrIvv MrIvv changed the title feat: Add parallel incremental snapshot with configurable batch flush and SPI completion signal DBZ-1829 feat: Add parallel incremental snapshot with configurable batch flush and SPI completion signal Apr 16, 2026
@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from ce4c456 to b122b28 Compare April 16, 2026 19:31
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

2 similar comments
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@Naros

Naros commented Apr 17, 2026

Copy link
Copy Markdown
Member

Hi @MrIvv, there appears to be a lot of overlap with the new Chunked Initial Snapshot behavior, just refactored to its own classes. Can you see if we can reuse that somehow?

Also please prefix all your commits as debezium/dbz#1829

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from b122b28 to f6ca68b Compare April 17, 2026 08:26
@MrIvv

MrIvv commented Apr 17, 2026

Copy link
Copy Markdown
Author

Hi @Naros, you're right. I'm already working on the refactoring and I'll update this pr soon

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from f6ca68b to b5c046f Compare April 17, 2026 16:01
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@Naros

Naros commented Apr 17, 2026

Copy link
Copy Markdown
Member

@MrIvv before you get too far along, it appears your PR is reintroducing debezium-core, which it should not. You may want to rebase on the latest main so that the classes that have been recently reorganized into various new modules are not showing as new files in the PR.

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 7686f91 to 1b25f33 Compare April 18, 2026 08:23
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch 2 times, most recently from ca0be4c to 900b28e Compare April 20, 2026 13:40
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@github-actions

Copy link
Copy Markdown

Hi @MrIvv, thanks for your contribution. Please prefix the commit message(s) with the debezium/dbz#xxx GitHub issue key.

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 900b28e to f235572 Compare April 20, 2026 13:42
@github-actions

Copy link
Copy Markdown

Hi @MrIvv. Thank you for your valuable contribution.
Please author your commit(s) using an email linked to your GitHub account.

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch 2 times, most recently from f9a73d9 to 4069780 Compare April 20, 2026 14:04
@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 6cc9c98 to c1055b7 Compare April 27, 2026 15:41
@MrIvv MrIvv changed the title DBZ-1829 feat: Add parallel incremental snapshot with configurable batch flush and SPI completion signal debezium/dbz#1829 Add parallel incremental snapshot with configurable batch flush, SPI completion signal, and snapshot transforms Apr 27, 2026
MrIvv pushed a commit to MrIvv/debezium-server-iceberg that referenced this pull request Apr 29, 2026
…splitting, and data loss fix

Streaming Flush Architecture:
- Persistent IcebergWriter per table throughout incremental snapshot lifecycle
- Chunked writes via SnapshotTableCompletionHandler SPI from debezium-core
- Adaptive file splitting calibrated to actual data size and available memory
- BatchCommitCoordinator for coordinated Iceberg commits across parallel workers

Performance:
- Throughput: ~14K -> ~80-120K rows/min
- Peak memory: ~1.5GB -> ~200-300MB per worker
- Parallel multi-table snapshot processing

Critical Bug Fix:
- processTablesInParallel() was catching exceptions without re-throwing,
  allowing Debezium to commit offsets despite failed writes, causing
  permanent silent data loss

Schema Evolution Fixes:
- Handle optional PK fields from CDC schema in applyFieldAddition
- requireColumn for identifier fields widened to optional by unionByNameWith
- Treat READ operations as direct INSERT in BaseDeltaTaskWriter
- Evolve required fields to optional to prevent Parquet NPE on NULL values

Additional:
- Upgrade Debezium dependency from 3.3.1.Final to 3.6.0-SNAPSHOT
- Support nested namespaces with dot separator for Iceberg catalog
- Apply SMT chain to snapshot events for type consistency with CDC path
- Consumer done flag for external report watcher lifecycle
- OpenLineage output dataset emission integration
- Quarkus management interface enabled at build time

Depends-on: debezium/debezium#7362 (SPI classes for multi-threaded snapshot)
Tested: PostgreSQL 16, 116 tables, ~128M rows, zero data loss
@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 5bb395b to bb67484 Compare May 4, 2026 13:20
MrIvv added a commit to MrIvv/debezium that referenced this pull request May 4, 2026
…trics, RetryExecutor

Addresses code review feedback from @Naros on PR debezium#7362:

- Remove dead code: RecordTransformer (deleted), DebeziumEventFactory
  (deleted entirely — its single wrapSourceRecord() was redundant with
  ConverterBuilder.toFormat() configured with Connect format, which
  already does the same raw SourceRecord -> ChangeEvent wrap)
- Move IncrementalSnapshotRetryPolicy to debezium-util as RetryExecutor,
  generalized (no SQLException-specific handling) and reusable across
  Debezium components
- Extract config defaults to public static final constants with
  Field::isPositiveInteger / Field::isPositiveLong validation on all
  numeric properties
- Lower DEFAULT_INCREMENTAL_SNAPSHOT_BATCH_FLUSH_SIZE from 20_000 to
  5_000 for safety on wide-row tables (VARCHAR/TEXT/JSONB rows can
  saturate worker memory at higher batch sizes)
- Wire PostProcessorRegistry and SnapshotProgressListener into
  TableSnapshotWorker so post-processors and metrics now run on snapshot
  events the same as on CDC events
- Refactor SnapshotRecordBuilder to use SnapshotChangeRecordEmitter +
  EventDispatcher.emitReadRecord() (same path as the chunked initial
  snapshot) instead of a custom flat builder; the builder shrinks from
  126 to 66 lines and is now a thin envelope wrapper
- Revert unrelated debezium-api/pom.xml change (restore
  "Used for unit testing with Kafka" comment block)

The remaining SnapshotTableCompletionHandler SPI exists solely for the
onTableSnapshotFinished(tableName) callback — a per-table boundary signal
that EventDispatcher does not currently expose. It enables sinks with
batched-write semantics to switch from writer-per-chunk to
writer-per-table-with-periodic-split. Concrete consumer implementation:
memiiso/debezium-server-iceberg#693, which now uses
ConverterBuilder.toFormat() to wrap raw SourceRecords into change events.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
Comment on lines +1769 to +1770
if (value instanceof SpecialValueDecimal) {
value = ((SpecialValueDecimal) value).getDecimalValue().orElse(null);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (value instanceof SpecialValueDecimal) {
value = ((SpecialValueDecimal) value).getDecimalValue().orElse(null);
if (value instanceof SpecialValueDecimal specialValueDecimal) {
value = specialValueDecimal.getDecimalValue().orElse(null);

* @param <P> the type of partition
* @param <T> the type of data collection identifier
*/
public class ParallelIncrementalSnapshotCoordinator<P extends Partition, T extends DataCollectionId> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at RelationalSnapshotChangeEventSource, specifically ThreadedSnapshotExecutor and PooledWork. It seems here you're mixing concerns, and while having a coordinator that faciliates a higher-abstraction may make sense, perhaps reusing the lower-level components here reduces code duplication?

@MrIvv

MrIvv commented May 4, 2026

Copy link
Copy Markdown
Author

Hi @Naros, force-pushed after rebase on latest main. The branch is now 5 commits.

Summary of what changed in response to the review:

  • Pipeline alignment (SnapshotRecordBuilder, SPI integration, DebeziumEventFactory): snapshot events flow through SnapshotChangeRecordEmitter + EventDispatcher.emitReadRecord() and have PostProcessorRegistry + SnapshotProgressListener applied. RecordTransformer and the flat-struct logic in DebeziumEventFactory are gone.
  • Code reuse (RetryExecutor): moved to debezium-util, generalized for reuse from Debezium Server.
  • Config (CommonConnectorConfig): defaults extracted to public static final constants with Field::isPositive* validation. Default batch flush size lowered from 20_000 to 5_000 for safety on wide-row tables.
  • Cleanup (pom.xml revert, dead code removal): unrelated pom.xml change reverted, dead code removed.

The remaining SnapshotTableCompletionHandler SPI exists only for the per-table completion signal needed by sinks with writer-per-table semantics — concrete consumer at memiiso/debezium-server-iceberg#693, details in the inline replies.

DCO, email, and commit prefix issues from the previous push are also fixed.

@MrIvv MrIvv requested a review from Naros May 5, 2026 06:37
MrIvv added a commit to MrIvv/debezium-server-iceberg that referenced this pull request May 5, 2026
…le splitting

This commit implements the streaming snapshot flush pattern for the Iceberg sink.
Combined with the parallel incremental snapshot SPI introduced in
debezium/debezium#7362, it dramatically reduces commit overhead and memory
pressure during snapshot of large tables.

## Streaming snapshot flush

Instead of creating a new Iceberg writer for every batch (5K-20K rows), keep a
single writer open per table for the entire snapshot. The writer accumulates
data across chunks and produces a single atomic commit at table completion.

Periodic file splitting kicks in when the writer reaches a calibrated row
threshold, producing ~512MB Parquet files. After the first split-commit, the
threshold is recalibrated from actual file size (bytes-per-row) and clamped
by available heap (60% of max heap, divided by worker count, divided by an
in-memory factor of ~40x for Parquet decompression).

## Components

- `IcebergSnapshotCompletionHandler` — implements the SPI from
  debezium-connector-common. Routes per-chunk events to the streaming writer
  and triggers final commit on `onTableSnapshotFinished()`.
- `BatchCommitCoordinator` — accumulates events from CDC streaming path
  (legacy fallback when SPI not available).
- `IcebergChangeConsumer.StreamingSnapshotContext` — per-table state holder:
  open writer, cached schema converter, calibrated split threshold.
- `IcebergTableOperator.writeChunkToWriter()` / `commitWriter()` — write
  without commit / final atomic commit + `CommitResult` for adaptive
  calibration.
- `IcebergTableOperator.isSafeTypeChange()` — allows compatible type
  evolution (timestamptz↔timestamp, decimal↔double, int↔long) for
  pre-existing tables with legacy schemas.
- `StructEventConverter` — cached schema converter constructor, static
  `fieldMappingCache` for performance.
- `EventConverter.isSnapshotEvent()` — used to skip equality-delete writes
  for READ ops.
- Schema evolution + identifier field protection in
  `IcebergTableOperator.applyFieldAddition()` — protect both new schema's
  and existing table's identifier fields when key schema is unavailable
  (e.g. `key.converter.schemas.enable=false`).

## Throughput / memory impact (production, PostgreSQL 16, 116 tables, ~128M rows)

| Metric                  | Before (per-batch writer) | After (streaming + adaptive split) |
|-------------------------|---------------------------|-------------------------------------|
| Iceberg writers / table | ~1,500                    | 1 (with periodic file splits)       |
| Iceberg commits / table | ~1,500                    | ~6-10 (one per ~512MB Parquet file) |
| Throughput              | ~14K rows/min             | ~80-120K rows/min                   |
| Peak memory / worker    | ~1.5 GB                   | ~200-300 MB                         |

## Build alignment

Pin `kafka-clients:4.2.0` (matches `connect-runtime:4.2.0` from
`debezium-bom:3.6.0-SNAPSHOT`; the `debezium-server-bom:3.5.0.Final` would
otherwise pull `kafka-clients:4.1.1` which is missing
`ConfigDef$ValidList.anyNonDuplicateValues`).

Pin `httpclient5:5.4.3` to avoid the 5.4.3+5.5 classpath duplication that
caused HEAD-request format issues against some REST catalogs (Lakekeeper).

## Dependencies

This PR depends on debezium/debezium#7362 which introduces the
`SnapshotTableCompletionHandler` SPI in `debezium-connector-common`.
The CI build will fail until that PR is merged and `debezium-bom:3.6.0-SNAPSHOT`
is published.

## Spinoff PRs (already extracted, mergeable independently before this one)

- memiiso#695 — Support nested namespaces with dot separator
- memiiso#696 — OpenLineage integration and Quarkus management interface
- memiiso#698 — Snapshot READ semantics (READ as INSERT, missing __op handling)
- memiiso#699 — Critical data loss fix in processTablesInParallel

When those are merged, this PR's diff will shrink to only the streaming
flush changes + build alignment.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
@Naros

Naros commented May 5, 2026

Copy link
Copy Markdown
Member

The remaining SnapshotTableCompletionHandler SPI exists only for the per-table completion signal needed by sinks with writer-per-table semantics — concrete consumer at memiiso/debezium-server-iceberg#693, details in the inline replies.

But I would argue this is accomplishable today without this SPI.

What I don't understand is why the Iceberg ' ChangeEventConsumer ' couldn't subscribe to one of the pre-existing notification channels about initial, blocking, and incremental snapshots, and alternate between the current event-batch handler logic and a persistent writer mode by table?

For example, Debezium emits "Incremental Snapshot started for Table ABC". The Iceberg sink receives it and adds the table "ABC" to its list using a persisted writer. As batches of events come in for table ABC, you use the persisted writer for them; they are buffered in memory or on disk, depending on your use case, and acknowledge receipt of the events, just like any other. The only difference is they're not yet written to the target system. Once Debezium emits "Incremental Snapshot finished for Table ABC", you flush the writer to disk, close it, and transition Table ABC back to the normal batch mode it operates in today.

The only question is whether or not there is currently a place in a sink for it to register for such events, e.g. is Debezium available for it to register at a given good point. Obviously, this could be done lazily in the handleBatch callback, but ideally, doing this earlier would be much better. If there isn't a good spot for this "sync-point", I'd say that might be worth a discussion and potentially a feature PR for DS.

This keeps what I believe are sink-related aspects where they belong, on the sink and not the producer. What you are doing right now is introducing the ability to wire something in, potentially in a non-DS environment, that won't work and makes no sense. The connector code itself should be runtime-agnostic in this regard, which is why I believe moving this work to the sink and using notifications to manage the writes makes far more architectural sense.

What do you think?

@Naros Naros requested a review from kmos May 5, 2026 18:02
@Naros

Naros commented May 5, 2026

Copy link
Copy Markdown
Member

I'd say that might be worth a discussion and potentially a feature PR for DS.

I talked with a colleague, and he reminded me about Debezium Server's lifecycle management. There is already ConnectorStartedEvent and ConnectorStoppedEvent that the consumer could observe and handle this setup quite nicely. Even a custom notification channel that acts as a bridge between the producer & the consumer fits well here.

@kmos

kmos commented May 7, 2026

Copy link
Copy Markdown
Member

I'd say that might be worth a discussion and potentially a feature PR for DS.

I talked with a colleague, and he reminded me about Debezium Server's lifecycle management. There is already ConnectorStartedEvent and ConnectorStoppedEvent that the consumer could observe and handle this setup quite nicely. Even a custom notification channel that acts as a bridge between the producer & the consumer fits well here.

Hi @MrIvv , thank you for your contribution. I went through your issue and pull request and based on your motivation:

No table completion signal: Sink connectors receive chunks via onTableSnapshotCompleted() but have no way to know when a table is finished — forcing sinks that keep a writer open across chunks (e.g. Iceberg streaming flush) to either create a new writer per chunk (massive overhead, many small files) or sniff completion from offset state (fragile)

as mentioned by @Naros there is the custom notification channel that can help your motivation: you can observe the event TABLE_SCAN_COMPLETED and try to address your need.

if you can give a try and check if it fits your motivation, it may help to reduce the complexity of the contribution. What do you think?

@MrIvv

MrIvv commented May 8, 2026

Copy link
Copy Markdown
Author

Hello @kmos , your suggestion is amazing! It’s a great point that simplifies things a lot. I’ll update the code today to include these parts as well

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from bb67484 to 2690d4a Compare May 10, 2026 12:32
@MrIvv

MrIvv commented May 10, 2026

Copy link
Copy Markdown
Author

Hi @Naros and @kmos , thanks for the patient review and for pointing me toward the right pattern.
I've taken your suggestion all the way through and force-pushed the result. Two things changed:
1. SPI removed in favor of TABLE_SCAN_COMPLETED notifications
2. ThreadedSnapshotExecutor and PooledWork reused
I've updated the PR description to reflect the reduced scope.
Let me know if there's anything else i can improve

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 2690d4a to 5a79f69 Compare May 10, 2026 13:01
MrIvv added a commit to MrIvv/debezium-server-iceberg that referenced this pull request May 10, 2026
…le splitting

Adds a streaming flush mode for the Iceberg sink that keeps a single
persistent writer per data collection across an entire incremental
snapshot, replacing the writer-per-chunk pattern that produced one
small Parquet file per chunk and inflated catalog metadata.

A persistent writer is opened on the first chunk of a table and stays
open across subsequent chunks, accumulating events in memory or
spilling to a rolling temporary file when adaptive thresholds are
crossed (rows-per-file, bytes-per-file, time-since-first-row). The
writer is committed and closed on the corresponding TABLE_SCAN_COMPLETED
notification emitted by Debezium core.

Per-table completion is consumed via the standard Debezium notification
channels (`SinkNotificationChannel`) — no producer-side SPI is added.
The consumer subscribes at startup, filters for
`aggregateType="Incremental Snapshot"` and
`type=TABLE_SCAN_COMPLETED`, and finalizes the per-table writer using
the `scanned_collection` field. JSON-serialized notifications (the
default with `debezium.format.value=json`) are unwrapped from the
`{schema, payload}` envelope before reading the fields.

A read-only REST endpoint `/v1/snapshot-status/incremental` exposed
on Quarkus' management interface (port 9000) reports per-table progress
(rows, files, bytes committed; current/in-progress tables) so external
orchestrators can observe completion without parsing the offset file.
The status snapshot tracks both the streaming-writer path and the
direct-commit path (via per-table counters) so the endpoint surfaces a
correct view regardless of which write path is in use.

This PR depends on the parallel incremental snapshot work in
debezium/debezium#7362 — that PR introduces the parallel scheduler
and the per-table TABLE_SCAN_COMPLETED notification semantics this
sink consumes. Until that PR is merged and a Debezium snapshot is
published, this PR cannot build against an upstream artifact and must
be built with a locally-installed `debezium-core` snapshot.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
Comment on lines +972 to +981
private boolean createDataEventsForTable(P partition, JdbcConnection connection) throws SQLException {
JdbcConnection originalConnection = this.jdbcConnection;
try {
this.jdbcConnection = connection;
return createDataEventsForTable(partition);
}
finally {
this.jdbcConnection = originalConnection;
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this is thread-safe and that at no point two threads won't fight for what connection is the member variable connection here? I think it's much safer to pass the desired connection through the call where it's needed rather than relying on this hacky swap approach.

Comment on lines +1149 to +1154
public void shutdown() {
if (parallelCoordinator != null) {
LOGGER.info("Shutting down parallel incremental snapshot coordinator");
parallelCoordinator.shutdown();
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this is ever called.

private final List<JdbcConnection> allConnections;
private final Map<T, TableSnapshotWorker<P, T>> activeWorkers;
private final Queue<DataCollection<T>> pendingTables;
private final List<T> completedTables;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ArrayList is written to by multiple threads and isn't guarded.

Comment on lines +1156 to +1171
protected boolean shouldUseParallelRead() {
if (parallelCoordinator == null) {
return false;
}

int remainingTables = context.dataCollectionsToBeSnapshottedCount();
if (remainingTables < 2) {
LOGGER.trace("[{}] Only {} table(s) remaining, using sequential read",
Thread.currentThread().getName(), remainingTables);
return false;
}

LOGGER.debug("[{}] Multiple tables available ({} remaining), using parallel read",
Thread.currentThread().getName(), remainingTables);
return true;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is also never used.

protected final NotificationService<P, ? extends OffsetContext> notificationService;
protected ParallelIncrementalSnapshotCoordinator<P, T> parallelCoordinator;
protected final RetryExecutor retryPolicy;
protected final PostProcessorRegistry postProcessorRegistry;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PostProcessorRegistry is set but never used, so it can be removed.

Comment on lines +425 to +435
try {
effectiveConnection = createSnapshotConnection();
createdOnDemandConnection = true;
}
catch (UnsupportedOperationException e) {
LOGGER.trace("createSnapshotConnection not supported, using default connection");
}
catch (Exception e) {
LOGGER.debug("Could not create snapshot connection, using default: {}", e.getMessage());
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid relying on an exception here to use the default connection? I assume this can be easily controlled by a flag or an abstract method that dictates whether the incremental snapshot operates in concurrent or single-threaded mode without the need for exception handling.

Comment on lines +1179 to +1184
protected boolean validateRestoredContext(IncrementalSnapshotContext<T> context) {
try {
java.lang.reflect.Method method = context.getClass().getMethod("validateRestoredContext");
Boolean result = (Boolean) method.invoke(context);
return result != null ? result : true;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What purpose does this logic serve now? This seems unnecessary is it not?

Comment on lines +771 to +785
public static final Field INCREMENTAL_SNAPSHOT_RETRY_INITIAL_DELAY_MS = Field.create("incremental.snapshot.retry.initial.delay.ms")
.withDisplayName("Incremental snapshot retry initial delay (ms)")
.withType(Type.LONG)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Initial backoff delay before retrying a failed chunk read.")
.withDefault(DEFAULT_INCREMENTAL_SNAPSHOT_RETRY_INITIAL_DELAY_MS);

public static final Field INCREMENTAL_SNAPSHOT_RETRY_MAX_DELAY_MS = Field.create("incremental.snapshot.retry.max.delay.ms")
.withDisplayName("Incremental snapshot retry max delay (ms)")
.withType(Type.LONG)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Upper bound on the exponential backoff delay between retries.")
.withDefault(DEFAULT_INCREMENTAL_SNAPSHOT_RETRY_MAX_DELAY_MS);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these should be always positive, they should have validation applied.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add

* Returns true if any per-table window buffer (parallel mode) or the
* legacy single window (sequential mode) currently holds events. Required
* because in parallel mode the legacy {@code window} field is always empty
* — chunk reads write into the coordinator's per-table buffers — so a

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of emdashes is very AI-centric, let's avoid these. In fact, our AGENTS.md specifically mention to avoid these types of characters, among others. I'd suggest if you're using AI that it adhere or our AI Policy and Agent rules.

final T dataCollectionId = context.currentDataCollectionId().getId();
final Map<Struct, Object[]> currentWindow = getWindowForDataCollection(dataCollectionId);

LOGGER.debug("[{}] Sending {} events from window buffer for table {}",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a number of use cases where Thread.currentThread().getName() is used even on the single-threaded code path, which only adds noise and I believe is less useful than the actual parallel thread paths. Can we clean these up?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, I will clean them up

…nnectors

Adds support for snapshotting multiple data collections in parallel during
incremental snapshots, bounded by snapshot.max.threads. The JDBC connection
pool opens on demand when a signal arrives and is released after a
configurable grace period (incremental.snapshot.pool.release.delay.ms,
default 60000), so back-to-back signal bursts reuse the same pool. Pooled
connections are validated on borrow and reallocated lazily.

The chunk read path is connection-threaded end to end and wrapped in a
RetryExecutor with exponential backoff to absorb transient JDBC failures.
The retry budget and the pool release delay are documented per connector
in the relational AsciiDoc pages.

See the PR description for the full design notes.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 5a79f69 to 0a3abf1 Compare May 26, 2026 09:18
@MrIvv MrIvv changed the title debezium/dbz#1829 Add parallel incremental snapshot with configurable batch flush, SPI completion signal, and snapshot transforms debezium/dbz#1829 Add parallel incremental snapshot for relational connectors May 26, 2026

@MrIvv MrIvv left a comment

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Naros, I've made a few changes to improve the thread safety of this code, mainly focusing on the lifecycle of the connections

@Naros

Naros commented May 27, 2026

Copy link
Copy Markdown
Member

/packit test --labels oracle

@MrIvv MrIvv force-pushed the debezium-evolution-mt branch 2 times, most recently from 1304d44 to 74b2f67 Compare June 5, 2026 10:22
@MrIvv

MrIvv commented Jun 5, 2026

Copy link
Copy Markdown
Author

Hello @Naros, I reviewed all CI jobs and found some regressions in my code.
I've added a fix in this new commit for easier review, I think all critical issues have now been covered

@Naros

Naros commented Jun 8, 2026

Copy link
Copy Markdown
Member

Thanks for the update @MrIvv, I've started CI. I'll review it once CI finishes.

@Naros

Naros commented Jun 12, 2026

Copy link
Copy Markdown
Member

Hi @MrIvv can you just check that the failure in MariaDB isn't related.

…ycle

+ Null-guard `currentDataCollectionId()` deref in `sendWindowEvents()` and `rereadChunk()`; the field can be null after the snapshot has advanced past the last collection.

* `TableSnapshotWorker` keyless tables now fail fast with a diagnostic instead of routing to a broken `readKeylessTable`; DBLog window cannot dedup keyless rows.

- Drop `readKeylessTable` and the `keylessTableRead` flag.

+ Wire `IncrementalSnapshotChangeEventSource.shutdown()` from `EventDispatcher.close()` so parallel coordinator resources are released on connector stop. Interface gets a default no-op for backward compatibility.

* `TableSnapshotWorker.isChunkPositionComplete` now uses `Arrays.equals` instead of a per-key `compareTo` loop. The previous comparison was type-dependent (e.g. `UUID.compareTo` is signed-long-based while Postgres `ORDER BY uuid` is unsigned-lex), which caused premature completion after the first chunk on UUID-keyed tables.

Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
@MrIvv MrIvv force-pushed the debezium-evolution-mt branch from 74b2f67 to 9cc2ae6 Compare June 15, 2026 12:50
@MrIvv

MrIvv commented Jun 15, 2026

Copy link
Copy Markdown
Author

Hi @Naros,
force-pushed one extra fix into the review-iteration commit:
TableSnapshotWorker.isChunkPositionComplete now uses Arrays.equals instead of a per-key compareTo loop.

On the MariaDB CI failure (BinlogReadOnlyIncrementalSnapshotIT.filteredEvents) I do not believe it is caused by this PR. The stack is entirely in MariaDB-specific code that this PR does not touch:

NPE: this.highWatermark is null
at MariaDbReadOnlyIncrementalSnapshotContext.serverStreamSetChanged:121
at MariaDbReadOnlyIncrementalSnapshotContext.hasServerIdentifierChanged:63
at BinlogReadOnlyIncrementalSnapshotChangeEventSource.emitWindowClose:123

The exception is swallowed by the catch in AbstractIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot ("Error while executing incremental snapshot ... skipping and continuing streaming"), so the snapshot is silently abandoned and the consumer
times out with 0 records.
Only 1 of 4 MariaDB matrix variants failed, the others passed on the same code, consistent with a timing-sensitive race on highWatermark initialization rather than a regression from this PR.

Happy to dig deeper if useful

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants