Commit cad54ca

hailin0 authored and jia zhang committed
[Bugfix][Csv] Fix csv format delimiter (apache#9066)
1 parent 6a01266 commit cad54ca
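
The diff below suggests the CSV reader and writer could end up using a delimiter other than "," (the user-configured field_delimiter on the write path, a placeholder constant on one read path), which produces or parses malformed CSV; the fix pins both paths to a literal "," and adds round-trip tests. As a standalone illustration of that failure mode (plain Java, not SeaTunnel code; the "\u0001" delimiter is only an example value):

import java.util.Arrays;

// Standalone sketch of the delimiter mismatch this commit addresses.
// A CSV line is comma-separated; splitting it on a non-comma delimiter
// leaves the whole line as a single field instead of three typed columns.
public class CsvDelimiterSketch {
    public static void main(String[] args) {
        String csvLine = "1,a,10"; // same shape as the test.csv fixture added in this commit

        String[] wrong = csvLine.split("\u0001"); // non-comma delimiter -> 1 field
        String[] right = csvLine.split(",");      // comma -> 3 fields

        System.out.println(Arrays.toString(wrong)); // [1,a,10]
        System.out.println(Arrays.toString(right)); // [1, a, 10]
    }
}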

5 files changed: +229 −11 lines changed


seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/CsvWriteStrategy.java

+1 −3

@@ -47,7 +47,6 @@
 public class CsvWriteStrategy extends AbstractWriteStrategy<FSDataOutputStream> {
     private final LinkedHashMap<String, FSDataOutputStream> beingWrittenOutputStream;
     private final Map<String, Boolean> isFirstWrite;
-    private final String fieldDelimiter;
     private final String rowDelimiter;
     private final DateUtils.Formatter dateFormat;
     private final DateTimeUtils.Formatter dateTimeFormat;
@@ -63,7 +62,6 @@ public CsvWriteStrategy(FileSinkConfig fileSinkConfig) {
         this.csvStringQuoteMode = fileSinkConfig.getCsvStringQuoteMode();
         this.beingWrittenOutputStream = new LinkedHashMap<>();
         this.isFirstWrite = new HashMap<>();
-        this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
         this.rowDelimiter = fileSinkConfig.getRowDelimiter();
         this.dateFormat = fileSinkConfig.getDateFormat();
         this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
@@ -81,7 +79,7 @@ public void setCatalogTable(CatalogTable catalogTable) {
                 .seaTunnelRowType(
                         buildSchemaWithRowType(
                                 catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow))
-                .delimiter(fieldDelimiter)
+                .delimiter(",")
                 .dateFormatter(dateFormat)
                 .dateTimeFormatter(dateTimeFormat)
                 .timeFormatter(timeFormat)
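
Note: dropping the fieldDelimiter field from CsvWriteStrategy should be behavior-preserving for other formats; per the file connector documentation, the field_delimiter sink option is only meaningful for the text format, while CSV output is always comma-separated.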

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java

+2 −8

@@ -34,7 +34,6 @@
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.csv.CsvDeserializationSchema;
-import org.apache.seatunnel.format.csv.constant.CsvFormatConstant;
 import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
 import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;

@@ -53,13 +52,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;

 @Slf4j
 public class CsvReadStrategy extends AbstractReadStrategy {
     private CsvDeserializationSchema deserializationSchema;
-    private String fieldDelimiter = FileBaseSourceOptions.FIELD_DELIMITER.defaultValue();
     private DateUtils.Formatter dateFormat = FileBaseSourceOptions.DATE_FORMAT.defaultValue();
     private DateTimeUtils.Formatter datetimeFormat =
             FileBaseSourceOptions.DATETIME_FORMAT.defaultValue();
@@ -201,7 +198,7 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
         CsvDeserializationSchema.Builder builder =
                 CsvDeserializationSchema.builder()
-                        .delimiter(CsvFormatConstant.PLACEHOLDER)
+                        .delimiter(",")
                         .csvLineProcessor(processor)
                         .nullFormat(
                                 readonlyConfig
@@ -223,17 +220,14 @@ public void setCatalogTable(CatalogTable catalogTable) {
         SeaTunnelRowType userDefinedRowTypeWithPartition =
                 mergePartitionTypes(fileNames.get(0), rowType);
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
-        Optional<String> fieldDelimiterOptional =
-                readonlyConfig.getOptional(FileBaseSourceOptions.FIELD_DELIMITER);
         encoding =
                 readonlyConfig
                         .getOptional(FileBaseSourceOptions.ENCODING)
                         .orElse(StandardCharsets.UTF_8.name());
-        fieldDelimiter = ",";
         initFormatter();
         CsvDeserializationSchema.Builder builder =
                 CsvDeserializationSchema.builder()
-                        .delimiter(fieldDelimiter)
+                        .delimiter(",")
                         .csvLineProcessor(processor)
                         .nullFormat(
                                 readonlyConfig
CsvReadStrategyTest.java (new test file)

+112 −0

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;

@Slf4j
public class CsvReadStrategyTest {

    @Test
    public void testReadCsv() throws Exception {
        URL resource = CsvReadStrategyTest.class.getResource("/test.csv");
        String path = Paths.get(resource.toURI()).toString();
        CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
        csvReadStrategy.init(localConf);
        csvReadStrategy.getFileNamesByPath(path);
        csvReadStrategy.setPluginConfig(ConfigFactory.empty());
        csvReadStrategy.setCatalogTable(
                CatalogTableUtil.getCatalogTable(
                        "test",
                        new SeaTunnelRowType(
                                new String[] {"id", "name", "age"},
                                new SeaTunnelDataType[] {
                                    BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
                                })));
        TestCollector testCollector = new TestCollector();
        csvReadStrategy.read(path, "", testCollector);

        Assertions.assertEquals(2, testCollector.getRows().size());
        Assertions.assertEquals(1, testCollector.getRows().get(0).getField(0));
        Assertions.assertEquals("a", testCollector.getRows().get(0).getField(1));
        Assertions.assertEquals(10, testCollector.getRows().get(0).getField(2));
        Assertions.assertEquals(2, testCollector.getRows().get(1).getField(0));
        Assertions.assertEquals("b", testCollector.getRows().get(1).getField(1));
        Assertions.assertEquals(100, testCollector.getRows().get(1).getField(2));
    }

    public static class TestCollector implements Collector<SeaTunnelRow> {

        private final List<SeaTunnelRow> rows = new ArrayList<>();

        public List<SeaTunnelRow> getRows() {
            return rows;
        }

        @Override
        public void collect(SeaTunnelRow record) {
            log.info(record.toString());
            rows.add(record);
        }

        @Override
        public Object getCheckpointLock() {
            return null;
        }
    }

    public static class LocalConf extends HadoopConf {
        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
        private static final String SCHEMA = "file";

        public LocalConf(String hdfsNameKey) {
            super(hdfsNameKey);
        }

        @Override
        public String getFsHdfsImpl() {
            return HDFS_IMPL;
        }

        @Override
        public String getSchema() {
            return SCHEMA;
        }
    }
}
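
This reader test depends on the two-row test.csv fixture added at the end of this commit and asserts that each comma-separated line is parsed into three typed fields (INT, STRING, INT).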
CsvWriteStrategyTest.java (new test file)

+112 −0

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.writer;

import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.CsvWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;

@Slf4j
public class CsvWriteStrategyTest {
    private static final String TMP_PATH = "file:///tmp/seatunnel/csv/test";

    @DisabledOnOs(OS.WINDOWS)
    @Test
    public void testParquetWriteInt96() throws Exception {
        Map<String, Object> writeConfig = new HashMap<>();
        writeConfig.put("tmp_path", TMP_PATH);
        writeConfig.put("path", "file:///tmp/seatunnel/csv/int96");
        writeConfig.put("file_format_type", FileFormat.CSV.name());

        SeaTunnelRowType writeRowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name", "age"},
                        new SeaTunnelDataType[] {
                            BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
                        });
        FileSinkConfig writeSinkConfig =
                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), writeRowType);
        CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
        ParquetReadStrategyTest.LocalConf hadoopConf =
                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
        writeStrategy.setCatalogTable(
                CatalogTableUtil.getCatalogTable("test", null, null, "test", writeRowType));
        writeStrategy.init(hadoopConf, "test1", "test1", 0);
        writeStrategy.beginTransaction(1L);
        writeStrategy.write(new SeaTunnelRow(new Object[] {1, "a", 20}));
        writeStrategy.finishAndCloseFile();
        writeStrategy.close();

        CsvReadStrategy readStrategy = new CsvReadStrategy();
        readStrategy.init(hadoopConf);
        List<String> readFiles = readStrategy.getFileNamesByPath(TMP_PATH);
        readStrategy.setPluginConfig(ConfigFactory.empty());
        readStrategy.setCatalogTable(
                CatalogTableUtil.getCatalogTable(
                        "test",
                        new SeaTunnelRowType(
                                new String[] {"id", "name", "age"},
                                new SeaTunnelDataType[] {
                                    BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
                                })));
        Assertions.assertEquals(1, readFiles.size());
        String readFilePath = readFiles.get(0);
        List<SeaTunnelRow> readRows = new ArrayList<>();
        Collector<SeaTunnelRow> readCollector =
                new Collector<SeaTunnelRow>() {
                    @Override
                    public void collect(SeaTunnelRow record) {
                        Assertions.assertEquals(1, record.getField(0));
                        Assertions.assertEquals("a", record.getField(1));
                        Assertions.assertEquals(20, record.getField(2));
                        readRows.add(record);
                    }

                    @Override
                    public Object getCheckpointLock() {
                        return null;
                    }
                };
        readStrategy.read(readFilePath, "test", readCollector);
        Assertions.assertEquals(1, readRows.size());
        readStrategy.close();
    }
}
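
The writer test is a round trip: it writes a single row through CsvWriteStrategy to a temporary local file, then reads the produced file back with CsvReadStrategy and asserts that the three fields survive unchanged.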
test.csv (new test resource)

+2 −0

1,a,10
2,b,100
