diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index afb01cc8eef..de03dbc53d2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -335,7 +335,7 @@ private String convertInvalidTimestampDefaultValue(String defaultValue, DataType || dataType instanceof TimestampType || dataType instanceof ZonedTimestampType) { - if (DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + if (defaultValue.startsWith(DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) { return DorisSchemaUtils.DEFAULT_DATETIME; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 43384f7e7a3..b8c48d7a165 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -630,6 +630,108 @@ void testMysqlDefaultTimestampValueConversionInAddColumn(boolean batchMode) thro assertEqualsInOrder(expected, actual); } + /** Microsecond variant: '0000-00-00 00:00:00.000000'. */ + private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000"; + + @ParameterizedTest(name = "batchMode: {0}") + @ValueSource(booleans = {true, false}) + void testMysqlDefaultTimestampValueWithMicrosInCreateTable(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .column( + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(6), + null, + INVALID_DATETIME_WITH_MICROS)) + .column( + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(6), + null, + INVALID_DATETIME_WITH_MICROS)) + .primaryKey("id") + .build(); + + runJobWithEvents( + Collections.singletonList(new CreateTableEvent(tableId, schema)), batchMode); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(150) | Yes | false | null", + "created_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME, + "updated_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + + @ParameterizedTest(name = "batchMode: {0}") + @ValueSource(booleans = {true, false}) + void testMysqlDefaultTimestampValueWithMicrosInAddColumn(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema initialSchema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .primaryKey("id") + .build(); + + List events = new ArrayList<>(); + events.add(new CreateTableEvent(tableId, initialSchema)); + + PhysicalColumn createdTimeCol = + new PhysicalColumn( + "created_time", DataTypes.TIMESTAMP(6), null, INVALID_DATETIME_WITH_MICROS); + + PhysicalColumn updatedTimeCol = + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(6), + null, + INVALID_DATETIME_WITH_MICROS); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(createdTimeCol)))); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(updatedTimeCol)))); + + runJobWithEvents(events, batchMode); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(150) | Yes | false | null", + "created_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME, + "updated_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + private void runJobWithEvents(List events, boolean batchMode) throws Exception { DataStream stream = env.fromData(events, new EventTypeInfo()).setParallelism(1); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java index 22ee7ade10f..5e8beacc950 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -161,7 +161,7 @@ private static String convertInvalidTimestampDefaultValue( || dataType instanceof TimestampType || dataType instanceof ZonedTimestampType) { - if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + if (defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) { return DEFAULT_DATETIME; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 7f0ed436db4..9d7e1829a08 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -669,4 +669,58 @@ public void testMysqlDefaultTimestampValueConversionInAddColumn() Assertions.assertThat(table).isNotNull(); } + + /** Microsecond variant: '0000-00-00 00:00:00.000000'. */ + private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000"; + + @Test + public void testMysqlDefaultTimestampValueWithMicrosInAddColumn() + throws SchemaEvolveException, + Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException { + initialize("filesystem"); + Map tableOptions = new HashMap<>(); + tableOptions.put("bucket", "-1"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.timestamp_micros_test"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "name", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "created_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6), + null, + INVALID_DATETIME_WITH_MICROS))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "updated_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6), + null, + INVALID_DATETIME_WITH_MICROS))); + + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse("test.timestamp_micros_test"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString("test.timestamp_micros_test")); + + Assertions.assertThat(table).isNotNull(); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index f1e13bc090c..fd1d3c06845 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -582,4 +582,105 @@ void testMysqlDefaultTimestampValueConversionInAddColumn() throws Exception { assertEqualsInOrder(expected, actual); } + + /** Microsecond variant: '0000-00-00 00:00:00.000000'. */ + private static final String INVALID_DATETIME_WITH_MICROS = "0000-00-00 00:00:00.000000"; + + @Test + void testMysqlDefaultTimestampValueWithMicrosInCreateTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .column( + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(6), + null, + INVALID_DATETIME_WITH_MICROS)) + .column( + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(6), + null, + INVALID_DATETIME_WITH_MICROS)) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "name | varchar(150) | YES | false | null", + "created_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME, + "updated_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + + @Test + void testMysqlDefaultTimestampValueWithMicrosInAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema initialSchema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .primaryKey("id") + .build(); + + List events = new ArrayList<>(); + events.add(new CreateTableEvent(tableId, initialSchema)); + + PhysicalColumn createdTimeCol = + new PhysicalColumn( + "created_time", DataTypes.TIMESTAMP(6), null, INVALID_DATETIME_WITH_MICROS); + + PhysicalColumn updatedTimeCol = + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(6), + null, + INVALID_DATETIME_WITH_MICROS); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(createdTimeCol)))); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(updatedTimeCol)))); + + runJobWithEvents(events); + + List actual = inspectTableSchema(tableId); + + List expected = + Arrays.asList( + "id | int | NO | true | null", + "name | varchar(150) | YES | false | null", + "created_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME, + "updated_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } }