Skip to content

Commit 3e64a42

Browse files
litiliu and litiliu authored
[Fix][File]use common-csv to read csv file (#8919)
Co-authored-by: litiliu <[email protected]>
1 parent 0ba4fd8 commit 3e64a42

File tree

5 files changed: +124 −52 lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java

+55-46
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
22-
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2322
import org.apache.seatunnel.api.source.Collector;
2423
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2524
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -39,6 +38,10 @@
3938
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
4039
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
4140

41+
import org.apache.commons.csv.CSVFormat;
42+
import org.apache.commons.csv.CSVParser;
43+
import org.apache.commons.csv.CSVRecord;
44+
4245
import io.airlift.compress.lzo.LzopCodec;
4346
import lombok.extern.slf4j.Slf4j;
4447

@@ -47,12 +50,13 @@
4750
import java.io.InputStream;
4851
import java.io.InputStreamReader;
4952
import java.nio.charset.StandardCharsets;
53+
import java.util.HashMap;
5054
import java.util.Map;
5155
import java.util.Optional;
5256

5357
@Slf4j
5458
public class CsvReadStrategy extends AbstractReadStrategy {
55-
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
59+
private CsvDeserializationSchema deserializationSchema;
5660
private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
5761
private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
5862
private DateTimeUtils.Formatter datetimeFormat =
@@ -62,6 +66,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
6266
private CsvLineProcessor processor;
6367
private int[] indexes;
6468
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
69+
private CatalogTable inputCatalogTable;
6570

6671
@Override
6772
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
@@ -96,51 +101,54 @@ public void readProcess(
96101
break;
97102
}
98103

104+
CSVFormat csvFormat = CSVFormat.DEFAULT;
99105
try (BufferedReader reader =
100-
new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
101-
reader.lines()
102-
.skip(skipHeaderNumber)
103-
.forEach(
104-
line -> {
105-
try {
106-
SeaTunnelRow seaTunnelRow =
107-
deserializationSchema.deserialize(
108-
line.getBytes(StandardCharsets.UTF_8));
109-
if (!readColumns.isEmpty()) {
110-
// need column projection
111-
Object[] fields;
112-
if (isMergePartition) {
113-
fields =
114-
new Object
115-
[readColumns.size()
116-
+ partitionsMap.size()];
117-
} else {
118-
fields = new Object[readColumns.size()];
119-
}
120-
for (int i = 0; i < indexes.length; i++) {
121-
fields[i] = seaTunnelRow.getField(indexes[i]);
122-
}
123-
seaTunnelRow = new SeaTunnelRow(fields);
124-
}
125-
if (isMergePartition) {
126-
int index = seaTunnelRowType.getTotalFields();
127-
for (String value : partitionsMap.values()) {
128-
seaTunnelRow.setField(index++, value);
129-
}
130-
}
131-
seaTunnelRow.setTableId(tableId);
132-
output.collect(seaTunnelRow);
133-
} catch (IOException e) {
134-
String errorMsg =
135-
String.format(
136-
"Deserialize this data [%s] failed, please check the origin data",
137-
line);
138-
throw new FileConnectorException(
139-
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
140-
errorMsg,
141-
e);
142-
}
143-
});
106+
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
107+
CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
108+
for (int i = 0; i < skipHeaderNumber; i++) {
109+
if (reader.readLine() == null) {
110+
throw new IOException(
111+
String.format(
112+
"File [%s] has fewer lines than expected to skip.",
113+
currentFileName));
114+
}
115+
}
116+
// read lines
117+
for (CSVRecord csvRecord : csvParser) {
118+
HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
119+
for (int i = 0; i < inputCatalogTable.getTableSchema().getColumns().size(); i++) {
120+
fieldIdValueMap.put(i, csvRecord.get(i));
121+
}
122+
SeaTunnelRow seaTunnelRow = deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
123+
if (!readColumns.isEmpty()) {
124+
// need column projection
125+
Object[] fields;
126+
if (isMergePartition) {
127+
fields = new Object[readColumns.size() + partitionsMap.size()];
128+
} else {
129+
fields = new Object[readColumns.size()];
130+
}
131+
for (int i = 0; i < indexes.length; i++) {
132+
fields[i] = seaTunnelRow.getField(indexes[i]);
133+
}
134+
seaTunnelRow = new SeaTunnelRow(fields);
135+
}
136+
if (isMergePartition) {
137+
int index = seaTunnelRowType.getTotalFields();
138+
for (String value : partitionsMap.values()) {
139+
seaTunnelRow.setField(index++, value);
140+
}
141+
}
142+
seaTunnelRow.setTableId(tableId);
143+
output.collect(seaTunnelRow);
144+
}
145+
} catch (IOException e) {
146+
String errorMsg =
147+
String.format(
148+
"Deserialize this file [%s] failed, please check the origin data",
149+
currentFileName);
150+
throw new FileConnectorException(
151+
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e);
144152
}
145153
}
146154

@@ -177,6 +185,7 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
177185
@Override
178186
public void setCatalogTable(CatalogTable catalogTable) {
179187
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
188+
this.inputCatalogTable = catalogTable;
180189
SeaTunnelRowType userDefinedRowTypeWithPartition =
181190
mergePartitionTypes(fileNames.get(0), rowType);
182191
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java

+6
Original file line number | Diff line number | Diff line change
@@ -286,6 +286,11 @@ public class LocalFileIT extends TestSuiteBase {
286286
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
287287
container);
288288

289+
ContainerUtil.copyFileIntoContainers(
290+
"/csv/break_line.csv",
291+
"/seatunnel/read/csv/break_line/break_line.csv",
292+
container);
293+
289294
ContainerUtil.copyFileIntoContainers(
290295
"/text/e2e_null_format.txt",
291296
"/seatunnel/read/e2e_null_format/e2e_null_format.txt",
@@ -300,6 +305,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
300305
TestHelper helper = new TestHelper(container);
301306
helper.execute("/csv/fake_to_local_csv.conf");
302307
helper.execute("/csv/local_csv_to_assert.conf");
308+
helper.execute("/csv/breakline_csv_to_assert.conf");
303309
helper.execute("/excel/fake_to_local_excel.conf");
304310
helper.execute("/excel/local_excel_to_assert.conf");
305311
helper.execute("/excel/local_excel_projection_to_assert.conf");
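
Diff for: (new test resource, apparently csv/break_line.csv — full repository path not shown on this page)

Original file line number | Diff line number | Diff line change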
@@ -0,0 +1,3 @@
1+
20,"harry
2+
potter"
3+
21,"tom"
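
Diff for: (new test resource, apparently csv/breakline_csv_to_assert.conf — full repository path not shown on this page)

Original file line number | Diff line number | Diff line change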
@@ -0,0 +1,51 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
LocalFile {
25+
path = "/seatunnel/read/csv/break_line"
26+
file_format_type = csv
27+
schema = {
28+
fields {
29+
age = int
30+
name = string
31+
}
32+
}
33+
}
34+
}
35+
36+
sink {
37+
Assert {
38+
rules {
39+
row_rules = [
40+
{
41+
rule_type = MAX_ROW
42+
rule_value = 2
43+
}
44+
{
45+
rule_type = MIN_ROW
46+
rule_value = 2
47+
}
48+
]
49+
}
50+
}
51+
}

Diff for: seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java

+9-6
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.seatunnel.format.csv;
1919

20-
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2120
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2221
import org.apache.seatunnel.api.table.catalog.TablePath;
2322
import org.apache.seatunnel.api.table.type.ArrayType;
@@ -41,6 +40,7 @@
4140
import lombok.NonNull;
4241

4342
import java.io.IOException;
43+
import java.io.Serializable;
4444
import java.math.BigDecimal;
4545
import java.nio.charset.StandardCharsets;
4646
import java.time.LocalDate;
@@ -58,7 +58,7 @@
5858
import java.util.Map;
5959
import java.util.Optional;
6060

61-
public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
61+
public class CsvDeserializationSchema implements Serializable {
6262
private final SeaTunnelRowType seaTunnelRowType;
6363
private final String[] separators;
6464
private final String encoding;
@@ -169,13 +169,17 @@ public CsvDeserializationSchema build() {
169169
}
170170
}
171171

172-
@Override
173-
public SeaTunnelRow deserialize(byte[] message) throws IOException {
172+
protected SeaTunnelRow deserialize(byte[] message) throws IOException {
174173
if (message == null || message.length == 0) {
175174
return null;
176175
}
177176
String content = new String(message, EncodingUtils.tryParseCharset(encoding));
178177
Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0);
178+
SeaTunnelRow seaTunnelRow = getSeaTunnelRow(splitsMap);
179+
return seaTunnelRow;
180+
}
181+
182+
public SeaTunnelRow getSeaTunnelRow(Map<Integer, String> splitsMap) {
179183
Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
180184
for (int i = 0; i < objects.length; i++) {
181185
String fieldValue = splitsMap.get(i);
@@ -201,12 +205,11 @@ public SeaTunnelRow deserialize(byte[] message) throws IOException {
201205
return seaTunnelRow;
202206
}
203207

204-
@Override
205208
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
206209
return seaTunnelRowType;
207210
}
208211

209-
public Map<Integer, String> splitLineBySeaTunnelRowType(
212+
protected Map<Integer, String> splitLineBySeaTunnelRowType(
210213
String line, SeaTunnelRowType seaTunnelRowType, int level) {
211214
String[] splits = processor.splitLine(line, separators[level]);
212215
LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();

0 commit comments

Comments
 (0)