diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java index 3b922983b7e..dae8221f531 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java @@ -41,6 +41,9 @@ public class KingbaseTypeConverter extends PostgresTypeConverter { public static final String KB_BLOB = "BLOB"; public static final String KB_CLOB = "CLOB"; public static final String KB_BIT = "BIT"; + public static final String KB_INT = "INT"; + public static final String KB_SMALLINT = "SMALLINT"; + public static final String KB_BIGINT = "BIGINT"; public static final KingbaseTypeConverter INSTANCE = new KingbaseTypeConverter(); @@ -88,6 +91,15 @@ public Column convert(BasicTypeDefine typeDefine) { byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0; builder.columnLength(byteLength); break; + case KB_SMALLINT: + builder.dataType(BasicType.SHORT_TYPE); + break; + case KB_INT: + builder.dataType(BasicType.INT_TYPE); + break; + case KB_BIGINT: + builder.dataType(BasicType.LONG_TYPE); + break; default: throw CommonError.convertToSeaTunnelTypeError( DatabaseIdentifier.KINGBASE, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java index 11c99b41a8c..f2828c3f671 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java @@ -862,4 +862,48 @@ public void testReconvertArray() { KingbaseTypeConverter.PG_SMALLINT_ARRAY, typeDefine.getColumnType()); Assertions.assertEquals(KingbaseTypeConverter.PG_SMALLINT_ARRAY, typeDefine.getDataType()); } + + @Test + public void testConvertKingBaseInt() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType(KingbaseTypeConverter.KB_INT) + .dataType(KingbaseTypeConverter.KB_INT) + .build(); + Column column = KingbaseTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType()); + Assertions.assertEquals(KingbaseTypeConverter.KB_INT, column.getSourceType().toUpperCase()); + } + + @Test + public void testConvertKingBaseSmallint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType(KingbaseTypeConverter.KB_SMALLINT) + .dataType(KingbaseTypeConverter.KB_SMALLINT) + .build(); + Column column = KingbaseTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType()); + Assertions.assertEquals( + KingbaseTypeConverter.KB_SMALLINT, column.getSourceType().toUpperCase()); + } + + @Test + public void testConvertKingBaseBigint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType(KingbaseTypeConverter.KB_BIGINT) + .dataType(KingbaseTypeConverter.KB_BIGINT) + .build(); + Column column = KingbaseTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType()); + Assertions.assertEquals( + KingbaseTypeConverter.KB_BIGINT, column.getSourceType().toUpperCase()); + } }