diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index a9bae80cd9c..99b87ae7325 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -338,7 +338,7 @@ transform: 小技巧:table-options 的格式是 `key1=value1,key2=value2`。 ## 分类映射 -多个转换规则可以定义为分类映射。 +在一张表同时被多个转换规则命中时,只有第一个匹配的转换规则将应用。 举个例子,我们可以定义一个转换规则如下: @@ -346,14 +346,13 @@ transform: transform: - source-table: mydb.web_order projection: id, order_id - filter: UPPER(province) = 'SHANGHAI' - description: classification mapping example - - source-table: mydb.web_order - projection: order_id as id, id as order_id - filter: UPPER(province) = 'BEIJING' - description: classification mapping example + filter: id > 1001 + - source-table: mydb.\.* + projection: \*, 'fallback' AS FALLBACK ``` +这里,即使 `mydb.web_order` 表同样可以被第二条规则匹配,但因为排序靠前的第一条规则已经匹配,因此不会落入后续的 Transform 规则中。 + ## 用户自定义函数 用户自定义函数(UDF)可以在转换规则中使用。 diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index aed16256762..1617bc8df2c 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -341,22 +341,20 @@ transform: Tips: The format of table-options is `key1=value1,key2=value2`. ## Classification mapping -Multiple transform rules can be defined to classify input data rows and apply different processing. -Only the first matched transform rule will apply. +If a table matches multiple transform rules, only the first matched transform rule will apply. 
For example, we may define a transform rule as follows: ```yaml transform: - source-table: mydb.web_order projection: id, order_id - filter: UPPER(province) = 'SHANGHAI' - description: classification mapping example - - source-table: mydb.web_order - projection: order_id as id, id as order_id - filter: UPPER(province) = 'BEIJING' - description: classification mapping example + filter: id > 1001 + - source-table: mydb.\.* + projection: \*, 'fallback' AS FALLBACK ``` +Here, although `mydb.web_order` also matches the second rule (`mydb.\.*`), it will not fall through to the next rule because it has already been handled by the first rule. + ## User-defined Functions User-defined functions (UDFs) can be used in transform rules. diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 2842729cdd0..4c4067e29e5 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -58,7 +58,6 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; -import org.apache.calcite.sql.validate.SqlValidatorException; import org.assertj.core.api.Assertions; import org.assertj.core.api.ThrowableAssert; import org.codehaus.commons.compiler.CompileException; @@ -241,6 +240,136 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); } + @ParameterizedTest(name = "API version: {0}") + @EnumSource(ValuesDataSink.SinkApi.class) + void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + Configuration 
sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + table1Schema.getColumnDataTypes().toArray(new DataType[0])); + + List events = new ArrayList<>(); + events.add(new CreateTableEvent(myTable1, table1Schema)); + // Case 1: before=Y, after=Y -> UPDATE + events.add( + DataChangeEvent.insertEvent( + myTable1, + generator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 30}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + generator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 30}), + generator.generate( + new Object[] {1, BinaryStringData.fromString("Alice"), 40}))); + // Case 2: before=Y, after=N -> DELETE + events.add( + DataChangeEvent.insertEvent( + myTable1, + generator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + generator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 30}), + generator.generate( + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); + // Case 3: before=N, after=Y -> INSERT + events.add( + DataChangeEvent.insertEvent( + myTable1, + generator.generate( + new Object[] {3, BinaryStringData.fromString("Carol"), 20}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + generator.generate( + new Object[] {3, BinaryStringData.fromString("Carol"), 20}), + generator.generate( + new Object[] {3, BinaryStringData.fromString("Carol"), 35}))); + // Case 4: before=N, after=N -> drop + events.add( + 
DataChangeEvent.insertEvent( + myTable1, + generator.generate( + new Object[] {4, BinaryStringData.fromString("Dave"), 10}))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + generator.generate( + new Object[] {4, BinaryStringData.fromString("Dave"), 10}), + generator.generate( + new Object[] {4, BinaryStringData.fromString("Dave"), 15}))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.\\.*", + null, + "age > 25", + null, + null, + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + assertThat(outputEvents) + .containsExactlyInAnyOrder( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + // INSERT id=1 (age=30 passes) + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 30], op=INSERT, meta=()}", + // UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1, Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}", + // INSERT id=2 (age=30 passes) + 
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 30], op=INSERT, meta=()}", + // UPDATE id=2 (30->20): before=Y, after=N -> DELETE + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 30], after=[], op=DELETE, meta=()}", + // UPDATE id=3 (20->35): before=N, after=Y -> INSERT + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Carol, 35], op=INSERT, meta=()}"); + // INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails), + // UPDATE id=4 (10->15, both fail) are all filtered out. + } + /** * This tests if transform rule could be used to classify source records based on filtering * rules. @@ -2507,7 +2636,6 @@ void testTransformErrorMessage() { + "to schema\n" + "\t(Unknown).") .rootCause() - .isExactlyInstanceOf(SqlValidatorException.class) .hasMessage("Column 'id1' not found in any table"); // Unexpected column in filter rule diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java index f97af9e384c..21e7ddc5d1b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java @@ -501,13 +501,6 @@ void runTransformSpecs(String group, String name, TestSpec spec) throws Exceptio } } - enum SpecContext { - PROJECTION, - EXPECT, - EXPECT_ERROR, - NULL - } - private static final String[] EXPECTED_SPECS = { "specs/arithmetic.yaml", "specs/basic.yaml", diff --git a/flink-cdc-composer/src/test/resources/specs/basic.yaml b/flink-cdc-composer/src/test/resources/specs/basic.yaml index 1dcf180c372..b9b17332901 100644 --- a/flink-cdc-composer/src/test/resources/specs/basic.yaml +++ b/flink-cdc-composer/src/test/resources/specs/basic.yaml @@ -108,13 +108,14 @@ expect: |- 
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[], op=DELETE, meta=()} - do: Filter by Expression projection: id_, string_ filter: id_ + 1 <= 1 primary-key: id_ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], after=[-1, 天地玄黄宇宙洪荒], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, 天地玄黄宇宙洪荒], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, 天地玄黄宇宙洪荒], after=[], op=DELETE, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()} @@ -126,6 +127,7 @@ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()} - do: Filter by Calculation Column (With NULL) ignore: FLINK-38905 projection: id_, string_, CHAR_LENGTH(string_) AS strlen_ @@ -134,6 +136,7 @@ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is Lie, 18], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], after=[], op=DELETE, meta=()} - do: Invalid Projection Expr projection: id_, 
a_column_that_is_nowhere_to_be_found primary-key: id_ diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml b/flink-cdc-composer/src/test/resources/specs/nested.yaml index a600b469b8a..3ac67facd0f 100644 --- a/flink-cdc-composer/src/test/resources/specs/nested.yaml +++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml @@ -176,7 +176,7 @@ primary-key: id_ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`array_string_` ARRAY}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, [二, san, 五, qi, 十一]], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()} - do: Map Subscripting projection: |- @@ -248,6 +248,7 @@ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP}, primaryKeys=id_, options=()} DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[1, {1 -> one, 2 -> two, 3 -> three}], after=[], op=DELETE, meta=()} - do: Record Subscripting With Index projection: |- id_ @@ -282,7 +283,7 @@ primary-key: id_ expect: |- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>}, primaryKeys=id_, options=()} - DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice, length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=UPDATE, meta=()} + DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=INSERT, meta=()} DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, length: INT -> 7}], after=[], op=DELETE, 
meta=()} - do: Variant Object Subscripting With String Key projection: |- diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 8683a07b8b1..0a8b63703b9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -289,16 +289,19 @@ private Optional processDataChangeEvent( getProjectionProcessor(tableId, effectiveTransformer); TransformFilterProcessor filterProcessor = getFilterProcessor(tableId, effectiveTransformer); - RecordData beforeRow = null; - RecordData afterRow = null; - boolean filterPassed = true; + + BinaryRecordData beforeRow = null; + BinaryRecordData afterRow = null; + boolean beforeFilterPassed = false; + boolean afterFilterPassed = false; + if (event.before() != null) { context.opType = beforeOp; Tuple2 result = transformRecord( event.before(), info, projectionProcessor, filterProcessor, context); beforeRow = result.f0; - filterPassed = result.f1; + beforeFilterPassed = result.f1; } if (event.after() != null) { context.opType = afterOp; @@ -306,23 +309,51 @@ private Optional processDataChangeEvent( transformRecord( event.after(), info, projectionProcessor, filterProcessor, context); afterRow = result.f0; - filterPassed = result.f1; + afterFilterPassed = result.f1; } - if (filterPassed) { - DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); - if (effectiveTransformer.getPostTransformConverter().isPresent()) { - return effectiveTransformer - .getPostTransformConverter() - .get() - .convert(finalEvent) - .map(Event.class::cast); - } else { - return Optional.of(finalEvent); - } + // For UPDATE events, before and after filter results may differ, requiring op 
type + // conversion: + // before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE; + // before=N, after=Y -> INSERT; before=N, after=N -> drop. + DataChangeEvent finalEvent; + switch (event.op()) { + case INSERT: + case REPLACE: + if (!afterFilterPassed) { + return Optional.empty(); + } + finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); + break; + case DELETE: + if (!beforeFilterPassed) { + return Optional.empty(); + } + finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); + break; + case UPDATE: + if (beforeFilterPassed && afterFilterPassed) { + finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); + } else if (beforeFilterPassed) { + finalEvent = DataChangeEvent.deleteEvent(tableId, beforeRow, event.meta()); + } else if (afterFilterPassed) { + finalEvent = DataChangeEvent.insertEvent(tableId, afterRow, event.meta()); + } else { + return Optional.empty(); + } + break; + default: + throw new UnsupportedOperationException( + "Unsupported operation type: " + event.op()); } - // Events with no matching filters satisfied won't be emitted to downstream. 
- return Optional.empty(); + if (effectiveTransformer.getPostTransformConverter().isPresent()) { + return effectiveTransformer + .getPostTransformConverter() + .get() + .convert(finalEvent) + .map(Event.class::cast); + } + return Optional.of(finalEvent); } /** diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index ddd5aeb76ba..57563b28060 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -784,6 +784,204 @@ void testDataChangeEventTransformWithDuplicateColumns() throws Exception { transformFunctionEventEventOperatorTestHarness.close(); } + @Test + void testUpdateEventFilterOpTypeConversion() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + COLUMN_SQUARE_TABLE.identifier(), + "col1, col2, col2 * col2 as square_col2", + "col2 < 3 OR col2 > 5") + .build(); + RegularEventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + transformFunctionEventEventOperatorTestHarness.open(); + CreateTableEvent createTableEvent = + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType())); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA))); + + // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE + DataChangeEvent 
updateBothPass = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {1, 1, null}), + recordDataGenerator.generate(new Object[] {1, 6, null})); + transform.processElement(new StreamRecord<>(updateBothPass)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {1, 1, 1}), + recordDataGenerator.generate(new Object[] {1, 6, 36})))); + + // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only) + DataChangeEvent updateBeforeOnly = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {2, 1, null}), + recordDataGenerator.generate(new Object[] {2, 4, null})); + transform.processElement(new StreamRecord<>(updateBeforeOnly)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.deleteEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {2, 1, 1})))); + + // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only) + DataChangeEvent updateAfterOnly = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {3, 4, null}), + recordDataGenerator.generate(new Object[] {3, 6, null})); + transform.processElement(new StreamRecord<>(updateAfterOnly)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {3, 6, 36})))); + + // Case 4: before=N(col2=4), after=N(col2=5) -> drop + DataChangeEvent updateNonePass = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {4, 4, null}), + recordDataGenerator.generate(new 
Object[] {4, 5, null})); + transform.processElement(new StreamRecord<>(updateNonePass)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isNull(); + + transformFunctionEventEventOperatorTestHarness.close(); + } + + @Test + void testUpdateEventFilterOpTypeConversionWithDataEventType() throws Exception { + // This test validates the behavior of __data_event_type__ metadata column + // when UPDATE events are converted to synthetic INSERT/DELETE events. + // Note: The projected __data_event_type__ reflects the original op type (-U/+U) + // rather than the converted op type (-D/+I). + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + COLUMN_SQUARE_TABLE.identifier(), + "col1, col2, col2 * col2 as square_col2, __data_event_type__ as event_type", + "col2 < 3 OR col2 > 5") + .build(); + RegularEventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + transformFunctionEventEventOperatorTestHarness.open(); + + Schema expectedSchema = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.INT().notNull()) + .physicalColumn("col2", DataTypes.INT()) + .physicalColumn("square_col2", DataTypes.INT()) + .physicalColumn("event_type", DataTypes.STRING().notNull()) + .primaryKey("col1") + .build(); + CreateTableEvent createTableEvent = + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator(((RowType) expectedSchema.toRowDataType())); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(COLUMN_SQUARE_TABLE, 
expectedSchema))); + + // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE + // __data_event_type__ for before is -U, for after is +U + DataChangeEvent updateBothPass = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {1, 1, null}), + recordDataGenerator.generate(new Object[] {1, 6, null})); + transform.processElement(new StreamRecord<>(updateBothPass)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + expectedRecordDataGenerator.generate( + new Object[] {1, 1, 1, new BinaryStringData("-U")}), + expectedRecordDataGenerator.generate( + new Object[] { + 1, 6, 36, new BinaryStringData("+U") + })))); + + // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only) + // __data_event_type__ is -U (original before type, not -D) + DataChangeEvent updateBeforeOnly = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {2, 1, null}), + recordDataGenerator.generate(new Object[] {2, 4, null})); + transform.processElement(new StreamRecord<>(updateBeforeOnly)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.deleteEvent( + COLUMN_SQUARE_TABLE, + expectedRecordDataGenerator.generate( + new Object[] { + 2, 1, 1, new BinaryStringData("-U") + })))); + + // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only) + // __data_event_type__ is +U (original after type, not +I) + DataChangeEvent updateAfterOnly = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {3, 4, null}), + recordDataGenerator.generate(new Object[] {3, 6, null})); + transform.processElement(new StreamRecord<>(updateAfterOnly)); + Assertions.assertThat( + 
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + expectedRecordDataGenerator.generate( + new Object[] { + 3, 6, 36, new BinaryStringData("+U") + })))); + + // Case 4: before=N(col2=4), after=N(col2=5) -> drop + DataChangeEvent updateNonePass = + DataChangeEvent.updateEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {4, 4, null}), + recordDataGenerator.generate(new Object[] {4, 5, null})); + transform.processElement(new StreamRecord<>(updateNonePass)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isNull(); + + transformFunctionEventEventOperatorTestHarness.close(); + } + @Test void testTimestampTransform() throws Exception { PostTransformOperator transform = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java index f55a0da8e41..b00b1a29ec7 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java @@ -487,7 +487,7 @@ void testReduceColumnsTransform() throws Exception { .updatePreTransformed( new Object[] {"id004", 18, "NeoReference004", 2018}, new Object[] {"id004", 10, "NeoReference004", 2018}) - .updatePostTransformed() + .deletePostTransformed("id004", "ID004", 19, "neoreference004") .deleteSource("id001", "Alice", 17, "Reference001", 2021) .deletePreTransformed("id001", 17, "Reference001", 2021) .deletePostTransformed("id001", "ID001", 18, "reference001") @@ -1032,7 +1032,7 @@ void testTransformWithCast() throws Exception { new Object[] {"1004", "Colin", 10, "NeoReference004", 2018}) 
.updatePreTransformed( new Object[] {"1004", 19, 2018}, new Object[] {"1004", 10, 2018}) - .updatePostTransformed() + .deletePostTransformed("1004", 20, 1023L, "19") .deleteSource("1001", "Alice", 17, "Reference001", 2021) .deletePreTransformed("1001", 17, 2021) .deletePostTransformed("1001", 18, 1018L, "17")