[FLINK-39230] Transform should convert partially filtered UPDATE events to INSERT / DELETE#4319
Conversation
…TE w/ partially applied filter conditions
There was a problem hiding this comment.
Pull request overview
This PR updates post-transform filtering behavior so that when an UPDATE’s before/after images differ in filter outcome, the emitted event is converted accordingly (UPDATE, INSERT, DELETE, or dropped). It aligns unit/integration tests and composer YAML expectations with the new semantics.
Changes:
- Implement per-row filter evaluation for UPDATE events and convert the downstream op type based on (beforePass, afterPass).
- Add/adjust runtime and composer tests (plus YAML golden outputs) to reflect op-type conversion.
- Remove the “classification mapping” (first-match rule) documentation section from the transform docs (EN/ZH).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java | Converts UPDATE events into UPDATE/INSERT/DELETE/drop depending on before/after filter results. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java | Adds a unit test covering UPDATE filter op-type conversion cases. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java | Updates expectations to reflect UPDATE→DELETE conversion when an update moves a row into a filtered-out state. |
| flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java | Adds an ITCase for UPDATE op-type conversion under filtering; loosens one Calcite exception-type assertion. |
| flink-cdc-composer/src/test/resources/specs/basic.yaml | Updates expected events to include DELETEs / UPDATE→INSERT conversions in filtered scenarios. |
| flink-cdc-composer/src/test/resources/specs/nested.yaml | Updates expected events to reflect UPDATE→INSERT conversion and adds expected DELETEs in some cases. |
| docs/content/docs/core-concept/transform.md | Removes “Classification mapping” section describing first-match rule semantics. |
| docs/content.zh/docs/core-concept/transform.md | Same documentation removal as EN version. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (beforeFilterPassed && afterFilterPassed) { | ||
| finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); | ||
| } else if (beforeFilterPassed) { | ||
| finalEvent = DataChangeEvent.deleteEvent(tableId, beforeRow, event.meta()); | ||
| } else if (afterFilterPassed) { |
There was a problem hiding this comment.
I prefer not to re-evaluate expressions as the filter condition itself may depend on the opType itself, and that might cause inconsistencies.
Also, op_type should represent the original type from source, and keeping it intact should be acceptable.
…ts to INSERT / DELETE (apache#4319)
…ts to INSERT / DELETE (apache#4319)
This closes FLINK-39230.
Currently, transform filter conditions are only applied on the after() part of UPDATE event.
A more reasonable solution (like Flink SQL changelog semantics) would be: