Skip to content

Commit 31ead24

Browse files
committed
[Bug] Fix OrcWriteStrategy/ParquetWriteStrategy not logging in with Kerberos
1 parent 0100bda commit 31ead24

File tree

6 files changed

+66
-94
lines changed

6 files changed

+66
-94
lines changed

Diff for: release-note.md

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
- [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
5353
- [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
5454
- [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
55+
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
5556

5657
### Zeta(ST-Engine)
5758

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.config;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.CommonConfigurationKeys;
2122
import org.apache.hadoop.fs.Path;
2223

2324
import lombok.Data;
@@ -26,6 +27,11 @@
2627
import java.util.HashMap;
2728
import java.util.Map;
2829

30+
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
31+
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
32+
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
33+
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
34+
2935
@Data
3036
public class HadoopConf implements Serializable {
3137
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
6066
configuration.addResource(new Path(hdfsSitePath));
6167
}
6268
}
69+
70+
public Configuration toConfiguration() {
71+
Configuration configuration = new Configuration();
72+
configuration.setBoolean(READ_INT96_AS_FIXED, true);
73+
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
74+
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
75+
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
76+
configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
77+
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
78+
configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
79+
return configuration;
80+
}
6381
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java

+18-61
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.commons.lang3.StringUtils;
2424
import org.apache.commons.lang3.tuple.Pair;
2525
import org.apache.hadoop.conf.Configuration;
26-
import org.apache.hadoop.fs.CommonConfigurationKeys;
2726
import org.apache.hadoop.fs.FSDataInputStream;
2827
import org.apache.hadoop.fs.FSDataOutputStream;
2928
import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
4342
import java.util.ArrayList;
4443
import java.util.List;
4544

46-
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
47-
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
48-
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
49-
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
50-
5145
@Slf4j
5246
public class HadoopFileSystemProxy implements Serializable, Closeable {
5347

@@ -64,30 +58,19 @@ public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) {
6458
}
6559

6660
public boolean fileExist(@NonNull String filePath) throws IOException {
67-
if (fileSystem == null) {
68-
initialize();
69-
}
70-
Path fileName = new Path(filePath);
71-
return fileSystem.exists(fileName);
61+
return getFileSystem().exists(new Path(filePath));
7262
}
7363

7464
public void createFile(@NonNull String filePath) throws IOException {
75-
if (fileSystem == null) {
76-
initialize();
77-
}
78-
Path path = new Path(filePath);
79-
if (!fileSystem.createNewFile(path)) {
65+
if (!getFileSystem().createNewFile(new Path(filePath))) {
8066
throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
8167
}
8268
}
8369

8470
public void deleteFile(@NonNull String filePath) throws IOException {
85-
if (fileSystem == null) {
86-
initialize();
87-
}
8871
Path path = new Path(filePath);
89-
if (fileSystem.exists(path)) {
90-
if (!fileSystem.delete(path, true)) {
72+
if (getFileSystem().exists(path)) {
73+
if (!getFileSystem().delete(path, true)) {
9174
throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
9275
}
9376
}
@@ -98,9 +81,6 @@ public void renameFile(
9881
@NonNull String newFilePath,
9982
boolean removeWhenNewFilePathExist)
10083
throws IOException {
101-
if (fileSystem == null) {
102-
initialize();
103-
}
10484
Path oldPath = new Path(oldFilePath);
10585
Path newPath = new Path(newFilePath);
10686

@@ -116,15 +96,15 @@ public void renameFile(
11696

11797
if (removeWhenNewFilePathExist) {
11898
if (fileExist(newFilePath)) {
119-
fileSystem.delete(newPath, true);
99+
getFileSystem().delete(newPath, true);
120100
log.info("Delete already file: {}", newPath);
121101
}
122102
}
123103
if (!fileExist(newPath.getParent().toString())) {
124104
createDir(newPath.getParent().toString());
125105
}
126106

127-
if (fileSystem.rename(oldPath, newPath)) {
107+
if (getFileSystem().rename(oldPath, newPath)) {
128108
log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
129109
} else {
130110
throw CommonError.fileOperationFailed(
@@ -133,42 +113,33 @@ public void renameFile(
133113
}
134114

135115
public void createDir(@NonNull String filePath) throws IOException {
136-
if (fileSystem == null) {
137-
initialize();
138-
}
139116
Path dfs = new Path(filePath);
140-
if (!fileSystem.mkdirs(dfs)) {
117+
if (!getFileSystem().mkdirs(dfs)) {
141118
throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
142119
}
143120
}
144121

145122
public List<LocatedFileStatus> listFile(String path) throws IOException {
146-
if (fileSystem == null) {
147-
initialize();
148-
}
149123
List<LocatedFileStatus> fileList = new ArrayList<>();
150124
if (!fileExist(path)) {
151125
return fileList;
152126
}
153127
Path fileName = new Path(path);
154128
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
155-
fileSystem.listFiles(fileName, false);
129+
getFileSystem().listFiles(fileName, false);
156130
while (locatedFileStatusRemoteIterator.hasNext()) {
157131
fileList.add(locatedFileStatusRemoteIterator.next());
158132
}
159133
return fileList;
160134
}
161135

162136
public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
163-
if (fileSystem == null) {
164-
initialize();
165-
}
166137
List<Path> pathList = new ArrayList<>();
167138
if (!fileExist(filePath)) {
168139
return pathList;
169140
}
170141
Path fileName = new Path(filePath);
171-
FileStatus[] status = fileSystem.listStatus(fileName);
142+
FileStatus[] status = getFileSystem().listStatus(fileName);
172143
if (status != null) {
173144
for (FileStatus fileStatus : status) {
174145
if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@ public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
180151
}
181152

182153
public FileStatus[] listStatus(String filePath) throws IOException {
183-
if (fileSystem == null) {
184-
initialize();
185-
}
186-
return fileSystem.listStatus(new Path(filePath));
154+
return getFileSystem().listStatus(new Path(filePath));
187155
}
188156

189157
public FileStatus getFileStatus(String filePath) throws IOException {
190-
if (fileSystem == null) {
191-
initialize();
192-
}
193-
return fileSystem.getFileStatus(new Path(filePath));
158+
return getFileSystem().getFileStatus(new Path(filePath));
194159
}
195160

196161
public FSDataOutputStream getOutputStream(String filePath) throws IOException {
197-
if (fileSystem == null) {
198-
initialize();
199-
}
200-
return fileSystem.create(new Path(filePath), true);
162+
return getFileSystem().create(new Path(filePath), true);
201163
}
202164

203165
public FSDataInputStream getInputStream(String filePath) throws IOException {
166+
return getFileSystem().open(new Path(filePath));
167+
}
168+
169+
public FileSystem getFileSystem() {
204170
if (fileSystem == null) {
205171
initialize();
206172
}
207-
return fileSystem.open(new Path(filePath));
173+
return fileSystem;
208174
}
209175

210176
@SneakyThrows
@@ -258,16 +224,7 @@ private void initialize() {
258224
}
259225

260226
private Configuration createConfiguration() {
261-
Configuration configuration = new Configuration();
262-
configuration.setBoolean(READ_INT96_AS_FIXED, true);
263-
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
264-
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
265-
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
266-
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
267-
configuration.setBoolean(
268-
String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
269-
configuration.set(
270-
String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
227+
Configuration configuration = hadoopConf.toConfiguration();
271228
hadoopConf.setExtraOptionsForConfiguration(configuration);
272229
return configuration;
273230
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java

+1-14
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.commons.collections4.CollectionUtils;
3737
import org.apache.commons.lang3.StringUtils;
3838
import org.apache.hadoop.conf.Configuration;
39-
import org.apache.hadoop.fs.CommonConfigurationKeys;
4039

4140
import org.slf4j.Logger;
4241
import org.slf4j.LoggerFactory;
@@ -58,11 +57,6 @@
5857
import java.util.regex.Matcher;
5958
import java.util.stream.Collectors;
6059

61-
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
62-
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
63-
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
64-
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
65-
6660
public abstract class AbstractWriteStrategy implements WriteStrategy {
6761
protected final Logger log = LoggerFactory.getLogger(this.getClass());
6862
protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@ protected SeaTunnelRowType buildSchemaWithRowType(
148142
*/
149143
@Override
150144
public Configuration getConfiguration(HadoopConf hadoopConf) {
151-
Configuration configuration = new Configuration();
152-
configuration.setBoolean(READ_INT96_AS_FIXED, true);
153-
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
154-
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
155-
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
156-
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
157-
configuration.set(
158-
String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
145+
Configuration configuration = hadoopConf.toConfiguration();
159146
this.hadoopConf.setExtraOptionsForConfiguration(configuration);
160147
return configuration;
161148
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
122122
.compress(compressFormat.getOrcCompression())
123123
// use orc version 0.12
124124
.version(OrcFile.Version.V_0_12)
125+
.fileSystem(hadoopFileSystemProxy.getFileSystem())
125126
.overwrite(true);
126127
Writer newWriter = OrcFile.createWriter(path, options);
127128
this.beingWrittenWriter.put(filePath, newWriter);

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java

+27-19
Original file line numberDiff line numberDiff line change
@@ -139,25 +139,33 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)
139139
dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
140140
if (writer == null) {
141141
Path path = new Path(filePath);
142-
try {
143-
HadoopOutputFile outputFile =
144-
HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
145-
ParquetWriter<GenericRecord> newWriter =
146-
AvroParquetWriter.<GenericRecord>builder(outputFile)
147-
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
148-
.withDataModel(dataModel)
149-
// use parquet v1 to improve compatibility
150-
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
151-
.withCompressionCodec(compressFormat.getParquetCompression())
152-
.withSchema(schema)
153-
.build();
154-
this.beingWrittenWriter.put(filePath, newWriter);
155-
return newWriter;
156-
} catch (IOException e) {
157-
String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
158-
throw new FileConnectorException(
159-
CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
160-
}
142+
// initialize the kerberos login
143+
return hadoopFileSystemProxy.doWithHadoopAuth(
144+
(configuration, userGroupInformation) -> {
145+
try {
146+
HadoopOutputFile outputFile =
147+
HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
148+
ParquetWriter<GenericRecord> newWriter =
149+
AvroParquetWriter.<GenericRecord>builder(outputFile)
150+
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
151+
.withDataModel(dataModel)
152+
// use parquet v1 to improve compatibility
153+
.withWriterVersion(
154+
ParquetProperties.WriterVersion.PARQUET_1_0)
155+
.withCompressionCodec(
156+
compressFormat.getParquetCompression())
157+
.withSchema(schema)
158+
.build();
159+
this.beingWrittenWriter.put(filePath, newWriter);
160+
return newWriter;
161+
} catch (IOException e) {
162+
String errorMsg =
163+
String.format(
164+
"Get parquet writer for file [%s] error", filePath);
165+
throw new FileConnectorException(
166+
CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
167+
}
168+
});
161169
}
162170
return writer;
163171
}

0 commit comments

Comments
 (0)