Skip to content

Commit 02cd013

Browse files
committed
[FLINK-39232] Loosen transformed schema merging validation check
1 parent 323b2bc commit 02cd013

10 files changed

Lines changed: 141 additions & 1059 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java

Lines changed: 0 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.apache.flink.cdc.common.types.ZonedTimestampType;
6464
import org.apache.flink.cdc.common.types.variant.Variant;
6565

66-
import org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
6766
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
6867
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
6968
import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding;
@@ -320,139 +319,6 @@ public static Object[] coerceRow(
320319
return coercedRow;
321320
}
322321

323-
/**
324-
* Try to merge given {@link Schema}s and ensure they're identical. The only difference allowed
325-
* is nullability, string and varchar precision, default value, and comments.
326-
*/
327-
public static Schema strictlyMergeSchemas(List<Schema> schemas) {
328-
Preconditions.checkArgument(
329-
!schemas.isEmpty(), "Trying to merge transformed schemas %s, but got empty list");
330-
if (schemas.size() == 1) {
331-
return schemas.get(0);
332-
}
333-
334-
List<List<String>> primaryKeys =
335-
schemas.stream()
336-
.map(Schema::primaryKeys)
337-
.filter(p -> !p.isEmpty())
338-
.distinct()
339-
.collect(Collectors.toList());
340-
List<List<String>> partitionKeys =
341-
schemas.stream()
342-
.map(Schema::partitionKeys)
343-
.filter(p -> !p.isEmpty())
344-
.distinct()
345-
.collect(Collectors.toList());
346-
List<Map<String, String>> options =
347-
schemas.stream()
348-
.map(Schema::options)
349-
.filter(p -> !p.isEmpty())
350-
.distinct()
351-
.collect(Collectors.toList());
352-
List<List<String>> columnNames =
353-
schemas.stream()
354-
.map(Schema::getColumnNames)
355-
.distinct()
356-
.collect(Collectors.toList());
357-
358-
Preconditions.checkArgument(
359-
primaryKeys.size() <= 1,
360-
"Trying to merge transformed schemas %s, but got more than one primary key configurations: %s",
361-
schemas,
362-
primaryKeys);
363-
Preconditions.checkArgument(
364-
partitionKeys.size() <= 1,
365-
"Trying to merge transformed schemas %s, but got more than one partition key configurations: %s",
366-
schemas,
367-
partitionKeys);
368-
Preconditions.checkArgument(
369-
options.size() <= 1,
370-
"Trying to merge transformed schemas %s, but got more than one option configurations: %s",
371-
schemas,
372-
options);
373-
Preconditions.checkArgument(
374-
columnNames.size() == 1,
375-
"Trying to merge transformed schemas %s, but got more than one column name views: %s",
376-
schemas,
377-
columnNames);
378-
379-
int arity = columnNames.get(0).size();
380-
381-
ArrayListMultimap<Integer, DataType> toBeMergedColumnTypes =
382-
ArrayListMultimap.create(arity, 1);
383-
for (Schema schema : schemas) {
384-
List<DataType> columnTypes = schema.getColumnDataTypes();
385-
for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) {
386-
toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex));
387-
}
388-
}
389-
390-
List<String> mergedColumnNames = columnNames.iterator().next();
391-
List<DataType> mergedColumnTypes = new ArrayList<>(arity);
392-
for (int i = 0; i < arity; i++) {
393-
mergedColumnTypes.add(strictlyMergeDataTypes(toBeMergedColumnTypes.get(i)));
394-
}
395-
396-
List<Column> mergedColumns = new ArrayList<>();
397-
for (int i = 0; i < mergedColumnNames.size(); i++) {
398-
mergedColumns.add(
399-
Column.physicalColumn(mergedColumnNames.get(i), mergedColumnTypes.get(i)));
400-
}
401-
402-
return Schema.newBuilder()
403-
.primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : primaryKeys.get(0))
404-
.partitionKey(
405-
partitionKeys.isEmpty() ? Collections.emptyList() : partitionKeys.get(0))
406-
.options(options.isEmpty() ? Collections.emptyMap() : options.get(0))
407-
.setColumns(mergedColumns)
408-
.build();
409-
}
410-
411-
private static DataType strictlyMergeDataTypes(List<DataType> dataTypes) {
412-
Preconditions.checkArgument(
413-
!dataTypes.isEmpty(),
414-
"Trying to merge transformed data types %s, but got empty list");
415-
416-
List<DataType> simpleMergeTypes =
417-
dataTypes.stream().distinct().collect(Collectors.toList());
418-
if (simpleMergeTypes.size() == 1) {
419-
return simpleMergeTypes.get(0);
420-
}
421-
422-
List<DataTypeRoot> typeRoots =
423-
dataTypes.stream()
424-
.map(DataType::getTypeRoot)
425-
.distinct()
426-
.collect(Collectors.toList());
427-
Preconditions.checkArgument(
428-
typeRoots.size() == 1,
429-
"Trying to merge types %s, but got more than one type root: %s",
430-
dataTypes,
431-
typeRoots);
432-
433-
// Decay types to the most permissive variant of their shared type root (maximum length / precision)
434-
DataType type = dataTypes.get(0);
435-
436-
if (type.is(DataTypeRoot.CHAR)) {
437-
return DataTypes.CHAR(CharType.MAX_LENGTH);
438-
} else if (type.is(DataTypeRoot.VARCHAR)) {
439-
return DataTypes.STRING();
440-
} else if (type.is(DataTypeRoot.BINARY)) {
441-
return DataTypes.BINARY(BinaryType.MAX_LENGTH);
442-
} else if (type.is(DataTypeRoot.VARBINARY)) {
443-
return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH);
444-
} else if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
445-
return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
446-
} else if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
447-
return DataTypes.TIMESTAMP_TZ(ZonedTimestampType.MAX_PRECISION);
448-
} else if (type.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
449-
return DataTypes.TIMESTAMP_LTZ(LocalZonedTimestampType.MAX_PRECISION);
450-
} else {
451-
throw new IllegalArgumentException(
452-
"Unable to merge data types with different precision: " + dataTypes);
453-
}
454-
}
455-
456322
@VisibleForTesting
457323
static boolean isDataTypeCompatible(@Nullable DataType currentType, DataType upcomingType) {
458324
// If two types are identical, they're compatible of course.

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -613,73 +613,6 @@ void testOpTypeMetadataColumnInBatchMode(ValuesDataSink.SinkApi sinkApi) throws
613613
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}");
614614
}
615615

616-
@ParameterizedTest
617-
@EnumSource
618-
void testTransformTwiceInBatchMode(ValuesDataSink.SinkApi sinkApi) throws Exception {
619-
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
620-
621-
// Setup value source
622-
Configuration sourceConfig = new Configuration();
623-
sourceConfig.set(
624-
ValuesDataSourceOptions.EVENT_SET_ID,
625-
ValuesDataSourceHelper.EventSetId.TRANSFORM_BATCH_TABLE);
626-
SourceDef sourceDef =
627-
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
628-
629-
// Setup value sink
630-
Configuration sinkConfig = new Configuration();
631-
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
632-
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
633-
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
634-
635-
// Setup transform
636-
TransformDef transformDef1 =
637-
new TransformDef(
638-
"default_namespace.default_schema.table1",
639-
"*,concat(col1,'1') as col12",
640-
"col1 = '1' OR col1 = '999'",
641-
"col1",
642-
"col12",
643-
"key1=value1",
644-
"",
645-
null);
646-
TransformDef transformDef2 =
647-
new TransformDef(
648-
"default_namespace.default_schema.table1",
649-
"*,concat(col1,'2') as col12",
650-
"col1 = '2'",
651-
null,
652-
null,
653-
null,
654-
"",
655-
null);
656-
// Setup pipeline
657-
Configuration pipelineConfig = new Configuration();
658-
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
659-
pipelineConfig.set(
660-
PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE, RuntimeExecutionMode.BATCH);
661-
PipelineDef pipelineDef =
662-
new PipelineDef(
663-
sourceDef,
664-
sinkDef,
665-
Collections.emptyList(),
666-
new ArrayList<>(Arrays.asList(transformDef1, transformDef2)),
667-
Collections.emptyList(),
668-
pipelineConfig);
669-
670-
// Execute the pipeline
671-
PipelineExecution execution = composer.compose(pipelineDef);
672-
execution.execute();
673-
674-
// Check the order and content of all received events
675-
String[] outputEvents = outCaptor.toString().trim().split("\n");
676-
assertThat(outputEvents)
677-
.containsExactly(
678-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
679-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
680-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}");
681-
}
682-
683616
@ParameterizedTest
684617
@EnumSource
685618
void testOneToOneRoutingInBatchMode(ValuesDataSink.SinkApi sinkApi) throws Exception {

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -423,78 +423,6 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
423423
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}");
424424
}
425425

426-
@ParameterizedTest
427-
@EnumSource
428-
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
429-
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
430-
431-
// Setup value source
432-
Configuration sourceConfig = new Configuration();
433-
sourceConfig.set(
434-
ValuesDataSourceOptions.EVENT_SET_ID,
435-
ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
436-
SourceDef sourceDef =
437-
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
438-
439-
// Setup value sink
440-
Configuration sinkConfig = new Configuration();
441-
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
442-
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
443-
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
444-
445-
// Setup transform
446-
TransformDef transformDef1 =
447-
new TransformDef(
448-
"default_namespace.default_schema.table1",
449-
"*,concat(col1,'1') as col12",
450-
"col1 = '1' OR col1 = '999'",
451-
"col1",
452-
"col12",
453-
"key1=value1",
454-
"",
455-
null);
456-
TransformDef transformDef2 =
457-
new TransformDef(
458-
"default_namespace.default_schema.table1",
459-
"*,concat(col1,'2') as col12",
460-
"col1 = '2'",
461-
null,
462-
null,
463-
null,
464-
"",
465-
null);
466-
// Setup pipeline
467-
Configuration pipelineConfig = new Configuration();
468-
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
469-
pipelineConfig.set(
470-
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
471-
PipelineDef pipelineDef =
472-
new PipelineDef(
473-
sourceDef,
474-
sinkDef,
475-
Collections.emptyList(),
476-
new ArrayList<>(Arrays.asList(transformDef1, transformDef2)),
477-
Collections.emptyList(),
478-
pipelineConfig);
479-
480-
// Execute the pipeline
481-
PipelineExecution execution = composer.compose(pipelineDef);
482-
execution.execute();
483-
484-
// Check the order and content of all received events
485-
String[] outputEvents = outCaptor.toString().trim().split("\n");
486-
assertThat(outputEvents)
487-
.containsExactly(
488-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
489-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
490-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
491-
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
492-
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
493-
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
494-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=({op_ts=4})}",
495-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
496-
}
497-
498426
@ParameterizedTest
499427
@EnumSource
500428
void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -465,76 +465,6 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
465465
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, meta=({op_ts=5})}");
466466
}
467467

468-
@ParameterizedTest
469-
@EnumSource
470-
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
471-
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
472-
473-
// Setup value source
474-
Configuration sourceConfig = new Configuration();
475-
sourceConfig.set(
476-
ValuesDataSourceOptions.EVENT_SET_ID,
477-
ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
478-
SourceDef sourceDef =
479-
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
480-
481-
// Setup value sink
482-
Configuration sinkConfig = new Configuration();
483-
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
484-
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
485-
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
486-
487-
// Setup transform
488-
TransformDef transformDef1 =
489-
new TransformDef(
490-
"default_namespace.default_schema.table1",
491-
"*,concat(col1,'1') as col12",
492-
"col1 = '1' OR col1 = '999'",
493-
"col1",
494-
"col12",
495-
"key1=value1",
496-
"",
497-
null);
498-
TransformDef transformDef2 =
499-
new TransformDef(
500-
"default_namespace.default_schema.table1",
501-
"*,concat(col1,'2') as col12",
502-
"col1 = '2'",
503-
null,
504-
null,
505-
null,
506-
"",
507-
null);
508-
// Setup pipeline
509-
Configuration pipelineConfig = new Configuration();
510-
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
511-
512-
PipelineDef pipelineDef =
513-
new PipelineDef(
514-
sourceDef,
515-
sinkDef,
516-
Collections.emptyList(),
517-
new ArrayList<>(Arrays.asList(transformDef1, transformDef2)),
518-
Collections.emptyList(),
519-
pipelineConfig);
520-
521-
// Execute the pipeline
522-
PipelineExecution execution = composer.compose(pipelineDef);
523-
execution.execute();
524-
525-
// Check the order and content of all received events
526-
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
527-
assertThat(outputEvents)
528-
.containsExactly(
529-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
530-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
531-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
532-
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
533-
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}",
534-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 11, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}",
535-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, meta=({op_ts=5})}");
536-
}
537-
538468
@Test
539469
void testOneToOneRouting() throws Exception {
540470
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

0 commit comments

Comments
 (0)