feat: OpenLineage integration and Quarkus management interface#696
Conversation
3f3afef to
9576770
Compare
…erator Emit DatasetMetadata (table name, schema fields, OUTPUT/DATABASE) after each successful Iceberg commit in addToTablePerSchema(). Uses the standard DebeziumOpenLineageEmitter API with graceful fallback to NoOpLineageEmitter when OpenLineage runtime is not configured. Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
9576770 to
e685f2c
Compare
…le splitting This commit implements the streaming snapshot flush pattern for the Iceberg sink. Combined with the parallel incremental snapshot SPI introduced in debezium/debezium#7362, it dramatically reduces commit overhead and memory pressure during snapshot of large tables. ## Streaming snapshot flush Instead of creating a new Iceberg writer for every batch (5K-20K rows), keep a single writer open per table for the entire snapshot. The writer accumulates data across chunks and produces a single atomic commit at table completion. Periodic file splitting kicks in when the writer reaches a calibrated row threshold, producing ~512MB Parquet files. After the first split-commit, the threshold is recalibrated from actual file size (bytes-per-row) and clamped by available heap (60% of max heap, divided by worker count, divided by an in-memory factor of ~40x for Parquet decompression). ## Components - `IcebergSnapshotCompletionHandler` — implements the SPI from debezium-connector-common. Routes per-chunk events to the streaming writer and triggers final commit on `onTableSnapshotFinished()`. - `BatchCommitCoordinator` — accumulates events from CDC streaming path (legacy fallback when SPI not available). - `IcebergChangeConsumer.StreamingSnapshotContext` — per-table state holder: open writer, cached schema converter, calibrated split threshold. - `IcebergTableOperator.writeChunkToWriter()` / `commitWriter()` — write without commit / final atomic commit + `CommitResult` for adaptive calibration. - `IcebergTableOperator.isSafeTypeChange()` — allows compatible type evolution (timestamptz↔timestamp, decimal↔double, int↔long) for pre-existing tables with legacy schemas. - `StructEventConverter` — cached schema converter constructor, static `fieldMappingCache` for performance. - `EventConverter.isSnapshotEvent()` — used to skip equality-delete writes for READ ops. - Schema evolution + identifier field protection in `IcebergTableOperator.applyFieldAddition()` — protect both new schema's and existing table's identifier fields when key schema is unavailable (e.g. `key.converter.schemas.enable=false`). ## Throughput / memory impact (production, PostgreSQL 16, 116 tables, ~128M rows) | Metric | Before (per-batch writer) | After (streaming + adaptive split) | |-------------------------|---------------------------|-------------------------------------| | Iceberg writers / table | ~1,500 | 1 (with periodic file splits) | | Iceberg commits / table | ~1,500 | ~6-10 (one per ~512MB Parquet file) | | Throughput | ~14K rows/min | ~80-120K rows/min | | Peak memory / worker | ~1.5 GB | ~200-300 MB | ## Build alignment Pin `kafka-clients:4.2.0` (matches `connect-runtime:4.2.0` from `debezium-bom:3.6.0-SNAPSHOT`; the `debezium-server-bom:3.5.0.Final` would otherwise pull `kafka-clients:4.1.1` which is missing `ConfigDef$ValidList.anyNonDuplicateValues`). Pin `httpclient5:5.4.3` to avoid the 5.4.3+5.5 classpath duplication that caused HEAD-request format issues against some REST catalogs (Lakekeeper). ## Dependencies This PR depends on debezium/debezium#7362 which introduces the `SnapshotTableCompletionHandler` SPI in `debezium-connector-common`. The CI build will fail until that PR is merged and `debezium-bom:3.6.0-SNAPSHOT` is published. ## Spinoff PRs (already extracted, mergeable independently before this one) - memiiso#695 — Support nested namespaces with dot separator - memiiso#696 — OpenLineage integration and Quarkus management interface - memiiso#698 — Snapshot READ semantics (READ as INSERT, missing __op handling) - memiiso#699 — Critical data loss fix in processTablesInParallel When those are merged, this PR's diff will shrink to only the streaming flush changes + build alignment. Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
| # These properties are applied at Maven build time | ||
| # and CANNOT be modified at runtime. |
There was a problem hiding this comment.
could we add this setting to user properties instead? or define in the Config class with defaults set to True and 9000. this is not flexible way to do add them.
e685f2c to
1b3158e
Compare
|
Hi @ismailsimsek, switched the two quarkus.management.enabled=${iceberg.management.enabled:true}
quarkus.management.port=${iceberg.management.port:9000}Defaults stay Note that |
Add application.properties with quarkus.management.enabled=true as a build-time property. This enables a separate HTTP server on port 9000 for health/ready/live endpoints, using its own thread pool independent from the main Vert.x event loop. Without this, health probes share the same event loop that gets blocked for 10-20+ seconds during Iceberg commits and GCS uploads, causing K8s to consider the pod unhealthy and restart it. Signed-off-by: ivan.senyk <ivan.senyk94@gmail.com>
1b3158e to
ccf7011
Compare
Summary
Two independent features bundled in a single PR (each in a dedicated commit) for the iceberg sink:
DatasetMetadataafter each successful Iceberg commit, using the standardDebeziumOpenLineageEmitterAPIOpenLineage integration
In
IcebergTableOperator.addToTablePerSchema(), after a successful commit, emit dataset metadata (table name, schema fields, OUTPUT/DATABASE) usingDebeziumOpenLineageEmitter.emit(). Falls back toNoOpLineageEmitterwhen OpenLineage runtime is not configured, so downstream users without OpenLineage continue to work unchanged.Files:
pom.xml— adddebezium-openlineage-apidependencyIcebergTableOperator.java— addemitOpenLineageEvent()after commitQuarkus management interface
Add
application.propertieswithquarkus.management.enabled=trueas a build-time property. This enables a separate HTTP server on port 9000 for health/ready/live endpoints, using its own thread pool independent from the main Vert.x event loop.Why this matters in production: without this, health probes share the same event loop that gets blocked for 10-20+ seconds during Iceberg commits and GCS uploads, causing K8s liveness probes to fail and the pod to restart mid-commit.
Files:
application.properties—quarkus.management.enabled=trueBackward compatibility