org.apache.flink
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index d302f297e2d..22237a5f33f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -33,6 +33,7 @@
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarCharType;
@@ -43,6 +44,8 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.List;
@@ -132,6 +135,35 @@ public static void toStarRocksDataType(
private static final DateTimeFormatter DATETIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ /** Format TIME type data. */
+ private static final DateTimeFormatter TIME_FORMATTER =
+ new DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();
+
+ private static final DateTimeFormatter[] TIME_FORMATTERS = new DateTimeFormatter[10];
+
+ private static DateTimeFormatter timeFormatter(int precision) {
+ if (precision <= 0) {
+ return TIME_FORMATTER;
+ }
+ if (precision < TIME_FORMATTERS.length) {
+ DateTimeFormatter formatter = TIME_FORMATTERS[precision];
+ if (formatter == null) {
+ formatter =
+ new DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss")
+ .appendFraction(
+ ChronoField.NANO_OF_SECOND, precision, precision, true)
+ .toFormatter();
+ TIME_FORMATTERS[precision] = formatter;
+ }
+ return formatter;
+ }
+ return new DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss")
+ .appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true)
+ .toFormatter();
+ }
+
/**
* Creates an accessor for getting elements in an internal RecordData structure at the given
* position.
@@ -183,6 +215,13 @@ record ->
fieldGetter =
record -> record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
break;
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getTime(fieldPos)
+ .toLocalTime()
+ .format(timeFormatter(getPrecision(fieldType)));
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
fieldGetter =
record ->
@@ -374,6 +413,21 @@ public StarRocksColumn.Builder visit(DateType dateType) {
return builder;
}
+ @Override
+ public StarRocksColumn.Builder visit(TimeType timeType) {
+ // StarRocks does not support TIME type, so map it to VARCHAR.
+ // Format: HH:mm:ss for precision 0, HH:mm:ss. for precision > 0
+ // Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
+ // For precision 0: "HH:mm:ss" = 8 characters
+ // For precision > 0: "HH:mm:ss." + precision digits
+ builder.setDataType(VARCHAR);
+ builder.setNullable(timeType.isNullable());
+ int precision = timeType.getPrecision();
+ int length = precision > 0 ? 8 + 1 + precision : 8;
+ builder.setColumnSize(length);
+ return builder;
+ }
+
@Override
public StarRocksColumn.Builder visit(TimestampType timestampType) {
builder.setDataType(DATETIME);
@@ -404,7 +458,8 @@ public static String convertInvalidTimestampDefaultValue(
|| dataType instanceof org.apache.flink.cdc.common.types.TimestampType
|| dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {
- if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+ if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)
+ || defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
return DEFAULT_DATETIME;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
index 5d830b9352c..a9c6f240e7b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -45,6 +46,7 @@
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -71,6 +73,7 @@
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
@@ -260,6 +263,227 @@ void testMixedSchemaAndDataChanges() throws Exception {
Objects.requireNonNull(serializer.serialize(insertEvent3)));
}
+ @Test
+ void testTimeTypeSerialization() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn(
+ "end_time", new TimeType(3)) // TIME with millisecond precision
+ .primaryKey("id")
+ .build();
+
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test insert with TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.of(9, 30, 15)), // 09:30:15
+ TimeData.fromLocalTime(
+ LocalTime.of(17, 45, 30, 123000000)) // 17:45:30.123
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":1,\"start_time\":\"09:30:15\",\"end_time\":\"17:45:30.123\",\"__op\":0}",
+ result);
+ }
+
+ @Test
+ void testTimeTypeZeroSecondsFormat() throws Exception {
+ TableId tableId = TableId.parse("test.time_zero_seconds_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("zero_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {1, TimeData.fromLocalTime(LocalTime.of(16, 0, 0))}));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(tableId, "{\"id\":1,\"zero_time\":\"16:00:00\",\"__op\":0}", result);
+ }
+
+ @Test
+ void testTimeTypeWithSchemaEvolution() throws Exception {
+ TableId tableId = TableId.parse("test.time_evolution_table");
+ Schema initialSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .primaryKey("id")
+ .build();
+
+ // Create initial table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator initialGenerator =
+ new BinaryRecordDataGenerator(
+ initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Insert initial data
+ DataChangeEvent initialInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ initialGenerator.generate(
+ new Object[] {1, BinaryStringData.fromString("Initial Record")}));
+
+ StarRocksRowData initialResult = serializer.serialize(initialInsert);
+ Assertions.assertThat(initialResult).isNotNull();
+
+ verifySerializeResult(
+ tableId, "{\"id\":1,\"name\":\"Initial Record\",\"__op\":0}", initialResult);
+
+ // Simulate schema evolution: add TIME column
+ Schema evolvedSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .physicalColumn("created_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ // Create AddColumnEvent to simulate schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("created_time", new TimeType()),
+ AddColumnEvent.ColumnPosition.LAST,
+ null)));
+ serializer.serialize(addColumnEvent);
+
+ // Insert data with TIME column after schema evolution
+ BinaryRecordDataGenerator evolvedGenerator =
+ new BinaryRecordDataGenerator(
+ evolvedSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent evolvedInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ evolvedGenerator.generate(
+ new Object[] {
+ 2,
+ BinaryStringData.fromString("Evolved Record"),
+ TimeData.fromLocalTime(LocalTime.of(14, 30, 0)) // 14:30:00
+ }));
+
+ StarRocksRowData evolvedResult = serializer.serialize(evolvedInsert);
+ Assertions.assertThat(evolvedResult).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":2,\"name\":\"Evolved Record\",\"created_time\":\"14:30:00\",\"__op\":0}",
+ evolvedResult);
+ }
+
+ @Test
+ void testTimeTypeBoundaryValues() throws Exception {
+ TableId tableId = TableId.parse("test.time_boundary_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("min_time", new TimeType())
+ .physicalColumn("max_time", new TimeType())
+ .physicalColumn("midnight", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test boundary TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.MIN), // 00:00:00
+ TimeData.fromLocalTime(
+ LocalTime
+ .MAX), // 23:59:59.999 (truncated to millisecond
+ // precision)
+ TimeData.fromLocalTime(LocalTime.MIDNIGHT) // 00:00:00
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":1,\"min_time\":\"00:00:00\",\"max_time\":\"23:59:59\",\"midnight\":\"00:00:00\",\"__op\":0}",
+ result);
+ }
+
+ @Test
+ void testTimeTypeWithNullValues() throws Exception {
+ TableId tableId = TableId.parse("test.time_null_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("nullable_time", new TimeType())
+ .physicalColumn("not_null_time", new TimeType().notNull())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test TIME values with null
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ null, // Null value for nullable column
+ TimeData.fromLocalTime(
+ LocalTime.of(12, 0, 0)) // Not null column
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId, "{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
+ }
+
private void verifySerializeResult(
TableId expectTable, String expectRow, StarRocksRowData actualRowData)
throws Exception {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index e2dc50551da..6501ba03c26 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -213,10 +213,10 @@ void testStarRocksDataType() throws Exception {
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
- // StarRocks sink doesn't support TIME type yet.
- // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
- // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
- // Precision"))
+ .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
+ .column(
+ new PhysicalColumn(
+ "time_3", DataTypes.TIME(3), "Time With Precision"))
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
.column(
new PhysicalColumn(
@@ -256,6 +256,9 @@ void testStarRocksDataType() throws Exception {
"string | varchar(1048576) | YES | false | null",
"decimal | decimal(17,7) | YES | false | null",
"date | date | YES | false | null",
+ // TIME type mapped to VARCHAR since StarRocks doesn't support TIME type
+ "time | varchar(8) | YES | false | null",
+ "time_3 | varchar(12) | YES | false | null",
"timestamp | datetime | YES | false | null",
"timestamp_3 | datetime | YES | false | null",
"timestampltz | datetime | YES | false | null",
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
index 28d7e940442..2da34d6cf4f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -226,4 +227,150 @@ void testDropColumn() throws Exception {
.build();
Assertions.assertThat(actualTable).isEqualTo(expectTable);
}
+
+ @Test
+ void testCreateTableWithTimeType() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn(
+ "end_time", new TimeType(3)) // TIME with millisecond precision
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("start_time")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("end_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testAddTimeTypeColumn() throws Exception {
+ TableId tableId = TableId.parse("test.add_time_column");
+ Schema schema =
+ Schema.newBuilder().physicalColumn("id", new IntType()).primaryKey("id").build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ // Add TIME type column through schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("duration", new TimeType())),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("precision_time", new TimeType(3)))));
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("duration")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("precision_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testTimeTypeWithDifferentPrecisions() throws Exception {
+ TableId tableId = TableId.parse("test.time_precision_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("time_default", new TimeType()) // Default precision
+ .physicalColumn("time_0", new TimeType(0)) // Second precision
+ .physicalColumn("time_3", new TimeType(3)) // Millisecond precision
+ .physicalColumn("time_max", new TimeType(3)) // Example precision 3
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ // Verify all TIME columns are correctly mapped to StarRocks VARCHAR type
+ // since StarRocks doesn't support TIME type
+ List timeColumns = Arrays.asList("time_default", "time_0", "time_3", "time_max");
+ for (StarRocksColumn column : actualTable.getColumns()) {
+ if (timeColumns.contains(column.getColumnName())) {
+ Assertions.assertThat(column.getDataType().toLowerCase()).isEqualTo("varchar");
+ }
+ }
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
index fb8ba75bdc7..aca5a72ff5a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/pom.xml
@@ -76,12 +76,24 @@ limitations under the License.
flink-test-utils
${flink.version}
test
+
+
+ org.testcontainers
+ testcontainers
+
+