Skip to content

Commit 6e0d630

Browse files
litiliulitiliuhailin0
authored
[Fix][Connector-File] Fix conflicting file_format_type requirement (#8823)
Co-authored-by: litiliu <[email protected]> Co-authored-by: hailin0 <[email protected]>
1 parent 373d216 commit 6e0d630

File tree

4 files changed

+48
-10
lines changed

4 files changed

+48
-10
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
4646
protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
4747
protected boolean createEmptyFileWhenNoData =
4848
BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
49-
protected FileFormat fileFormat = FileFormat.TEXT;
49+
protected FileFormat fileFormat;
5050
protected String filenameExtension = BaseSinkConfig.FILENAME_EXTENSION.defaultValue();
5151
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
5252
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
@@ -101,6 +101,9 @@ public BaseFileSinkConfig(@NonNull Config config) {
101101
FileFormat.valueOf(
102102
config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
103103
.toUpperCase(Locale.ROOT));
104+
} else {
105+
// fall back to the default
106+
this.fileFormat = BaseSinkConfig.FILE_FORMAT_TYPE.defaultValue();
104107
}
105108

106109
if (config.hasPath(BaseSinkConfig.FILENAME_EXTENSION.key())

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,7 @@ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunne
209209
this.sheetName = config.getString(BaseSinkConfig.SHEET_NAME.key());
210210
}
211211

212-
if (FileFormat.XML
213-
.name()
214-
.equalsIgnoreCase(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
212+
if (FileFormat.XML.equals(this.fileFormat)) {
215213
if (!config.hasPath(BaseSinkConfig.XML_USE_ATTR_FORMAT.key())) {
216214
throw new FileConnectorException(
217215
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
@@ -229,9 +227,7 @@ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunne
229227
}
230228
}
231229

232-
if (FileFormat.PARQUET
233-
.name()
234-
.equalsIgnoreCase(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
230+
if (FileFormat.PARQUET.equals(this.fileFormat)) {
235231
if (config.hasPath(BaseSinkConfig.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key())) {
236232
this.parquetWriteTimestampAsInt96 =
237233
config.getBoolean(
@@ -244,9 +240,7 @@ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunne
244240
}
245241
}
246242

247-
if (FileFormat.CSV
248-
.name()
249-
.equalsIgnoreCase(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
243+
if (FileFormat.CSV.equals(this.fileFormat)) {
250244
if (config.hasPath(BaseSinkConfig.CSV_STRING_QUOTE_MODE.key())) {
251245
this.csvStringQuoteMode =
252246
CsvStringQuoteMode.valueOf(

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@ public void testConfigInit() throws Exception {
4949
Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config, rowType));
5050
}
5151

52+
@Test
53+
public void testConfigInitDefault() throws Exception {
54+
URL conf = OrcReadStrategyTest.class.getResource("/test_write_hdfs_default_format.conf");
55+
Assertions.assertNotNull(conf);
56+
String confPath = Paths.get(conf.toURI()).toString();
57+
Config config = ConfigFactory.parseFile(new File(confPath));
58+
59+
SeaTunnelRowType rowType =
60+
new SeaTunnelRowType(
61+
new String[] {"data", "ts"},
62+
new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
63+
Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config, rowType));
64+
}
65+
5266
@Test
5367
public void testSinkColumnsGreaterThanSource() throws Exception {
5468
URL conf = OrcReadStrategyTest.class.getResource("/test_write_hive.conf");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
{
19+
fs.defaultFS = "hdfs://hadoop01:9000"
20+
have_partition = true
21+
partition_by = ["ts"]
22+
partition_dir_expression = "${v0}"
23+
is_partition_field_write_in_file = false
24+
path = "/data/test"
25+
batch_size=10
26+
}
27+

0 commit comments

Comments
 (0)