Skip to content

Commit 25a5817

Browse files
authored
[FLINK-38248] The microseconds default value of '0000-00-00 00:00:00.000000' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4251)
1 parent 68ee0c9 commit 25a5817

5 files changed

Lines changed: 259 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ private String convertInvalidTimestampDefaultValue(String defaultValue, DataType
335335
|| dataType instanceof TimestampType
336336
|| dataType instanceof ZonedTimestampType) {
337337

338-
if (DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
338+
if (defaultValue.startsWith(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) {
339339
return DorisSchemaUtils.DEFAULT_DATETIME;
340340
}
341341
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,108 @@ void testMysqlDefaultTimestampValueConversionInAddColumn(boolean batchMode) thro
630630
assertEqualsInOrder(expected, actual);
631631
}
632632

633+
/** Microsecond variant: '0000-00-00 00:00:00.000000'. */
634+
private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000";
635+
636+
@ParameterizedTest(name = "batchMode: {0}")
637+
@ValueSource(booleans = {true, false})
638+
void testMysqlDefaultTimestampValueWithMicrosInCreateTable(boolean batchMode) throws Exception {
639+
TableId tableId =
640+
TableId.tableId(
641+
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
642+
643+
Schema schema =
644+
Schema.newBuilder()
645+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
646+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
647+
.column(
648+
new PhysicalColumn(
649+
"created_time",
650+
DataTypes.TIMESTAMP(6),
651+
null,
652+
INVALID_DATETIME_WITH_MICROS))
653+
.column(
654+
new PhysicalColumn(
655+
"updated_time",
656+
DataTypes.TIMESTAMP_LTZ(6),
657+
null,
658+
INVALID_DATETIME_WITH_MICROS))
659+
.primaryKey("id")
660+
.build();
661+
662+
runJobWithEvents(
663+
Collections.singletonList(new CreateTableEvent(tableId, schema)), batchMode);
664+
665+
List<String> actual = inspectTableSchema(tableId);
666+
667+
List<String> expected =
668+
Arrays.asList(
669+
"id | INT | Yes | true | null",
670+
"name | VARCHAR(150) | Yes | false | null",
671+
"created_time | DATETIME(6) | Yes | false | "
672+
+ DorisSchemaUtils.DEFAULT_DATETIME,
673+
"updated_time | DATETIME(6) | Yes | false | "
674+
+ DorisSchemaUtils.DEFAULT_DATETIME);
675+
676+
assertEqualsInOrder(expected, actual);
677+
}
678+
679+
@ParameterizedTest(name = "batchMode: {0}")
680+
@ValueSource(booleans = {true, false})
681+
void testMysqlDefaultTimestampValueWithMicrosInAddColumn(boolean batchMode) throws Exception {
682+
TableId tableId =
683+
TableId.tableId(
684+
DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
685+
686+
Schema initialSchema =
687+
Schema.newBuilder()
688+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
689+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
690+
.primaryKey("id")
691+
.build();
692+
693+
List<Event> events = new ArrayList<>();
694+
events.add(new CreateTableEvent(tableId, initialSchema));
695+
696+
PhysicalColumn createdTimeCol =
697+
new PhysicalColumn(
698+
"created_time", DataTypes.TIMESTAMP(6), null, INVALID_DATETIME_WITH_MICROS);
699+
700+
PhysicalColumn updatedTimeCol =
701+
new PhysicalColumn(
702+
"updated_time",
703+
DataTypes.TIMESTAMP_LTZ(6),
704+
null,
705+
INVALID_DATETIME_WITH_MICROS);
706+
707+
events.add(
708+
new AddColumnEvent(
709+
tableId,
710+
Collections.singletonList(
711+
new AddColumnEvent.ColumnWithPosition(createdTimeCol))));
712+
713+
events.add(
714+
new AddColumnEvent(
715+
tableId,
716+
Collections.singletonList(
717+
new AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
718+
719+
runJobWithEvents(events, batchMode);
720+
721+
List<String> actual = inspectTableSchema(tableId);
722+
723+
List<String> expected =
724+
Arrays.asList(
725+
"id | INT | Yes | true | null",
726+
"name | VARCHAR(150) | Yes | false | null",
727+
"created_time | DATETIME(6) | Yes | false | "
728+
+ DorisSchemaUtils.DEFAULT_DATETIME,
729+
"updated_time | DATETIME(6) | Yes | false | "
730+
+ DorisSchemaUtils.DEFAULT_DATETIME);
731+
732+
assertEqualsInOrder(expected, actual);
733+
}
734+
633735
private void runJobWithEvents(List<Event> events, boolean batchMode) throws Exception {
634736
DataStream<Event> stream = env.fromData(events, new EventTypeInfo()).setParallelism(1);
635737

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private static String convertInvalidTimestampDefaultValue(
161161
|| dataType instanceof TimestampType
162162
|| dataType instanceof ZonedTimestampType) {
163163

164-
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
164+
if (defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
165165
return DEFAULT_DATETIME;
166166
}
167167
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,4 +669,58 @@ public void testMysqlDefaultTimestampValueConversionInAddColumn()
669669

670670
Assertions.assertThat(table).isNotNull();
671671
}
672+
673+
/** Microsecond variant: '0000-00-00 00:00:00.000000'. */
674+
private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000";
675+
676+
@Test
677+
public void testMysqlDefaultTimestampValueWithMicrosInAddColumn()
678+
throws SchemaEvolveException,
679+
Catalog.TableNotExistException,
680+
Catalog.DatabaseNotEmptyException,
681+
Catalog.DatabaseNotExistException {
682+
initialize("filesystem");
683+
Map<String, String> tableOptions = new HashMap<>();
684+
tableOptions.put("bucket", "-1");
685+
MetadataApplier metadataApplier =
686+
new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
687+
688+
CreateTableEvent createTableEvent =
689+
new CreateTableEvent(
690+
TableId.parse("test.timestamp_micros_test"),
691+
org.apache.flink.cdc.common.schema.Schema.newBuilder()
692+
.physicalColumn(
693+
"id",
694+
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
695+
.physicalColumn(
696+
"name",
697+
org.apache.flink.cdc.common.types.DataTypes.STRING())
698+
.primaryKey("id")
699+
.build());
700+
metadataApplier.applySchemaChange(createTableEvent);
701+
702+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
703+
addedColumns.add(
704+
AddColumnEvent.last(
705+
Column.physicalColumn(
706+
"created_time",
707+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6),
708+
null,
709+
INVALID_DATETIME_WITH_MICROS)));
710+
addedColumns.add(
711+
AddColumnEvent.last(
712+
Column.physicalColumn(
713+
"updated_time",
714+
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6),
715+
null,
716+
INVALID_DATETIME_WITH_MICROS)));
717+
718+
AddColumnEvent addColumnEvent =
719+
new AddColumnEvent(TableId.parse("test.timestamp_micros_test"), addedColumns);
720+
metadataApplier.applySchemaChange(addColumnEvent);
721+
722+
Table table = catalog.getTable(Identifier.fromString("test.timestamp_micros_test"));
723+
724+
Assertions.assertThat(table).isNotNull();
725+
}
672726
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,4 +582,105 @@ void testMysqlDefaultTimestampValueConversionInAddColumn() throws Exception {
582582

583583
assertEqualsInOrder(expected, actual);
584584
}
585+
586+
/** Microsecond variant: '0000-00-00 00:00:00.000000'. */
587+
private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000";
588+
589+
@Test
590+
void testMysqlDefaultTimestampValueWithMicrosInCreateTable() throws Exception {
591+
TableId tableId =
592+
TableId.tableId(
593+
StarRocksContainer.STARROCKS_DATABASE_NAME,
594+
StarRocksContainer.STARROCKS_TABLE_NAME);
595+
596+
Schema schema =
597+
Schema.newBuilder()
598+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
599+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
600+
.column(
601+
new PhysicalColumn(
602+
"created_time",
603+
DataTypes.TIMESTAMP(6),
604+
null,
605+
INVALID_DATETIME_WITH_MICROS))
606+
.column(
607+
new PhysicalColumn(
608+
"updated_time",
609+
DataTypes.TIMESTAMP_LTZ(6),
610+
null,
611+
INVALID_DATETIME_WITH_MICROS))
612+
.primaryKey("id")
613+
.build();
614+
615+
runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
616+
617+
List<String> actual = inspectTableSchema(tableId);
618+
619+
List<String> expected =
620+
Arrays.asList(
621+
"id | int | NO | true | null",
622+
"name | varchar(150) | YES | false | null",
623+
"created_time | datetime | YES | false | "
624+
+ StarRocksUtils.DEFAULT_DATETIME,
625+
"updated_time | datetime | YES | false | "
626+
+ StarRocksUtils.DEFAULT_DATETIME);
627+
628+
assertEqualsInOrder(expected, actual);
629+
}
630+
631+
@Test
632+
void testMysqlDefaultTimestampValueWithMicrosInAddColumn() throws Exception {
633+
TableId tableId =
634+
TableId.tableId(
635+
StarRocksContainer.STARROCKS_DATABASE_NAME,
636+
StarRocksContainer.STARROCKS_TABLE_NAME);
637+
638+
Schema initialSchema =
639+
Schema.newBuilder()
640+
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
641+
.column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
642+
.primaryKey("id")
643+
.build();
644+
645+
List<Event> events = new ArrayList<>();
646+
events.add(new CreateTableEvent(tableId, initialSchema));
647+
648+
PhysicalColumn createdTimeCol =
649+
new PhysicalColumn(
650+
"created_time", DataTypes.TIMESTAMP(6), null, INVALID_DATETIME_WITH_MICROS);
651+
652+
PhysicalColumn updatedTimeCol =
653+
new PhysicalColumn(
654+
"updated_time",
655+
DataTypes.TIMESTAMP_LTZ(6),
656+
null,
657+
INVALID_DATETIME_WITH_MICROS);
658+
659+
events.add(
660+
new AddColumnEvent(
661+
tableId,
662+
Collections.singletonList(
663+
new AddColumnEvent.ColumnWithPosition(createdTimeCol))));
664+
665+
events.add(
666+
new AddColumnEvent(
667+
tableId,
668+
Collections.singletonList(
669+
new AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
670+
671+
runJobWithEvents(events);
672+
673+
List<String> actual = inspectTableSchema(tableId);
674+
675+
List<String> expected =
676+
Arrays.asList(
677+
"id | int | NO | true | null",
678+
"name | varchar(150) | YES | false | null",
679+
"created_time | datetime | YES | false | "
680+
+ StarRocksUtils.DEFAULT_DATETIME,
681+
"updated_time | datetime | YES | false | "
682+
+ StarRocksUtils.DEFAULT_DATETIME);
683+
684+
assertEqualsInOrder(expected, actual);
685+
}
585686
}

0 commit comments

Comments
 (0)