diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java index 2bcf6337d..3e9c852d9 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java @@ -16,12 +16,19 @@ package com.bytedance.bitsail.conector.legacy.fake.source; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; import com.bytedance.bitsail.connector.legacy.fake.source.FakeSource; +import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo; +import com.bytedance.bitsail.flink.core.typeutils.ColumnFlinkTypeInfoUtil; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -61,4 +68,13 @@ public void testConstructRandomValueWithUniqueCheck() { Assert.assertEquals(5678, actualValue); Assert.assertTrue(existValues.contains("5678")); } + + @Test + public void testSupportByteType() { + List columnInfos = new ArrayList<>(); + ColumnInfo age = new ColumnInfo("age", "byte"); + columnInfos.add(age); + RowTypeInfo rowTypeInfo = ColumnFlinkTypeInfoUtil.getRowTypeInformation(new BitSailTypeInfoConverter(), columnInfos); + Assert.assertTrue(rowTypeInfo.getFieldTypes()[0] instanceof PrimitiveColumnTypeInfo); + } } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java index 33b584e19..ae29d1a15 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java @@ -86,6 +86,16 @@ public Object convert(Schema schema, Object object) { return object; } }; + } else if (BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass().equals(typeClass)) { + converter = + new RowToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return Integer.valueOf(Long.toString((Long) object)); + } + }; } else if (BasicTypeInfo.BIG_INT_TYPE_INFO.getTypeClass().equals(typeClass)) { converter = new RowToAvroConverter() { @@ -255,4 +265,3 @@ public interface RowToAvroConverter extends Serializable { Object convert(Schema schema, Object object); } } - diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java index fd1b4b474..9052bbfb9 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java @@ -105,6 +105,8 @@ private static DataType baseType2DataType(String type) { case "tinyint": case "int": return DataTypes.INT(); + case "byte": + return DataTypes.TINYINT(); case "long": case "bigint": return DataTypes.BIGINT(); diff --git a/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java b/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java index 8ecd63d96..cb8754fe8 100644 --- a/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java +++ b/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java @@ -50,6 +50,8 @@ public class TypeInfoColumnBridge { PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.LONG_TYPE_INFO, PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); + COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.BYTE_TYPE_INFO, + PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.BIG_INTEGER_TYPE_INFO, PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO);