Skip to content

Commit e6413c3

Browse files
JeremyXin
and
JeremyXin
authored
[Fix][connector-file-base] fix parquet int32 convert error (#9142)
Co-authored-by: JeremyXin <[email protected]>
1 parent 4f5adeb commit e6413c3

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java

+2
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String name) {
297297
return BasicType.BYTE_TYPE;
298298
case INT_16:
299299
return BasicType.SHORT_TYPE;
300+
case INT_32:
301+
return BasicType.INT_TYPE;
300302
case DATE:
301303
return LocalTimeType.LOCAL_DATE_TYPE;
302304
default:

seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java

+82
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,17 @@
3636
import org.apache.hadoop.conf.Configuration;
3737
import org.apache.hadoop.fs.Path;
3838
import org.apache.parquet.avro.AvroParquetWriter;
39+
import org.apache.parquet.example.data.Group;
40+
import org.apache.parquet.example.data.simple.SimpleGroup;
41+
import org.apache.parquet.hadoop.ParquetFileReader;
3942
import org.apache.parquet.hadoop.ParquetWriter;
43+
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
44+
import org.apache.parquet.hadoop.example.GroupWriteSupport;
4045
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
46+
import org.apache.parquet.hadoop.util.HadoopInputFile;
47+
import org.apache.parquet.schema.LogicalTypeAnnotation;
48+
import org.apache.parquet.schema.MessageType;
49+
import org.apache.parquet.schema.Types;
4150

4251
import org.junit.jupiter.api.Assertions;
4352
import org.junit.jupiter.api.Test;
@@ -56,6 +65,7 @@
5665
import java.util.TimeZone;
5766

5867
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
68+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
5969

6070
@Slf4j
6171
public class ParquetReadStrategyTest {
@@ -211,6 +221,34 @@ public void testParquetReadUnsupportedType() throws Exception {
211221
AutoGenerateParquetData.deleteFile();
212222
}
213223

224+
@DisabledOnOs(OS.WINDOWS)
225+
@Test
226+
public void testParquetTypeInt32WithLogicalTypeAnnotation() throws IOException {
227+
228+
NativeParquetWriter.generateTestData();
229+
230+
try (ParquetFileReader reader =
231+
ParquetFileReader.open(
232+
HadoopInputFile.fromPath(
233+
new Path(NativeParquetWriter.DATA_FILE_PATH),
234+
new Configuration()))) {
235+
236+
MessageType schema = reader.getFileMetaData().getSchema();
237+
LogicalTypeAnnotation type = schema.getType("id").getLogicalTypeAnnotation();
238+
Assertions.assertTrue(type instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation);
239+
}
240+
241+
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
242+
LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
243+
parquetReadStrategy.init(localConf);
244+
SeaTunnelRowType seaTunnelRowTypeInfo =
245+
parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
246+
Assertions.assertNotNull(seaTunnelRowTypeInfo);
247+
Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(0).getTypeClass(), Integer.class);
248+
TestCollector testCollector = new TestCollector();
249+
parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "", testCollector);
250+
}
251+
214252
public static class TestCollector implements Collector<SeaTunnelRow> {
215253

216254
private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -344,4 +382,48 @@ public static void deleteFile() {
344382
}
345383
}
346384
}
385+
386+
/** Write data based on the Parquet native api */
387+
public static class NativeParquetWriter {
388+
389+
public static final String DATA_FILE_PATH = "/tmp/data_native.parquet";
390+
391+
// 1. Define Parquet Native Schema (MessageType)
392+
public static MessageType createSchema() {
393+
return Types.buildMessage()
394+
.required(INT32)
395+
.as(LogicalTypeAnnotation.intType(32, true))
396+
.named("id")
397+
.named("User");
398+
}
399+
400+
// 2. write data
401+
public static void generateTestData() throws IOException {
402+
deleteFile();
403+
MessageType schema = createSchema();
404+
Configuration conf = new Configuration();
405+
406+
GroupWriteSupport.setSchema(schema, conf);
407+
408+
Path file = new Path(DATA_FILE_PATH);
409+
try (ParquetWriter<Group> writer =
410+
ExampleParquetWriter.builder(file)
411+
.withConf(conf)
412+
.withCompressionCodec(CompressionCodecName.SNAPPY)
413+
.build()) {
414+
415+
Group record1 = new SimpleGroup(schema);
416+
record1.add("id", 1);
417+
418+
writer.write(record1);
419+
}
420+
}
421+
422+
private static void deleteFile() {
423+
File parquetFile = new File(DATA_FILE_PATH);
424+
if (parquetFile.exists()) {
425+
parquetFile.delete();
426+
}
427+
}
428+
}
347429
}

0 commit comments

Comments
 (0)