Commit 6c65805

[Improve] Improve read with parquet type convert error (#6683)
1 parent 4f4fd7b commit 6c65805

File tree: 3 files changed (+132 -27 lines changed)

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java (+33 -27)
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -28,6 +29,7 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -67,6 +69,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 @Slf4j
 public class ParquetReadStrategy extends AbstractReadStrategy {
@@ -75,6 +78,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
     private static final long NANOS_PER_MILLISECOND = 1000000;
     private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
     private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+    private static final String PARQUET = "Parquet";
 
     private int[] indexes;
 
@@ -234,6 +238,12 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, String path)
+            throws FileConnectorException {
         ParquetMetadata metadata;
         try (ParquetFileReader reader =
                 hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,19 +269,22 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException
         String[] fields = new String[readColumns.size()];
         SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
         indexes = new int[readColumns.size()];
-        for (int i = 0; i < readColumns.size(); i++) {
-            fields[i] = readColumns.get(i);
-            Type type = originalSchema.getType(fields[i]);
-            int fieldIndex = originalSchema.getFieldIndex(fields[i]);
-            indexes[i] = fieldIndex;
-            types[i] = parquetType2SeaTunnelType(type);
-        }
+        buildColumnsWithErrorCheck(
+                tablePath,
+                IntStream.range(0, readColumns.size()).iterator(),
+                i -> {
+                    fields[i] = readColumns.get(i);
+                    Type type = originalSchema.getType(fields[i]);
+                    int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+                    indexes[i] = fieldIndex;
+                    types[i] = parquetType2SeaTunnelType(type, fields[i]);
+                });
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
         return getActualSeaTunnelRowTypeInfo();
     }
 
-    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String name) {
         if (type.isPrimitive()) {
             switch (type.asPrimitiveType().getPrimitiveTypeName()) {
                 case INT32:
@@ -287,9 +300,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                         case DATE:
                             return LocalTimeType.LOCAL_DATE_TYPE;
                         default:
-                            String errorMsg = String.format("Not support this type [%s]", type);
-                            throw new FileConnectorException(
-                                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                            throw CommonError.convertToSeaTunnelTypeError(
+                                    PARQUET, type.toString(), name);
                    }
                case INT64:
                    if (type.asPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MILLIS) {
@@ -324,9 +336,7 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                    int scale = Integer.parseInt(splits[1]);
                    return new DecimalType(precision, scale);
                default:
-                    String errorMsg = String.format("Not support this type [%s]", type);
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                    throw CommonError.convertToSeaTunnelTypeError("Parquet", type.toString(), name);
            }
        } else {
            LogicalTypeAnnotation logicalTypeAnnotation =
@@ -339,7 +349,7 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                for (int i = 0; i < fields.size(); i++) {
                    Type fieldType = fields.get(i);
                    SeaTunnelDataType<?> seaTunnelDataType =
-                            parquetType2SeaTunnelType(fields.get(i));
+                            parquetType2SeaTunnelType(fields.get(i), name);
                    fieldNames[i] = fieldType.getName();
                    seaTunnelDataTypes[i] = seaTunnelDataType;
                }
@@ -349,9 +359,9 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                case MAP:
                    GroupType groupType = type.asGroupType().getType(0).asGroupType();
                    SeaTunnelDataType<?> keyType =
-                            parquetType2SeaTunnelType(groupType.getType(0));
+                            parquetType2SeaTunnelType(groupType.getType(0), name);
                    SeaTunnelDataType<?> valueType =
-                            parquetType2SeaTunnelType(groupType.getType(1));
+                            parquetType2SeaTunnelType(groupType.getType(1), name);
                    return new MapType<>(keyType, valueType);
                case LIST:
                    Type elementType;
@@ -360,7 +370,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                    } catch (Exception e) {
                        elementType = type.asGroupType().getType(0);
                    }
-                    SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(elementType);
+                    SeaTunnelDataType<?> fieldType =
+                            parquetType2SeaTunnelType(elementType, name);
                    switch (fieldType.getSqlType()) {
                        case STRING:
                            return ArrayType.STRING_ARRAY_TYPE;
@@ -379,17 +390,12 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
                        case DOUBLE:
                            return ArrayType.DOUBLE_ARRAY_TYPE;
                        default:
-                            String errorMsg =
-                                    String.format(
-                                            "SeaTunnel array type not supported this genericType [%s] yet",
-                                            fieldType);
-                            throw new FileConnectorException(
-                                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                            throw CommonError.convertToSeaTunnelTypeError(
+                                    PARQUET, type.toString(), name);
                    }
                default:
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "SeaTunnel file connector not support this nest type");
+                    throw CommonError.convertToSeaTunnelTypeError(
+                            PARQUET, type.toString(), name);
            }
        }
    }

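For orientation, here is a minimal caller-side sketch of the new table-aware overload (not part of the commit). It assumes a ParquetReadStrategy that has already been init()'d with a HadoopConf, and the file path is illustrative; TablePath.DEFAULT is what the old single-argument overload now delegates with.

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;

class SchemaProbe {
    // Sketch only: resolve the row type of one Parquet file via the TablePath-aware overload.
    static SeaTunnelRowType readSchema(ParquetReadStrategy strategy, String filePath) {
        try {
            return strategy.getSeaTunnelRowTypeInfo(TablePath.DEFAULT, filePath);
        } catch (SeaTunnelRuntimeException e) {
            // With this change, every column whose Parquet type cannot be mapped is collected
            // and reported together in a single COMMON-20 error, rather than failing on the
            // first unsupported column.
            System.err.println("Schema resolution failed: " + e.getMessage());
            throw e;
        }
    }
}

Because parquetType2SeaTunnelType now receives the field name, each convertToSeaTunnelTypeError carries the offending column, which is what buildColumnsWithErrorCheck aggregates.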
Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java (+36)
@@ -20,15 +20,23 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
 
 public interface ReadStrategy extends Serializable, Closeable {
     void init(HadoopConf conf);
@@ -38,6 +46,11 @@ void read(String path, String tableId, Collector<SeaTunnelRow> output)
 
     SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException;
 
+    default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, String path)
+            throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(path);
+    }
+
     default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
             String path, SeaTunnelRowType rowType) throws FileConnectorException {
         return getSeaTunnelRowTypeInfo(path);
@@ -53,4 +66,27 @@ default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
 
     // todo: use CatalogTable
     SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
+
+    default <T> void buildColumnsWithErrorCheck(
+            TablePath tablePath, Iterator<T> keys, Consumer<T> getDataType) {
+        Map<String, String> unsupported = new LinkedHashMap<>();
+        while (keys.hasNext()) {
+            try {
+                getDataType.accept(keys.next());
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        .equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType(
+                    this.getClass().getSimpleName().replace("ReadStrategy", ""),
+                    tablePath.getFullName(),
+                    unsupported);
+        }
+    }
 }

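The new buildColumnsWithErrorCheck default method is generic over the key type, so other strategies could reuse it to aggregate unsupported columns. Below is a hedged fragment of a hypothetical ReadStrategy implementation (the method, the sourceTypes map, and toSeaTunnelType are illustrative names, not from this commit); toSeaTunnelType is assumed to throw CommonError.convertToSeaTunnelTypeError for any column it cannot map.

    // Hypothetical helper inside a ReadStrategy implementation: resolve all column types
    // in one pass, collecting every unsupported column instead of stopping at the first.
    private SeaTunnelDataType<?>[] resolveTypes(TablePath tablePath, Map<String, String> sourceTypes) {
        Map<String, SeaTunnelDataType<?>> resolved = new LinkedHashMap<>();
        buildColumnsWithErrorCheck(
                tablePath,
                sourceTypes.keySet().iterator(),
                columnName ->
                        // toSeaTunnelType(...) is assumed to throw
                        // CommonError.convertToSeaTunnelTypeError(format, rawType, columnName)
                        // for unmappable columns; those errors are collected and a single
                        // getCatalogTableWithUnsupportedType error is thrown at the end.
                        resolved.put(
                                columnName,
                                toSeaTunnelType(sourceTypes.get(columnName), columnName)));
        return resolved.values().toArray(new SeaTunnelDataType<?>[0]);
    }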
Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java (+63)
@@ -24,6 +24,7 @@
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
@@ -189,6 +190,27 @@ public void testParquetReadArray() throws Exception {
         AutoGenerateParquetData.deleteFile();
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadUnsupportedType() throws Exception {
+        AutoGenerateParquetDataWithUnsupportedType.generateTestData();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                                        AutoGenerateParquetDataWithUnsupportedType.DATA_FILE_PATH));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-20], ErrorDescription:['Parquet' table 'default.default.default' unsupported get catalog table with field data types"
+                        + " '{\"id\":\"required group id (LIST) {\\n repeated group array (LIST) {\\n repeated binary array;\\n }\\n}\",\"id2\":\"required group id2 (LIST) {\\n repeated group array (LIST)"
+                        + " {\\n repeated binary array;\\n }\\n}\"}']",
+                exception.getMessage());
+        AutoGenerateParquetData.deleteFile();
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -281,4 +303,45 @@ public static void deleteFile() {
             }
         }
     }
+
+    public static class AutoGenerateParquetDataWithUnsupportedType {
+
+        public static final String DATA_FILE_PATH = "/tmp/data_unsupported.parquet";
+
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\": \"array\", \"items\": {\"type\": \"array\", \"items\": \"bytes\"}}},{\"name\":\"id2\",\"type\":{\"type\": \"array\", \"items\": {\"type\": \"array\", \"items\": \"bytes\"}}},{\"name\":\"long\",\"type\":\"long\"}]}";
            Schema schema = new Schema.Parser().parse(schemaString);
+
+            Configuration conf = new Configuration();
+
+            Path file = new Path(DATA_FILE_PATH);
+
+            ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(file)
+                            .withSchema(schema)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build();
+
+            GenericRecord record1 = new GenericData.Record(schema);
+            GenericArray<GenericData.Array<Utf8>> id =
+                    new GenericData.Array<>(2, schema.getField("id").schema());
+            id.add(new GenericData.Array<>(2, schema.getField("id").schema().getElementType()));
+            id.add(new GenericData.Array<>(2, schema.getField("id").schema().getElementType()));
+            record1.put("id", id);
+            record1.put("id2", id);
+            record1.put("long", Long.MAX_VALUE);
+            writer.write(record1);
+            writer.close();
+        }
+
+        public static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }

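The new test pins the entire rendered message, including the pretty-printed Parquet group types. If only the error classification matters, a looser assertion on the code alone is possible; a sketch, assuming SeaTunnelErrorCode#getCode returns the "COMMON-20" identifier shown in the asserted message:

    // Sketch of a less message-coupled assertion (assumption: getCode() yields "COMMON-20").
    Assertions.assertEquals("COMMON-20", exception.getSeaTunnelErrorCode().getCode());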