Skip to content

Commit 17e2918

Browse files
authored
[Bug] [formats] Fix fail to parse line when content contains the file delimiter (#6589)
1 parent 1b5e766 commit 17e2918

File tree

6 files changed

+308
-5
lines changed

6 files changed

+308
-5
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3636
import org.apache.seatunnel.format.text.TextDeserializationSchema;
3737
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
38+
import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
39+
import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
40+
import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
3841

3942
import io.airlift.compress.lzo.LzopCodec;
4043
import lombok.extern.slf4j.Slf4j;
@@ -56,6 +59,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
5659
BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
5760
private TimeUtils.Formatter timeFormat = BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
5861
private CompressFormat compressFormat = BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
62+
private TextLineSplitor textLineSplitor;
5963
private int[] indexes;
6064
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
6165

@@ -145,7 +149,8 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
145149
.delimiter(TextFormatConstant.PLACEHOLDER)
146150
.dateFormatter(dateFormat)
147151
.dateTimeFormatter(datetimeFormat)
148-
.timeFormatter(timeFormat);
152+
.timeFormatter(timeFormat)
153+
.textLineSplitor(textLineSplitor);
149154
if (isMergePartition) {
150155
deserializationSchema =
151156
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
@@ -184,7 +189,8 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
184189
.delimiter(fieldDelimiter)
185190
.dateFormatter(dateFormat)
186191
.dateTimeFormatter(datetimeFormat)
187-
.timeFormatter(timeFormat);
192+
.timeFormatter(timeFormat)
193+
.textLineSplitor(textLineSplitor);
188194
if (isMergePartition) {
189195
deserializationSchema =
190196
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
@@ -232,5 +238,14 @@ private void initFormatter() {
232238
pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
233239
compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
234240
}
241+
if (FileFormat.CSV.equals(
242+
FileFormat.valueOf(
243+
pluginConfig
244+
.getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key())
245+
.toUpperCase()))) {
246+
textLineSplitor = new CsvLineSplitor();
247+
} else {
248+
textLineSplitor = new DefaultTextLineSplitor();
249+
}
235250
}
236251
}

Diff for: seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.seatunnel.common.utils.TimeUtils;
3232
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
3333
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
34+
import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
35+
import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
3436

3537
import org.apache.commons.lang3.StringUtils;
3638

@@ -50,20 +52,23 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
5052
private final DateTimeUtils.Formatter dateTimeFormatter;
5153
private final TimeUtils.Formatter timeFormatter;
5254
private final String encoding;
55+
private final TextLineSplitor splitor;
5356

5457
private TextDeserializationSchema(
5558
@NonNull SeaTunnelRowType seaTunnelRowType,
5659
String[] separators,
5760
DateUtils.Formatter dateFormatter,
5861
DateTimeUtils.Formatter dateTimeFormatter,
5962
TimeUtils.Formatter timeFormatter,
60-
String encoding) {
63+
String encoding,
64+
TextLineSplitor splitor) {
6165
this.seaTunnelRowType = seaTunnelRowType;
6266
this.separators = separators;
6367
this.dateFormatter = dateFormatter;
6468
this.dateTimeFormatter = dateTimeFormatter;
6569
this.timeFormatter = timeFormatter;
6670
this.encoding = encoding;
71+
this.splitor = splitor;
6772
}
6873

6974
public static Builder builder() {
@@ -78,6 +83,7 @@ public static class Builder {
7883
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
7984
private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
8085
private String encoding = StandardCharsets.UTF_8.name();
86+
private TextLineSplitor textLineSplitor = new DefaultTextLineSplitor();
8187

8288
private Builder() {}
8389

@@ -116,14 +122,20 @@ public Builder encoding(String encoding) {
116122
return this;
117123
}
118124

125+
/**
 * Sets the line splitter that the built schema will use to break a line into fields.
 *
 * <p>The value is stored as-is; no null check is performed here.
 *
 * @param splitor the splitter to hand to the schema
 * @return this builder, to allow call chaining
 */
public Builder textLineSplitor(TextLineSplitor splitor) {
126+
this.textLineSplitor = splitor;
127+
return this;
128+
}
129+
119130
public TextDeserializationSchema build() {
120131
return new TextDeserializationSchema(
121132
seaTunnelRowType,
122133
separators,
123134
dateFormatter,
124135
dateTimeFormatter,
125136
timeFormatter,
126-
encoding);
137+
encoding,
138+
textLineSplitor);
127139
}
128140
}
129141

@@ -145,7 +157,7 @@ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
145157

146158
private Map<Integer, String> splitLineBySeaTunnelRowType(
147159
String line, SeaTunnelRowType seaTunnelRowType, int level) {
148-
String[] splits = line.split(separators[level], -1);
160+
String[] splits = splitor.spliteLine(line, separators[level]);
149161
LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
150162
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
151163
for (int i = 0; i < splits.length; i++) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.format.text.splitor;
19+
20+
import org.apache.seatunnel.common.utils.ExceptionUtils;
21+
22+
import org.apache.commons.csv.CSVFormat;
23+
import org.apache.commons.csv.CSVParser;
24+
import org.apache.commons.csv.CSVRecord;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
import java.io.IOException;
29+
import java.io.Serializable;
30+
import java.util.ArrayList;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Objects;
35+
36+
@Slf4j
37+
public class CsvLineSplitor implements TextLineSplitor, Serializable {
38+
private Map<Character, CSVFormat> splitorFormatMap = new HashMap<>();
39+
40+
@Override
41+
public String[] spliteLine(String line, String splitor) {
42+
Character splitChar = splitor.charAt(0);
43+
if (Objects.isNull(splitorFormatMap.get(splitChar))) {
44+
splitorFormatMap.put(splitChar, CSVFormat.DEFAULT.withDelimiter(splitChar));
45+
}
46+
CSVFormat format = splitorFormatMap.get(splitChar);
47+
CSVParser parser = null;
48+
// Method to parse the line into CSV with the given separator
49+
try {
50+
// Create CSV parser
51+
parser = CSVParser.parse(line, format);
52+
// Parse the CSV records
53+
List<String> res = new ArrayList<>();
54+
for (CSVRecord record : parser.getRecords()) {
55+
for (String value : record) {
56+
res.add(value);
57+
}
58+
}
59+
return res.toArray(new String[0]);
60+
} catch (Exception e) {
61+
log.error(ExceptionUtils.getMessage(e));
62+
return new String[0];
63+
} finally {
64+
if (Objects.nonNull(parser)) {
65+
try {
66+
parser.close();
67+
} catch (IOException e) {
68+
log.error(ExceptionUtils.getMessage(e));
69+
}
70+
}
71+
}
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.format.text.splitor;
19+
20+
import java.io.Serializable;
21+
22+
public class DefaultTextLineSplitor implements TextLineSplitor, Serializable {
23+
24+
@Override
25+
public String[] spliteLine(String line, String seperator) {
26+
return line.split(seperator, -1);
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.format.text.splitor;
19+
20+
/**
 * Strategy for breaking one line of text into its field values.
 */
public interface TextLineSplitor {
21+
/**
 * Splits {@code line} into fields using {@code splitor} as the separator.
 *
 * @param line the line to split
 * @param splitor the separator; how it is interpreted is up to the implementation
 * @return the field values
 */
String[] spliteLine(String line, String splitor);
22+
}

0 commit comments

Comments
 (0)