diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java index 0d95f050b1c..50870edb5f2 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java @@ -63,7 +63,6 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.common.types.variant.Variant; -import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList; import org.apache.flink.shaded.guava31.com.google.common.collect.Streams; import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding; @@ -320,139 +319,6 @@ public static Object[] coerceRow( return coercedRow; } - /** - * Try to merge given {@link Schema}s and ensure they're identical. The only difference allowed - * is nullability, string and varchar precision, default value, and comments. - */ - public static Schema strictlyMergeSchemas(List schemas) { - Preconditions.checkArgument( - !schemas.isEmpty(), "Trying to merge transformed schemas %s, but got empty list"); - if (schemas.size() == 1) { - return schemas.get(0); - } - - List> primaryKeys = - schemas.stream() - .map(Schema::primaryKeys) - .filter(p -> !p.isEmpty()) - .distinct() - .collect(Collectors.toList()); - List> partitionKeys = - schemas.stream() - .map(Schema::partitionKeys) - .filter(p -> !p.isEmpty()) - .distinct() - .collect(Collectors.toList()); - List> options = - schemas.stream() - .map(Schema::options) - .filter(p -> !p.isEmpty()) - .distinct() - .collect(Collectors.toList()); - List> columnNames = - schemas.stream() - .map(Schema::getColumnNames) - .distinct() - .collect(Collectors.toList()); - - Preconditions.checkArgument( - primaryKeys.size() <= 1, - "Trying to merge transformed schemas %s, but got more than one primary key configurations: %s", - schemas, - primaryKeys); - Preconditions.checkArgument( - partitionKeys.size() <= 1, - "Trying to merge transformed schemas %s, but got more than one partition key configurations: %s", - schemas, - partitionKeys); - Preconditions.checkArgument( - options.size() <= 1, - "Trying to merge transformed schemas %s, but got more than one option configurations: %s", - schemas, - options); - Preconditions.checkArgument( - columnNames.size() == 1, - "Trying to merge transformed schemas %s, but got more than one column name views: %s", - schemas, - columnNames); - - int arity = columnNames.get(0).size(); - - ArrayListMultimap toBeMergedColumnTypes = - ArrayListMultimap.create(arity, 1); - for (Schema schema : schemas) { - List columnTypes = schema.getColumnDataTypes(); - for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) { - toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex)); - } - } - - List mergedColumnNames = columnNames.iterator().next(); - List mergedColumnTypes = new ArrayList<>(arity); - for (int i = 0; i < arity; i++) { - mergedColumnTypes.add(strictlyMergeDataTypes(toBeMergedColumnTypes.get(i))); - } - - List mergedColumns = new ArrayList<>(); - for (int i = 0; i < mergedColumnNames.size(); i++) { - mergedColumns.add( - Column.physicalColumn(mergedColumnNames.get(i), mergedColumnTypes.get(i))); - } - - return Schema.newBuilder() - .primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : primaryKeys.get(0)) - .partitionKey( - partitionKeys.isEmpty() ? Collections.emptyList() : partitionKeys.get(0)) - .options(options.isEmpty() ? Collections.emptyMap() : options.get(0)) - .setColumns(mergedColumns) - .build(); - } - - private static DataType strictlyMergeDataTypes(List dataTypes) { - Preconditions.checkArgument( - !dataTypes.isEmpty(), - "Trying to merge transformed data types %s, but got empty list"); - - List simpleMergeTypes = - dataTypes.stream().distinct().collect(Collectors.toList()); - if (simpleMergeTypes.size() == 1) { - return simpleMergeTypes.get(0); - } - - List typeRoots = - dataTypes.stream() - .map(DataType::getTypeRoot) - .distinct() - .collect(Collectors.toList()); - Preconditions.checkArgument( - typeRoots.size() == 1, - "Trying to merge types %s, but got more than one type root: %s", - dataTypes, - typeRoots); - - // Decay types to the most - DataType type = dataTypes.get(0); - - if (type.is(DataTypeRoot.CHAR)) { - return DataTypes.CHAR(CharType.MAX_LENGTH); - } else if (type.is(DataTypeRoot.VARCHAR)) { - return DataTypes.STRING(); - } else if (type.is(DataTypeRoot.BINARY)) { - return DataTypes.BINARY(BinaryType.MAX_LENGTH); - } else if (type.is(DataTypeRoot.VARBINARY)) { - return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH); - } else if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { - return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION); - } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) { - return DataTypes.TIMESTAMP_TZ(ZonedTimestampType.MAX_PRECISION); - } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { - return DataTypes.TIMESTAMP_LTZ(LocalZonedTimestampType.MAX_PRECISION); - } else { - throw new IllegalArgumentException( - "Unable to merge data types with different precision: " + dataTypes); - } - } - @VisibleForTesting static boolean isDataTypeCompatible(@Nullable DataType currentType, DataType upcomingType) { // If two types are identical, they're compatible of course. diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java index 0074039c927..5e521c3df4d 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java @@ -613,73 +613,6 @@ void testOpTypeMetadataColumnInBatchMode(ValuesDataSink.SinkApi sinkApi) throws "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}"); } - @ParameterizedTest - @EnumSource - void testTransformTwiceInBatchMode(ValuesDataSink.SinkApi sinkApi) throws Exception { - FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); - - // Setup value source - Configuration sourceConfig = new Configuration(); - sourceConfig.set( - ValuesDataSourceOptions.EVENT_SET_ID, - ValuesDataSourceHelper.EventSetId.TRANSFORM_BATCH_TABLE); - SourceDef sourceDef = - new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); - - // Setup value sink - Configuration sinkConfig = new Configuration(); - sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); - sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); - SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); - - // Setup transform - TransformDef transformDef1 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'1') as col12", - "col1 = '1' OR col1 = '999'", - "col1", - "col12", - "key1=value1", - "", - null); - TransformDef transformDef2 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'2') as col12", - "col1 = '2'", - null, - null, - null, - "", - null); - // Setup pipeline - Configuration pipelineConfig = new Configuration(); - pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - pipelineConfig.set( - PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE, RuntimeExecutionMode.BATCH); - PipelineDef pipelineDef = - new PipelineDef( - sourceDef, - sinkDef, - Collections.emptyList(), - new ArrayList<>(Arrays.asList(transformDef1, transformDef2)), - Collections.emptyList(), - pipelineConfig); - - // Execute the pipeline - PipelineExecution execution = composer.compose(pipelineDef); - execution.execute(); - - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split("\n"); - assertThat(outputEvents) - .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}"); - } - @ParameterizedTest @EnumSource void testOneToOneRoutingInBatchMode(ValuesDataSink.SinkApi sinkApi) throws Exception { diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index fd2368b6ce0..d01d987d492 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -423,78 +423,6 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}"); } - @ParameterizedTest - @EnumSource - void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { - FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); - - // Setup value source - Configuration sourceConfig = new Configuration(); - sourceConfig.set( - ValuesDataSourceOptions.EVENT_SET_ID, - ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); - SourceDef sourceDef = - new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); - - // Setup value sink - Configuration sinkConfig = new Configuration(); - sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); - sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); - SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); - - // Setup transform - TransformDef transformDef1 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'1') as col12", - "col1 = '1' OR col1 = '999'", - "col1", - "col12", - "key1=value1", - "", - null); - TransformDef transformDef2 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'2') as col12", - "col1 = '2'", - null, - null, - null, - "", - null); - // Setup pipeline - Configuration pipelineConfig = new Configuration(); - pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - pipelineConfig.set( - PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); - PipelineDef pipelineDef = - new PipelineDef( - sourceDef, - sinkDef, - Collections.emptyList(), - new ArrayList<>(Arrays.asList(transformDef1, transformDef2)), - Collections.emptyList(), - pipelineConfig); - - // Execute the pipeline - PipelineExecution execution = composer.compose(pipelineDef); - execution.execute(); - - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split("\n"); - assertThat(outputEvents) - .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", - "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", - "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", - "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=({op_ts=4})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}"); - } - @ParameterizedTest @EnumSource void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception { diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index ede3c4b1721..7348749b7e1 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -465,76 +465,6 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, meta=({op_ts=5})}"); } - @ParameterizedTest - @EnumSource - void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { - FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); - - // Setup value source - Configuration sourceConfig = new Configuration(); - sourceConfig.set( - ValuesDataSourceOptions.EVENT_SET_ID, - ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); - SourceDef sourceDef = - new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); - - // Setup value sink - Configuration sinkConfig = new Configuration(); - sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); - sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); - SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); - - // Setup transform - TransformDef transformDef1 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'1') as col12", - "col1 = '1' OR col1 = '999'", - "col1", - "col12", - "key1=value1", - "", - null); - TransformDef transformDef2 = - new TransformDef( - "default_namespace.default_schema.table1", - "*,concat(col1,'2') as col12", - "col1 = '2'", - null, - null, - null, - "", - null); - // Setup pipeline - Configuration pipelineConfig = new Configuration(); - pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - - PipelineDef pipelineDef = - new PipelineDef( - sourceDef, - sinkDef, - Collections.emptyList(), - new ArrayList<>(Arrays.asList(transformDef1, transformDef2)), - Collections.emptyList(), - pipelineConfig); - - // Execute the pipeline - PipelineExecution execution = composer.compose(pipelineDef); - execution.execute(); - - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR); - assertThat(outputEvents) - .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", - "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", - "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 11, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, meta=({op_ts=5})}"); - } - @Test void testOneToOneRouting() throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index e4da2fca351..2842729cdd0 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -253,20 +253,11 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Except Arrays.asList( new TransformDef( "default_namespace.default_schema.\\.*", - "*, 'YOUNG' AS category", - "age < 20", + "*, CASE WHEN age < 20 THEN 'YOUNG' WHEN age >= 20 THEN 'OLD' END AS category", null, null, null, null, - null), - new TransformDef( - "default_namespace.default_schema.\\.*", - "*, 'OLD' AS category", - "age >= 20", - null, - null, - null, null, null)), Arrays.asList( @@ -288,17 +279,8 @@ void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) thro Arrays.asList( new TransformDef( "default_namespace.default_schema.\\.*", - "id,age,'Juvenile' AS roleName", - "age < 18", - null, - null, - null, + "id,age, CASE WHEN age < 18 THEN 'Juvenile' WHEN age >= 18 THEN name END AS roleName", null, - null), - new TransformDef( - "default_namespace.default_schema.\\.*", - "id,age,name AS roleName", - "age >= 18", null, null, null, @@ -317,42 +299,7 @@ void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) thro @ParameterizedTest @EnumSource - void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Exception { - runGenericTransformTest( - sinkApi, - Arrays.asList( - new TransformDef( - "default_namespace.default_schema.mytable2", - "*,'Juvenile' AS roleName", - "age < 18", - null, - null, - null, - null, - null), - new TransformDef( - "default_namespace.default_schema.mytable2", - "id,name,age,description,name AS roleName", - "age >= 18", - null, - null, - null, - null, - null)), - Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}")); - } - - @ParameterizedTest - @EnumSource - void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception { + void testMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception { runGenericTransformTest( sinkApi, Arrays.asList( @@ -364,170 +311,14 @@ void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws null, null, null, - null), - new TransformDef( - "default_namespace.default_schema.mytable2", - "id,UPPER(name) AS name,age,description", - "age >= 18", - null, - null, - null, - null, - null)), - Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); - } - - @ParameterizedTest - @EnumSource - @Disabled("to be fixed in FLINK-37132") - void testMultiTransformSchemaColumnsCompatibilityWithNullProjection( - ValuesDataSink.SinkApi sinkApi) { - TransformDef nullProjection = - new TransformDef( - "default_namespace.default_schema.mytable2", - null, - "age < 18", - null, - null, - null, - null, - null); - - assertThatThrownBy( - () -> - runGenericTransformTest( - sinkApi, - Arrays.asList( - nullProjection, - new TransformDef( - "default_namespace.default_schema.mytable2", - // reference part column - "id,UPPER(name) AS name", - "age >= 18", - null, - null, - null, - null, - null)), - Collections.emptyList())) - .rootCause() - .isExactlyInstanceOf(IllegalStateException.class) - .hasMessage( - "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() " - + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts."); - } - - @ParameterizedTest - @EnumSource - @Disabled("to be fixed in FLINK-37132") - void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection( - ValuesDataSink.SinkApi sinkApi) { - TransformDef emptyProjection = - new TransformDef( - "default_namespace.default_schema.mytable2", - "", - "age < 18", - null, - null, - null, - null, - null); - - assertThatThrownBy( - () -> - runGenericTransformTest( - sinkApi, - Arrays.asList( - emptyProjection, - new TransformDef( - "default_namespace.default_schema.mytable2", - // reference part column - "id,UPPER(name) AS name", - "age >= 18", - null, - null, - null, - null, - null)), - Collections.emptyList())) - .rootCause() - .isExactlyInstanceOf(IllegalStateException.class) - .hasMessage( - "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() " - + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts."); - } - - @ParameterizedTest - @EnumSource - void testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi sinkApi) - throws Exception { - TransformDef nullProjection = - new TransformDef( - "default_namespace.default_schema.mytable2", - null, - "age < 18", - null, - null, - null, - null, - null); - - TransformDef emptyProjection = - new TransformDef( - "default_namespace.default_schema.mytable2", - "", - "age < 18", - null, - null, - null, - null, - null); - - TransformDef asteriskProjection = - new TransformDef( - "default_namespace.default_schema.mytable2", - "*", - "age < 18", - null, - null, - null, - null, - null); - - runGenericTransformTest( - sinkApi, - Arrays.asList( - // Setting projection as null, '', or * should be equivalent - nullProjection, - emptyProjection, - asteriskProjection, - new TransformDef( - "default_namespace.default_schema.mytable2", - // reference all column - "id,UPPER(name) AS name,age,description", - "age >= 18", - null, - null, - null, - null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}")); } /** This tests if transform generates metadata info correctly. */ @@ -1237,47 +1028,49 @@ void testTransformWithVariantFunctions() throws Exception { @ParameterizedTest @EnumSource - void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi apiVersion) { - Assertions.assertThatThrownBy( - () -> - runGenericTransformTest( - apiVersion, - Arrays.asList( - new TransformDef( - "\\.*.\\.*.mytable1", - "*, 'rule_1_matched' AS rule_1_matched", - "id > 0", - null, - "id", - null, - null, - null), - new TransformDef( - "\\.*.\\.*.\\.*", - "*, 'rule_fallback' AS rule_fallback", - null, - null, - "id", - null, - null, - null)), - Collections.emptyList())) - .rootCause() - .isExactlyInstanceOf(IllegalArgumentException.class) - .hasMessage( - "Trying to merge transformed schemas [columns={`id` INT,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, partitionKeys=id, options=(), columns={`id` INT,`name` STRING,`age` INT,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()], but got more than one column name views: [[id, name, age, rule_1_matched], [id, name, age, rule_fallback]]"); + void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Exception { + runGenericTransformTest( + apiVersion, + Arrays.asList( + new TransformDef( + "\\.*.\\.*.mytable1", + "*, 'rule_1_matched' AS rule_1_matched", + null, + null, + "id", + null, + null, + null), + new TransformDef( + "\\.*.\\.*.\\.*", + "*, 'rule_fallback' AS rule_fallback", + null, + null, + "id", + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, partitionKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, rule_fallback], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, rule_fallback], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}")); } @ParameterizedTest @EnumSource - void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Exception { + void testTransformFilterWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Exception { runGenericTransformTest( apiVersion, Arrays.asList( new TransformDef( "\\.*.\\.*.mytable1", "*, 'rule_1_matched' AS rule_1_matched", - null, + "id > 1", null, "id", null, @@ -1294,7 +1087,6 @@ void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Ex null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, partitionKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, meta=()}", "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 5410eb9039a..766621971e3 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -270,11 +270,7 @@ private void testGenericSchemaEvolution( + (triggerError ? " error.on.schema.change: true\n" : "\n") + "transform:\n" + " - source-table: %s.\\.*\n" - + " projection: CAST(id AS VARCHAR) || ' -> ' || name AS uid, *, id * id AS id_square, 'age < 20' as tag\n" - + " filter: age < 20\n" - + " - source-table: %s.\\.*\n" - + " projection: CAST(id AS VARCHAR) || ' -> ' || name AS uid, *, 0 - id * id AS id_square, 'age >= 20' as tag\n" - + " filter: age >= 20\n" + + " projection: CAST(id AS VARCHAR) || ' -> ' || name AS uid, *, CASE WHEN age < 20 THEN id * id WHEN age >= 20 THEN 0 - id * id END AS id_square, CASE WHEN age < 20 THEN 'age < 20' WHEN age >= 20 THEN 'age >= 20' END as tag\n" + (mergeTable ? String.format( "route:\n" @@ -292,7 +288,6 @@ private void testGenericSchemaEvolution( dbName, mergeTable ? "(members|new_members)" : "members", dbName, - dbName, behavior, parallelism); submitPipelineJob(pipelineJob); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 925bc6059c1..6b2e7b6ddd6 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -188,11 +188,7 @@ void testMultipleTransformRule(boolean batchMode) throws Exception { + "\n" + "transform:\n" + " - source-table: %s.\\.*\n" - + " projection: ID, VERSION, 'Type-A' AS CATEGORY\n" - + " filter: ID > 1008\n" - + " - source-table: %s.\\.*\n" - + " projection: ID, VERSION, 'Type-B' AS CATEGORY\n" - + " filter: ID <= 1008\n" + + " projection: ID, VERSION, CASE WHEN ID > 1008 THEN 'Type-A' WHEN ID <= 1008 THEN 'Type-B' END AS CATEGORY\n" + "\n" + "pipeline:\n" + " execution.runtime-mode: %s\n" @@ -203,7 +199,6 @@ void testMultipleTransformRule(boolean batchMode) throws Exception { startupMode, transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName(), runtimeMode, parallelism); submitPipelineJob(pipelineJob); @@ -612,11 +607,7 @@ void testMultipleTransformWithDiffRefColumn(boolean batchMode) throws Exception + " type: values\n" + "transform:\n" + " - source-table: %s.TABLEALPHA\n" - + " projection: ID, VERSION, PRICEALPHA, AGEALPHA, 'Juvenile' AS ROLENAME\n" - + " filter: AGEALPHA < 18\n" - + " - source-table: %s.TABLEALPHA\n" - + " projection: ID, VERSION, PRICEALPHA, AGEALPHA, NAMEALPHA AS ROLENAME\n" - + " filter: AGEALPHA >= 18\n" + + " projection: ID, VERSION, PRICEALPHA, AGEALPHA, CASE WHEN AGEALPHA < 18 THEN 'Juvenile' WHEN AGEALPHA >= 18 THEN NAMEALPHA END AS ROLENAME\n" + "pipeline:\n" + " execution.runtime-mode: %s\n" + " parallelism: %d", @@ -626,7 +617,6 @@ void testMultipleTransformWithDiffRefColumn(boolean batchMode) throws Exception startupMode, transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName(), runtimeMode, parallelism); submitPipelineJob(pipelineJob); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 36e348e09e9..8683a07b8b1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -32,7 +32,6 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; -import org.apache.flink.cdc.common.utils.SchemaMergingUtils; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter; import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters; @@ -43,6 +42,9 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable; import org.apache.flink.shaded.guava31.com.google.common.collect.Table; @@ -56,7 +58,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; @@ -88,6 +89,8 @@ public class PostTransformOperator extends AbstractStreamOperatorAdapter projectionProcessors; private transient Table filterProcessors; + private transient LoadingCache> transformersCache; + public static PostTransformOperatorBuilder newBuilder() { return new PostTransformOperatorBuilder(); } @@ -116,6 +119,16 @@ public void open() throws Exception { initializeUdf(); this.transformers = createTransformers(); + this.transformersCache = + CacheBuilder.newBuilder() + .maximumSize(1024) + .build( + new CacheLoader<>() { + @Override + public Optional load(TableId tableId) { + return getEffectiveTransformer(tableId); + } + }); } @Override @@ -162,26 +175,26 @@ private void processElementInternal(StreamRecord element) { ChangeEvent changeEvent = (ChangeEvent) event; TableId tableId = changeEvent.tableId(); - List transformers = getEffectiveTransformers(tableId); + Optional transformer = transformersCache.getUnchecked(tableId); // Short-circuit if there's no effective transformers. - if (transformers.isEmpty()) { + if (transformer.isEmpty()) { output.collect(element); return; } if (event instanceof CreateTableEvent) { - processCreateTableEvent((CreateTableEvent) event, transformers) + processCreateTableEvent((CreateTableEvent) event, transformer.get()) .map(StreamRecord::new) .ifPresent(output::collect); invalidateCache(tableId); } else if (event instanceof SchemaChangeEvent) { - processSchemaChangeEvent((SchemaChangeEvent) event, transformers) + processSchemaChangeEvent((SchemaChangeEvent) event, transformer.get()) .map(StreamRecord::new) .ifPresent(output::collect); invalidateCache(tableId); } else if (event instanceof DataChangeEvent) { - processDataChangeEvent((DataChangeEvent) event, transformers) + processDataChangeEvent((DataChangeEvent) event, transformer.get()) .map(StreamRecord::new) .ifPresent(output::collect); } else { @@ -197,18 +210,12 @@ private void processElementInternal(StreamRecord element) { * Apply effective transform rules to {@link CreateTableEvent}s based on effective transformers. */ private Optional processCreateTableEvent( - CreateTableEvent event, List effectiveTransformers) { + CreateTableEvent event, PostTransformer effectiveTransformer) { TableId tableId = event.tableId(); Schema preSchema = event.getSchema(); - // Apply transform rules and verify we can get a deterministic post schema - List schemas = - effectiveTransformers.stream() - .map(trans -> transformSchema(preSchema, trans)) - .collect(Collectors.toList()); - Schema postSchema = - SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas)); + SchemaUtils.ensurePkNonNull(transformSchema(preSchema, effectiveTransformer)); // Update transform info map postTransformInfoMap.put( @@ -216,11 +223,10 @@ private Optional processCreateTableEvent( // Update "if-table-has-been–wildcard–matched" map boolean wildcardMatched = - effectiveTransformers.stream() - .map(PostTransformer::getProjection) - .flatMap(this::optionalToStream) - .map(TransformProjection::getProjection) - .anyMatch(TransformParser::hasAsterisk); + effectiveTransformer.getProjection().isPresent() + && TransformParser.hasAsterisk( + effectiveTransformer.getProjection().get().getProjection()); + hasAsteriskMap.put(tableId, wildcardMatched); projectedColumnsMap.put( tableId, @@ -236,7 +242,7 @@ private Optional processCreateTableEvent( * transformers and existing {@link PostTransformChangeInfo}. */ private Optional processSchemaChangeEvent( - SchemaChangeEvent event, List effectiveTransformers) { + SchemaChangeEvent event, PostTransformer effectiveTransformer) { TableId tableId = event.tableId(); PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId)); @@ -244,14 +250,8 @@ private Optional processSchemaChangeEvent( Schema prevPreSchema = info.getPreTransformedSchema(); Schema nextPreSchema = SchemaUtils.applySchemaChangeEvent(prevPreSchema, event); - // Apply transform rules and verify we can get a deterministic post schema - List schemas = - effectiveTransformers.stream() - .map(trans -> transformSchema(nextPreSchema, trans)) - .collect(Collectors.toList()); - Schema nextPostSchema = - SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas)); + SchemaUtils.ensurePkNonNull(transformSchema(nextPreSchema, effectiveTransformer)); // Update transform info map postTransformInfoMap.put( @@ -274,7 +274,7 @@ private Optional processSchemaChangeEvent( /** Apply projection rules to given {@link DataChangeEvent}. */ private Optional processDataChangeEvent( - DataChangeEvent event, List effectiveTransformers) { + DataChangeEvent event, PostTransformer effectiveTransformer) { TableId tableId = event.tableId(); PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId)); @@ -285,50 +285,39 @@ private Optional processDataChangeEvent( String beforeOp = event.opTypeString(false); String afterOp = event.opTypeString(true); - - for (PostTransformer transformer : effectiveTransformers) { - TransformProjectionProcessor projectionProcessor = - getProjectionProcessor(tableId, transformer); - TransformFilterProcessor filterProcessor = getFilterProcessor(tableId, transformer); - - RecordData beforeRow = null; - RecordData afterRow = null; - boolean filterPassed = true; - - if (event.before() != null) { - context.opType = beforeOp; - Tuple2 result = - transformRecord( - event.before(), - info, - projectionProcessor, - filterProcessor, - context); - beforeRow = result.f0; - filterPassed = result.f1; - } - - if (event.after() != null) { - context.opType = afterOp; - Tuple2 result = - transformRecord( - event.after(), info, projectionProcessor, filterProcessor, context); - afterRow = result.f0; - filterPassed = result.f1; - } - - if (filterPassed) { - DataChangeEvent finalEvent = - DataChangeEvent.projectRecords(event, beforeRow, afterRow); - if (transformer.getPostTransformConverter().isPresent()) { - return transformer - .getPostTransformConverter() - .get() - .convert(finalEvent) - .map(Event.class::cast); - } else { - return Optional.of(finalEvent); - } + TransformProjectionProcessor projectionProcessor = + getProjectionProcessor(tableId, effectiveTransformer); + TransformFilterProcessor filterProcessor = + getFilterProcessor(tableId, effectiveTransformer); + RecordData beforeRow = null; + RecordData afterRow = null; + boolean filterPassed = true; + if (event.before() != null) { + context.opType = beforeOp; + Tuple2 result = + transformRecord( + event.before(), info, projectionProcessor, filterProcessor, context); + beforeRow = result.f0; + filterPassed = result.f1; + } + if (event.after() != null) { + context.opType = afterOp; + Tuple2 result = + transformRecord( + event.after(), info, projectionProcessor, filterProcessor, context); + afterRow = result.f0; + filterPassed = result.f1; + } + if (filterPassed) { + DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow); + if (effectiveTransformer.getPostTransformConverter().isPresent()) { + return effectiveTransformer + .getPostTransformConverter() + .get() + .convert(finalEvent) + .map(Event.class::cast); + } else { + return Optional.of(finalEvent); } } @@ -397,22 +386,14 @@ private Tuple2 transformRecord( // Convenience methods for coping with transient fields. // ------------------- - /** Obtain effective transformers based on given {@link TableId}. */ - private List getEffectiveTransformers(TableId tableId) { - List effectiveTransformers = new ArrayList<>(); + /** Obtain effective transformer based on given {@link TableId}. */ + private Optional getEffectiveTransformer(TableId tableId) { for (PostTransformer transformer : transformers) { if (transformer.getSelectors().isMatch(tableId)) { - effectiveTransformers.add(transformer); - - // Transform module works with "First-match" rule. If we have met an uncondition - // transform rule (without any filtering expression), then any following transform - // rule will not be effective. - if (!transformer.getFilter().isPresent()) { - break; - } + return Optional.of(transformer); } } - return effectiveTransformers; + return Optional.empty(); } /** @@ -546,10 +527,4 @@ private void destroyUdf() { udfDescriptors.clear(); udfFunctionInstances.clear(); } - - /** Backport of {@code Optional#stream} before Java 11. */ - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private Stream optionalToStream(Optional optional) { - return optional.map(Stream::of).orElseGet(Stream::empty); - } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 22e0cfbd342..ddd5aeb76ba 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -468,112 +468,6 @@ void testDataChangeEventTransform() throws Exception { transformFunctionEventEventOperatorTestHarness.close(); } - @Test - void testDataChangeEventTransformTwice() throws Exception { - PostTransformOperator transform = - PostTransformOperator.newBuilder() - .addTransform( - CUSTOMERS_TABLEID.identifier(), - "*, concat(col1, '1') col12", - "col1 = '1'") - .addTransform( - CUSTOMERS_TABLEID.identifier(), - "*, concat(col1, '2') col12", - "col1 = '2'") - .build(); - RegularEventOperatorTestHarness - transformFunctionEventEventOperatorTestHarness = - RegularEventOperatorTestHarness.with(transform, 1); - // Initialization - transformFunctionEventEventOperatorTestHarness.open(); - // Create table - CreateTableEvent createTableEvent = - new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); - BinaryRecordDataGenerator recordDataGenerator = - new BinaryRecordDataGenerator(((RowType) CUSTOMERS_SCHEMA.toRowDataType())); - // Insert - DataChangeEvent insertEvent = - DataChangeEvent.insertEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), new BinaryStringData("2"), null - })); - DataChangeEvent insertEventExpect = - DataChangeEvent.insertEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), - new BinaryStringData("2"), - new BinaryStringData("11") - })); - // Insert - DataChangeEvent insertEvent2 = - DataChangeEvent.insertEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("2"), new BinaryStringData("2"), null - })); - DataChangeEvent insertEvent2Expect = - DataChangeEvent.insertEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("2"), - new BinaryStringData("2"), - new BinaryStringData("22") - })); - // Update - DataChangeEvent updateEvent = - DataChangeEvent.updateEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), new BinaryStringData("2"), null - }), - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), new BinaryStringData("3"), null - })); - DataChangeEvent updateEventExpect = - DataChangeEvent.updateEvent( - CUSTOMERS_TABLEID, - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), - new BinaryStringData("2"), - new BinaryStringData("11") - }), - recordDataGenerator.generate( - new Object[] { - new BinaryStringData("1"), - new BinaryStringData("3"), - new BinaryStringData("11") - })); - - transform.processElement(new StreamRecord<>(createTableEvent)); - Assertions.assertThat( - transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) - .isEqualTo( - new StreamRecord<>( - new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA))); - transform.processElement(new StreamRecord<>(insertEvent)); - Assertions.assertThat( - transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) - .isEqualTo(new StreamRecord<>(insertEventExpect)); - transform.processElement(new StreamRecord<>(insertEvent2)); - Assertions.assertThat( - transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) - .isEqualTo(new StreamRecord<>(insertEvent2Expect)); - transform.processElement(new StreamRecord<>(updateEvent)); - Assertions.assertThat( - transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) - .isEqualTo(new StreamRecord<>(updateEventExpect)); - transformFunctionEventEventOperatorTestHarness.close(); - } - @Test void testDataChangeEventTransformProjectionDataTypeConvert() throws Exception { PostTransformOperator transform = @@ -1259,21 +1153,12 @@ void testTimestampdiffTransform() throws Exception { .addTransform( TIMESTAMPDIFF_TABLEID.identifier(), "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff," - + " TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff," - + " TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff," - + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff," - + " TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as month_diff," - + " TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as year_diff", - "col1='1'") - .addTransform( - TIMESTAMPDIFF_TABLEID.identifier(), - "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as second_diff," - + " TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff," - + " TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff," - + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as day_diff," - + " TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as month_diff," - + " TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as year_diff", - "col1='2'") + + " CASE WHEN col1='1' THEN TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN col1='2' THEN TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END as minute_diff," + + " CASE WHEN col1='1' THEN TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN col1='2' THEN TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END as hour_diff," + + " CASE WHEN col1='1' THEN TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN col1='2' THEN TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END as day_diff," + + " CASE WHEN col1='1' THEN TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN col1='2' THEN TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END as month_diff," + + " CASE WHEN col1='1' THEN TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN col1='2' THEN TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END as year_diff", + null) .addTimezone("Asia/Shanghai") .build(); RegularEventOperatorTestHarness @@ -1336,51 +1221,11 @@ void testTimestampdiffTransformData() throws Exception { .addTransform( TIMESTAMPDIFF_DATA_TABLEID.identifier(), "col1, time_interval_unit," - + " TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='SECOND'") - .addTransform( - TIMESTAMPDIFF_DATA_TABLEID.identifier(), - "col1, time_interval_unit," - + " TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='MINUTE'") - .addTransform( - TIMESTAMPDIFF_DATA_TABLEID.identifier(), - "col1, time_interval_unit," - + " TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='HOUR'") - .addTransform( - TIMESTAMPDIFF_DATA_TABLEID.identifier(), - "col1, time_interval_unit," - + " TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='DAY'") - .addTransform( - TIMESTAMPDIFF_DATA_TABLEID.identifier(), - "col1, time_interval_unit," - + " TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='MONTH'") - .addTransform( - TIMESTAMPDIFF_DATA_TABLEID.identifier(), - "col1, time_interval_unit," - + " TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp) as timestamp_timestamp," - + " TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz," - + " TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp," - + " TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz", - "time_interval_unit='YEAR'") + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp) WHEN time_interval_unit='HOUR' THEN TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp) WHEN time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp) WHEN time_interval_unit='YEAR' THEN TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp) END as timestamp_timestamp," + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='YEAR' THEN TIMESTAMPDIFF(YEAR, start_timestamp, end_timestamp_ltz) END as timestamp_timestamp_ltz," + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='HOUR' THEN TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='YEAR' THEN TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp) END as timestamp_ltz_timestamp," + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='YEAR' THEN TIMESTAMPDIFF(YEAR, start_timestamp_ltz, end_timestamp_ltz) END as timestamp_ltz_timestamp_ltz", + null) .addTimezone("UTC") .build(); RegularEventOperatorTestHarness @@ -1676,22 +1521,14 @@ void testTimestampAddTransform() throws Exception { PostTransformOperator.newBuilder() .addTransform( TIMESTAMPADD_TABLEID.identifier(), - "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add," - + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add," - + " DATE_FORMAT(TIMESTAMPADD(HOUR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add," - + " DATE_FORMAT(TIMESTAMPADD(DAY, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add," - + " DATE_FORMAT(TIMESTAMPADD(MONTH, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add," - + " DATE_FORMAT(TIMESTAMPADD(YEAR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add", - "col1='1'") - .addTransform( - TIMESTAMPADD_TABLEID.identifier(), - "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add," - + " DATE_FORMAT(TIMESTAMPADD(MINUTE, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add," - + " DATE_FORMAT(TIMESTAMPADD(HOUR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add," - + " DATE_FORMAT(TIMESTAMPADD(DAY, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add," - + " DATE_FORMAT(TIMESTAMPADD(MONTH, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add," - + " DATE_FORMAT(TIMESTAMPADD(YEAR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add", - "col1='2'") + "col1, " + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(SECOND, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(SECOND, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as second_add," + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(MINUTE, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MINUTE, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as minute_add," + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(HOUR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(HOUR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as hour_add," + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(DAY, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(DAY, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as day_add," + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(MONTH, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MONTH, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as month_add," + + "CASE WHEN col1='1' THEN DATE_FORMAT(TIMESTAMPADD(YEAR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(YEAR, -1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as year_add", + null) .addTimezone("UTC") .build(); RegularEventOperatorTestHarness @@ -1771,39 +1608,9 @@ void testTimestampaddTransformData() throws Exception { .addTransform( TIMESTAMPADD_DATA_TABLEID.identifier(), "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(SECOND, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(SECOND, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='SECOND'") - .addTransform( - TIMESTAMPADD_DATA_TABLEID.identifier(), - "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='MINUTE'") - .addTransform( - TIMESTAMPADD_DATA_TABLEID.identifier(), - "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(HOUR, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(HOUR, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='HOUR'") - .addTransform( - TIMESTAMPADD_DATA_TABLEID.identifier(), - "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(DAY, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(DAY, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='DAY'") - .addTransform( - TIMESTAMPADD_DATA_TABLEID.identifier(), - "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(MONTH, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(MONTH, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='MONTH'") - .addTransform( - TIMESTAMPADD_DATA_TABLEID.identifier(), - "col1, time_interval_unit, interval_value," - + " TIMESTAMPADD(YEAR, interval_value, time_point_timestamp) as time_point_timestamp," - + " TIMESTAMPADD(YEAR, interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz", - "time_interval_unit='YEAR'") + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value, time_point_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp) WHEN time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value, time_point_timestamp) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY, interval_value, time_point_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPADD(MONTH, interval_value, time_point_timestamp) WHEN time_interval_unit='YEAR' THEN TIMESTAMPADD(YEAR, interval_value, time_point_timestamp) END as time_point_timestamp," + + " CASE WHEN time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY, interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPADD(MONTH, interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='YEAR' THEN TIMESTAMPADD(YEAR, interval_value, time_point_timestamp_ltz) END as time_point_timestamp_ltz", + null) .addTimezone("UTC") .build(); RegularEventOperatorTestHarness @@ -2135,153 +1942,18 @@ void testCastTransform() throws Exception { .addTransform( CAST_TABLEID.identifier(), "col1" - + ",cast(col1 as int) as castInt" - + ",cast(col1 as boolean) as castBoolean" - + ",cast(col1 as tinyint) as castTinyint" - + ",cast(col1 as smallint) as castSmallint" - + ",cast(col1 as bigint) as castBigint" - + ",cast(col1 as float) as castFloat" - + ",cast(col1 as double) as castDouble" - + ",cast(col1 as char) as castChar" - + ",cast(col1 as varchar) as castVarchar" - + ",cast(col1 as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '1'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",cast(castInt as int) as castInt" - + ",cast(castInt as boolean) as castBoolean" - + ",cast(castInt as tinyint) as castTinyint" - + ",cast(castInt as smallint) as castSmallint" - + ",cast(castInt as bigint) as castBigint" - + ",cast(castInt as float) as castFloat" - + ",cast(castInt as double) as castDouble" - + ",cast(castInt as char) as castChar" - + ",cast(castInt as varchar) as castVarchar" - + ",cast(castInt as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '2'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",cast(castBoolean as int) as castInt" - + ",cast(castBoolean as boolean) as castBoolean" - + ",cast(castBoolean as tinyint) as castTinyint" - + ",cast(castBoolean as smallint) as castSmallint" - + ",cast(castBoolean as bigint) as castBigint" - + ",castFloat" - + ",castDouble" - + ",cast(castBoolean as char) as castChar" - + ",cast(castBoolean as varchar) as castVarchar" - + ",castDecimal" - + ", castTimestamp", - "col1 = '3'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",cast(castTinyint as int) as castInt" - + ",cast(castTinyint as boolean) as castBoolean" - + ",cast(castTinyint as tinyint) as castTinyint" - + ",cast(castTinyint as smallint) as castSmallint" - + ",cast(castTinyint as bigint) as castBigint" - + ",cast(castTinyint as float) as castFloat" - + ",cast(castTinyint as double) as castDouble" - + ",cast(castTinyint as char) as castChar" - + ",cast(castTinyint as varchar) as castVarchar" - + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '4'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",cast(castSmallint as int) as castInt" - + ",cast(castSmallint as boolean) as castBoolean" - + ",cast(castSmallint as tinyint) as castTinyint" - + ",cast(castSmallint as smallint) as castSmallint" - + ",cast(castSmallint as bigint) as castBigint" - + ",cast(castSmallint as float) as castFloat" - + ",cast(castSmallint as double) as castDouble" - + ",cast(castSmallint as char) as castChar" - + ",cast(castSmallint as varchar) as castVarchar" - + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '5'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",cast(castBigint as int) as castInt" - + ",cast(castBigint as boolean) as castBoolean" - + ",cast(castBigint as tinyint) as castTinyint" - + ",cast(castBigint as smallint) as castSmallint" - + ",cast(castBigint as bigint) as castBigint" - + ",cast(castBigint as float) as castFloat" - + ",cast(castBigint as double) as castDouble" - + ",cast(castBigint as char) as castChar" - + ",cast(castBigint as varchar) as castVarchar" - + ",cast(castBigint as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '6'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",castInt" - + ",cast(castFloat as boolean) as castBoolean" - + ",castTinyint" - + ",castSmallint" - + ",castBigint" - + ",cast(castFloat as float) as castFloat" - + ",cast(castFloat as double) as castDouble" - + ",cast(castFloat as char) as castChar" - + ",cast(castFloat as varchar) as castVarchar" - + ",cast(castFloat as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '7'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",castInt" - + ",cast(castDouble as boolean) as castBoolean" - + ",castTinyint" - + ",castSmallint" - + ",castBigint" - + ",cast(castDouble as float) as castFloat" - + ",cast(castDouble as double) as castDouble" - + ",cast(castDouble as char) as castChar" - + ",cast(castDouble as varchar) as castVarchar" - + ",cast(castDouble as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '8'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",castInt" - + ",cast(castDecimal as boolean) as castBoolean" - + ",castTinyint" - + ",castSmallint" - + ",castBigint" - + ",cast(castDecimal as float) as castFloat" - + ",cast(castDecimal as double) as castDouble" - + ",cast(castDecimal as char) as castChar" - + ",cast(castDecimal as varchar) as castVarchar" - + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal" - + ", castTimestamp", - "col1 = '9'") - .addTransform( - CAST_TABLEID.identifier(), - "col1" - + ",castInt" - + ",castBoolean" - + ",castTinyint" - + ",castSmallint" - + ",castBigint" - + ",castFloat" - + ",castDouble" - + ",castChar" - + ",cast(castTimestamp as varchar) as castVarchar" - + ",castDecimal" - + ",cast('1970-01-01T00:00:01.234' as TIMESTAMP(3)) as castTimestamp", - "col1 = '10'") + + ",CASE WHEN col1 = '1' THEN cast(col1 as int) WHEN col1 = '2' THEN cast(castInt as int) WHEN col1 = '3' THEN cast(castBoolean as int) WHEN col1 = '4' THEN cast(castTinyint as int) WHEN col1 = '5' THEN cast(castSmallint as int) WHEN col1 = '6' THEN cast(castBigint as int) WHEN col1 = '7' THEN castInt WHEN col1 = '8' THEN castInt WHEN col1 = '9' THEN castInt WHEN col1 = '10' THEN castInt END as castInt" + + ",CASE WHEN col1 = '1' THEN cast(col1 as boolean) WHEN col1 = '2' THEN cast(castInt as boolean) WHEN col1 = '3' THEN cast(castBoolean as boolean) WHEN col1 = '4' THEN cast(castTinyint as boolean) WHEN col1 = '5' THEN cast(castSmallint as boolean) WHEN col1 = '6' THEN cast(castBigint as boolean) WHEN col1 = '7' THEN cast(castFloat as boolean) WHEN col1 = '8' THEN cast(castDouble as boolean) WHEN col1 = '9' THEN cast(castDecimal as boolean) WHEN col1 = '10' THEN castBoolean END as castBoolean" + + ",CASE WHEN col1 = '1' THEN cast(col1 as tinyint) WHEN col1 = '2' THEN cast(castInt as tinyint) WHEN col1 = '3' THEN cast(castBoolean as tinyint) WHEN col1 = '4' THEN cast(castTinyint as tinyint) WHEN col1 = '5' THEN cast(castSmallint as tinyint) WHEN col1 = '6' THEN cast(castBigint as tinyint) WHEN col1 = '7' THEN castTinyint WHEN col1 = '8' THEN castTinyint WHEN col1 = '9' THEN castTinyint WHEN col1 = '10' THEN castTinyint END as castTinyint" + + ",CASE WHEN col1 = '1' THEN cast(col1 as smallint) WHEN col1 = '2' THEN cast(castInt as smallint) WHEN col1 = '3' THEN cast(castBoolean as smallint) WHEN col1 = '4' THEN cast(castTinyint as smallint) WHEN col1 = '5' THEN cast(castSmallint as smallint) WHEN col1 = '6' THEN cast(castBigint as smallint) WHEN col1 = '7' THEN castSmallint WHEN col1 = '8' THEN castSmallint WHEN col1 = '9' THEN castSmallint WHEN col1 = '10' THEN castSmallint END as castSmallint" + + ",CASE WHEN col1 = '1' THEN cast(col1 as bigint) WHEN col1 = '2' THEN cast(castInt as bigint) WHEN col1 = '3' THEN cast(castBoolean as bigint) WHEN col1 = '4' THEN cast(castTinyint as bigint) WHEN col1 = '5' THEN cast(castSmallint as bigint) WHEN col1 = '6' THEN cast(castBigint as bigint) WHEN col1 = '7' THEN castBigint WHEN col1 = '8' THEN castBigint WHEN col1 = '9' THEN castBigint WHEN col1 = '10' THEN castBigint END as castBigint" + + ",CASE WHEN col1 = '1' THEN cast(col1 as float) WHEN col1 = '2' THEN cast(castInt as float) WHEN col1 = '3' THEN castFloat WHEN col1 = '4' THEN cast(castTinyint as float) WHEN col1 = '5' THEN cast(castSmallint as float) WHEN col1 = '6' THEN cast(castBigint as float) WHEN col1 = '7' THEN cast(castFloat as float) WHEN col1 = '8' THEN cast(castDouble as float) WHEN col1 = '9' THEN cast(castDecimal as float) WHEN col1 = '10' THEN castFloat END as castFloat" + + ",CASE WHEN col1 = '1' THEN cast(col1 as double) WHEN col1 = '2' THEN cast(castInt as double) WHEN col1 = '3' THEN castDouble WHEN col1 = '4' THEN cast(castTinyint as double) WHEN col1 = '5' THEN cast(castSmallint as double) WHEN col1 = '6' THEN cast(castBigint as double) WHEN col1 = '7' THEN cast(castFloat as double) WHEN col1 = '8' THEN cast(castDouble as double) WHEN col1 = '9' THEN cast(castDecimal as double) WHEN col1 = '10' THEN castDouble END as castDouble" + + ",CASE WHEN col1 = '1' THEN cast(col1 as char) WHEN col1 = '2' THEN cast(castInt as char) WHEN col1 = '3' THEN cast(castBoolean as char) WHEN col1 = '4' THEN cast(castTinyint as char) WHEN col1 = '5' THEN cast(castSmallint as char) WHEN col1 = '6' THEN cast(castBigint as char) WHEN col1 = '7' THEN cast(castFloat as char) WHEN col1 = '8' THEN cast(castDouble as char) WHEN col1 = '9' THEN cast(castDecimal as char) WHEN col1 = '10' THEN castChar END as castChar" + + ",CASE WHEN col1 = '1' THEN cast(col1 as varchar) WHEN col1 = '2' THEN cast(castInt as varchar) WHEN col1 = '3' THEN cast(castBoolean as varchar) WHEN col1 = '4' THEN cast(castTinyint as varchar) WHEN col1 = '5' THEN cast(castSmallint as varchar) WHEN col1 = '6' THEN cast(castBigint as varchar) WHEN col1 = '7' THEN cast(castFloat as varchar) WHEN col1 = '8' THEN cast(castDouble as varchar) WHEN col1 = '9' THEN cast(castDecimal as varchar) WHEN col1 = '10' THEN cast(castTimestamp as varchar) END as castVarchar" + + ",CASE WHEN col1 = '1' THEN cast(col1 as DECIMAL(4,2)) WHEN col1 = '2' THEN cast(castInt as DECIMAL(4,2)) WHEN col1 = '3' THEN castDecimal WHEN col1 = '4' THEN cast(castTinyint as DECIMAL(4,2)) WHEN col1 = '5' THEN cast(castSmallint as DECIMAL(4,2)) WHEN col1 = '6' THEN cast(castBigint as DECIMAL(4,2)) WHEN col1 = '7' THEN cast(castFloat as DECIMAL(4,2)) WHEN col1 = '8' THEN cast(castDouble as DECIMAL(4,2)) WHEN col1 = '9' THEN cast(castDecimal as DECIMAL(4,2)) WHEN col1 = '10' THEN castDecimal END as castDecimal" + + ",CASE WHEN col1 = '1' THEN castTimestamp WHEN col1 = '2' THEN castTimestamp WHEN col1 = '3' THEN castTimestamp WHEN col1 = '4' THEN castTimestamp WHEN col1 = '5' THEN castTimestamp WHEN col1 = '6' THEN castTimestamp WHEN col1 = '7' THEN castTimestamp WHEN col1 = '8' THEN castTimestamp WHEN col1 = '9' THEN castTimestamp WHEN col1 = '10' THEN cast('1970-01-01T00:00:01.234' as TIMESTAMP(3)) END as castTimestamp", + null) .addTimezone("UTC") .build(); RegularEventOperatorTestHarness diff --git a/pom.xml b/pom.xml index 53220c9efb2..fb431f0afb6 100644 --- a/pom.xml +++ b/pom.xml @@ -366,7 +366,6 @@ limitations under the License. commons-codec commons-codec 1.15 - test