Skip to content

Commit 25b8a02

Browse files
authored
[Bug] [connector-file] When the data source field is less than the target (Hive) field,it will throw null pointer exception#8150 (#8200)
1 parent 9a9917d commit 25b8a02

File tree

3 files changed

+46
-0
lines changed

3 files changed

+46
-0
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunne
187187
this.sinkColumnsIndexInRow =
188188
this.sinkColumnList.stream()
189189
.map(column -> columnsMap.get(column.toLowerCase()))
190+
.filter(e -> e != null)
190191
.collect(Collectors.toList());
191192

192193
if (!CollectionUtils.isEmpty(this.partitionFieldList)) {

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.io.File;
3232
import java.net.URL;
3333
import java.nio.file.Paths;
34+
import java.util.List;
3435

3536
public class FileSinkConfigTest {
3637

@@ -47,4 +48,23 @@ public void testConfigInit() throws Exception {
4748
new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
4849
Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config, rowType));
4950
}
51+
52+
@Test
53+
public void testSinkColumnsGreaterThanSource() throws Exception {
54+
URL conf = OrcReadStrategyTest.class.getResource("/test_write_hive.conf");
55+
Assertions.assertNotNull(conf);
56+
String confPath = Paths.get(conf.toURI()).toString();
57+
Config config = ConfigFactory.parseFile(new File(confPath));
58+
59+
SeaTunnelRowType seaTunnelRowTypeInfo =
60+
new SeaTunnelRowType(
61+
new String[] {"name", "age", "address"},
62+
new SeaTunnelDataType[] {
63+
BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.STRING_TYPE
64+
});
65+
FileSinkConfig fileSinkConfig = new FileSinkConfig(config, seaTunnelRowTypeInfo);
66+
List<Integer> sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
67+
Assertions.assertEquals(
68+
sinkColumnsIndexInRow.size(), seaTunnelRowTypeInfo.getFieldNames().length);
69+
}
5070
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
{
19+
fs.defaultFS = "hdfs://hadoop01:9000"
20+
path = "/data/test"
21+
file_format_type = "json"
22+
batch_size=10
23+
sink_columns=[name,age,address,weight,height]
24+
}
25+

0 commit comments

Comments
 (0)