diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 1264df9807a..37d03b8ab7f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -297,6 +297,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String name) {
                             return BasicType.BYTE_TYPE;
                         case INT_16:
                             return BasicType.SHORT_TYPE;
+                        case INT_32:
+                            return BasicType.INT_TYPE;
                         case DATE:
                             return LocalTimeType.LOCAL_DATE_TYPE;
                         default:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 66af39f18d0..1050c4a2d97 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -36,8 +36,17 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -56,6 +65,7 @@
 import java.util.TimeZone;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
 @Slf4j
 public class ParquetReadStrategyTest {
@@ -211,6 +221,34 @@ public void testParquetReadUnsupportedType() throws Exception {
         AutoGenerateParquetData.deleteFile();
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetTypeInt32WithLogicalTypeAnnotation() throws IOException {
+
+        NativeParquetWriter.generateTestData();
+
+        try (ParquetFileReader reader =
+                ParquetFileReader.open(
+                        HadoopInputFile.fromPath(
+                                new Path(NativeParquetWriter.DATA_FILE_PATH),
+                                new Configuration()))) {
+
+            MessageType schema = reader.getFileMetaData().getSchema();
+            LogicalTypeAnnotation type = schema.getType("id").getLogicalTypeAnnotation();
+            Assertions.assertTrue(type instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation);
+        }
+
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(0).getTypeClass(), Integer.class);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "", testCollector);
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -344,4 +382,48 @@ public static void deleteFile() {
             }
         }
     }
+
+    /** Writes test data through the native Parquet API. */
+    public static class NativeParquetWriter {
+
+        public static final String DATA_FILE_PATH = "/tmp/data_native.parquet";
+
+        // 1. Define the native Parquet schema (MessageType).
+        public static MessageType createSchema() {
+            return Types.buildMessage()
+                    .required(INT32)
+                    .as(LogicalTypeAnnotation.intType(32, true))
+                    .named("id")
+                    .named("User");
+        }
+
+        // 2. Write the data.
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            MessageType schema = createSchema();
+            Configuration conf = new Configuration();
+
+            GroupWriteSupport.setSchema(schema, conf);
+
+            Path file = new Path(DATA_FILE_PATH);
+            try (ParquetWriter<Group> writer =
+                    ExampleParquetWriter.builder(file)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build()) {
+
+                Group record1 = new SimpleGroup(schema);
+                record1.add("id", 1);
+
+                writer.write(record1);
+            }
+        }
+
+        private static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }
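
Note for reviewers: judging from the hunk context, the switch being patched handles the legacy OriginalType values (INT_8, INT_16, DATE, ...). The sketch below is illustrative and not part of the patch (the class name Int32AnnotationDemo is invented); it shows that a schema built the same way as NativeParquetWriter.createSchema() surfaces OriginalType.INT_32, the value that previously fell through to the unsupported-type default branch.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

// Illustrative only; not part of the patch.
public class Int32AnnotationDemo {
    public static void main(String[] args) {
        // Same schema shape as NativeParquetWriter.createSchema():
        // a required INT32 column carrying an explicit int(32, signed) annotation.
        MessageType schema =
                Types.buildMessage()
                        .required(INT32)
                        .as(LogicalTypeAnnotation.intType(32, true))
                        .named("id")
                        .named("User");

        // The int(32, signed) annotation maps back to the legacy OriginalType.INT_32,
        // which is the value the patched switch now resolves to BasicType.INT_TYPE.
        OriginalType originalType = schema.getType("id").getOriginalType();
        System.out.println(originalType); // prints INT_32
    }
}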