feat: streaming snapshot flush with persistent writer and adaptive file splitting#693
feat: streaming snapshot flush with persistent writer and adaptive file splitting#693MrIvv wants to merge 1 commit into
Conversation
|
Note on CI failure: The build fails because this PR depends on debezium/debezium#7362 which introduces new SPI classes ( The CI cannot resolve this dependency until PR-A is merged and the snapshot is published. Once the core changes are available, this PR will build successfully. The code has been tested extensively in production with a locally-built Debezium core (PostgreSQL 16, 116 tables, ~128M rows, zero data loss). |
935a168 to
540cbd8
Compare
f8de242 to
88b64e6
Compare
There was a problem hiding this comment.
@MrIvv thank you for working on the improvements. the PR is large and contains multiple fixes and improvements. could we create small PRs to merge them. i added inline comments for the ones can be merged seperately, these fixes/features can be merged independently. spliting will also simplify the main PR for review
| hasFailures = true; | ||
| failureCount++; | ||
| // The original exception from the Callable is wrapped in ExecutionException | ||
| LOGGER.error("A task failed with an exception: {}", e.getCause().getMessage(), e.getCause()); |
There was a problem hiding this comment.
This is important data loss fix, could you create separate PR to merge this.
This change is completely independent it will also hep the review
| if (namespace == null || namespace.isEmpty()) { | ||
| return Namespace.of("default"); | ||
| } | ||
| // Support both dot separator and 0x1F (Unit Separator) used by Iceberg REST catalog |
There was a problem hiding this comment.
could we add this also with seperate PR?
is there iceberg code we can use to read string as namespace, something does the speeration for us? that would be ideal instead of doing split manually here.
| .unionByNameWith(newSchema) | ||
| .setIdentifierFields(newSchema.identifierFieldNames()); | ||
| // Pre-scan: detect conditions that require allowIncompatibleChanges() | ||
| boolean needsIncompatibleChanges = false; |
There was a problem hiding this comment.
could we also split this change and create standalone PR, for all the schema change fixes?
| if (!icebergTable.schema().sameSchema(newSchemaCombined)) { | ||
| LOGGER.warn("Extending schema of {}", icebergTable.name()); | ||
| us.commit(); | ||
| icebergTable.refresh(); |
| if (isNewKey && !keepDeletes && rowOperation != Operation.DELETE) { | ||
| // Optimization: Pure INSERT (new key) - direct write, | ||
| // skipping the costly pre-delete since we know no previous version exists. | ||
| if ((isNewKey || rowOperation == Operation.READ) && !keepDeletes && rowOperation != Operation.DELETE) { |
There was a problem hiding this comment.
could we also add thgis fix as separate PR? thank you this is also good fix.
| public StructEventConverter(EmbeddedEngineChangeEvent e, GlobalConfig config, | ||
| StructSchemaConverter sharedSchemaConverter) { | ||
| super(config); |
There was a problem hiding this comment.
this can also be seperate PR, we can review and merge independently
|
@ismailsimsek ok, I will try to split this work into different PRs (maybe 3 or 4, no more) |
…trics, RetryExecutor Addresses code review feedback from @Naros on PR debezium#7362: - Remove dead code: RecordTransformer (deleted), DebeziumEventFactory (deleted entirely — its single wrapSourceRecord() was redundant with ConverterBuilder.toFormat() configured with Connect format, which already does the same raw SourceRecord -> ChangeEvent wrap) - Move IncrementalSnapshotRetryPolicy to debezium-util as RetryExecutor, generalized (no SQLException-specific handling) and reusable across Debezium components - Extract config defaults to public static final constants with Field::isPositiveInteger / Field::isPositiveLong validation on all numeric properties - Lower DEFAULT_INCREMENTAL_SNAPSHOT_BATCH_FLUSH_SIZE from 20_000 to 5_000 for safety on wide-row tables (VARCHAR/TEXT/JSONB rows can saturate worker memory at higher batch sizes) - Wire PostProcessorRegistry and SnapshotProgressListener into TableSnapshotWorker so post-processors and metrics now run on snapshot events the same as on CDC events - Refactor SnapshotRecordBuilder to use SnapshotChangeRecordEmitter + EventDispatcher.emitReadRecord() (same path as the chunked initial snapshot) instead of a custom flat builder; the builder shrinks from 126 to 66 lines and is now a thin envelope wrapper - Revert unrelated debezium-api/pom.xml change (restore "Used for unit testing with Kafka" comment block) The remaining SnapshotTableCompletionHandler SPI exists solely for the onTableSnapshotFinished(tableName) callback — a per-table boundary signal that EventDispatcher does not currently expose. It enables sinks with batched-write semantics to switch from writer-per-chunk to writer-per-table-with-periodic-split. Concrete consumer implementation: memiiso/debezium-server-iceberg#693, which now uses ConverterBuilder.toFormat() to wrap raw SourceRecords into change events. Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
88b64e6 to
42bff6e
Compare
…le splitting
Adds a streaming flush mode for the Iceberg sink that keeps a single
persistent writer per data collection across an entire incremental
snapshot, replacing the writer-per-chunk pattern that produced one
small Parquet file per chunk and inflated catalog metadata.
A persistent writer is opened on the first chunk of a table and stays
open across subsequent chunks, accumulating events in memory or
spilling to a rolling temporary file when adaptive thresholds are
crossed (rows-per-file, bytes-per-file, time-since-first-row). The
writer is committed and closed on the corresponding TABLE_SCAN_COMPLETED
notification emitted by Debezium core.
Per-table completion is consumed via the standard Debezium notification
channels (`SinkNotificationChannel`) — no producer-side SPI is added.
The consumer subscribes at startup, filters for
`aggregateType="Incremental Snapshot"` and
`type=TABLE_SCAN_COMPLETED`, and finalizes the per-table writer using
the `scanned_collection` field. JSON-serialized notifications (the
default with `debezium.format.value=json`) are unwrapped from the
`{schema, payload}` envelope before reading the fields.
A read-only REST endpoint `/v1/snapshot-status/incremental` exposed
on Quarkus' management interface (port 9000) reports per-table progress
(rows, files, bytes committed; current/in-progress tables) so external
orchestrators can observe completion without parsing the offset file.
The status snapshot tracks both the streaming-writer path and the
direct-commit path (via per-table counters) so the endpoint surfaces a
correct view regardless of which write path is in use.
This PR depends on the parallel incremental snapshot work in
debezium/debezium#7362 — that PR introduces the parallel scheduler
and the per-table TABLE_SCAN_COMPLETED notification semantics this
sink consumes. Until that PR is merged and a Debezium snapshot is
published, this PR cannot build against an upstream artifact and must
be built with a locally-installed `debezium-core` snapshot.
Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
42bff6e to
d6b363e
Compare
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Summary
Adds a streaming flush mode for the Iceberg sink that keeps a single persistent writer per data collection across an entire incremental snapshot, replacing the writer-per-chunk pattern that produced one small Parquet file per chunk and inflated catalog metadata.
The persistent writer is opened on the first chunk of a table and stays open across subsequent chunks, accumulating events in memory or spilling to a rolling temporary file when adaptive thresholds are crossed (rows-per-file, bytes-per-file, time-since-first-row). The writer is committed and closed on the corresponding
TABLE_SCAN_COMPLETEDnotification emitted by Debezium core.Per-table completion via existing notification channels
Per-table completion is consumed via the standard Debezium
SinkNotificationChannel— no producer-side SPI is added. The consumer subscribes at startup, filters foraggregateType="Incremental Snapshot"+type=TABLE_SCAN_COMPLETED, and finalizes the per-table writer using thescanned_collectionfield. JSON-serialized notifications (the default withdebezium.format.value=json) are unwrapped from the{schema, payload}envelope before reading the fields.This follows the architectural guidance in debezium/debezium#7362 — keep producer-side concerns on Debezium core and consume completion signals through the existing notification machinery on the sink side.
Status REST endpoint
A read-only endpoint
/v1/snapshot-status/incrementalexposed on Quarkus' management interface (port 9000) reports per-table progress:completed[]— tables whoseTABLE_SCAN_COMPLETEDarrived, with rows / files / bytes committedinProgress[]— tables currently being scanned (tracked fromSTARTEDnotifications, refreshed from in-flight writes)summary— aggregate counts and last-completion / last-data-received timestampsThe status snapshot covers both the streaming-writer path and the direct-commit path (via per-table counters), so the endpoint surfaces a correct view regardless of which write path is in use. External orchestrators can observe completion without parsing the binary offset file.
Dependency on debezium-core changes
This PR depends on debezium/debezium#7362 which introduces parallel incremental snapshot in Debezium core, with per-table
TABLE_SCAN_COMPLETEDnotification semantics this sink consumes. Until that PR is merged and a Debezium snapshot is published, this PR cannot build against an upstream artifact and must be built with a locally-installeddebezium-coresnapshot (mvn installof thedebezium/debeziumfork at the matching branch, thenmvn installof this sink).Production validation
The same code is running in production on a 65-table workload with
snapshot.max.threads=2,incremental.snapshot.chunk.size=20000. Throughput observed ~42K rows/min, ~2.2× the single-threaded baseline measured on the same workload. Status endpoint reports correct per-table completion, no data loss across multiple pod restarts.Scope and overlap with separate sub-PRs
The single commit in this PR contains the full streaming flush feature. Three small companion fixes that ship in their own atomic PRs (and that the streaming flush code path imports indirectly) are included here as a build-time dependency:
IcebergUtil.parseNamespace— atomic PR: feat: support nested namespaces with dot separator #695application.properties) — atomic PR: feat: OpenLineage integration and Quarkus management interface #696__opfield handling inStructEventConverter— atomic PR: fix: handle missing __op field in snapshot events during deduplication #698These overlap with files touched by the streaming flush feature, so removing them would break compilation. They will collapse to no-op once the companion PRs are merged into master.
The data-loss re-throw fix (
processTablesInParallel) was already merged via #699 and is no longer in the diff.Backward compatibility
TABLE_SCAN_COMPLETEDnotifications are already emitted by Debezium core today; this PR only adds a consumer