Skip to content

Commit bacf47f

Browse files
committed
Resolve conflicts
1 parent 9c1c46e commit bacf47f

File tree

15 files changed

+188
-165
lines changed

15 files changed

+188
-165
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java

+2-80
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,13 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22-
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
23-
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24-
import org.apache.seatunnel.common.constants.PluginType;
25-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
2622
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2723
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2824
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
2925

3026
import lombok.Getter;
3127

32-
import java.io.Serializable;
33-
import java.util.List;
34-
3528
@Getter
3629
public class LocalFileSourceConfig extends BaseFileSourceConfig {
3730

@@ -48,77 +41,6 @@ public String getPluginName() {
4841
}
4942

5043
public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
51-
validateConfig(readonlyConfig);
52-
this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
53-
this.localFileHadoopConf = new LocalFileHadoopConf();
54-
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
55-
this.filePaths = parseFilePaths(readonlyConfig);
56-
this.catalogTable = parseCatalogTable(readonlyConfig);
57-
}
58-
59-
private void validateConfig(ReadonlyConfig readonlyConfig) {
60-
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
61-
throw new FileConnectorException(
62-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
63-
String.format(
64-
"PluginName: %s, PluginType: %s, Message: %s",
65-
FileSystemType.LOCAL.getFileSystemPluginName(),
66-
PluginType.SOURCE,
67-
LocalFileSourceOptions.FILE_PATH + " is required"));
68-
}
69-
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
70-
throw new FileConnectorException(
71-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
72-
String.format(
73-
"PluginName: %s, PluginType: %s, Message: %s",
74-
FileSystemType.LOCAL.getFileSystemPluginName(),
75-
PluginType.SOURCE,
76-
LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
77-
}
78-
}
79-
80-
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
81-
String rootPath = null;
82-
try {
83-
rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
84-
return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
85-
} catch (Exception ex) {
86-
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
87-
throw new FileConnectorException(
88-
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
89-
}
90-
}
91-
92-
private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
93-
final CatalogTable catalogTable;
94-
if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
95-
catalogTable =
96-
CatalogTableUtil.buildWithConfig(
97-
FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
98-
} else {
99-
catalogTable = CatalogTableUtil.buildSimpleTextTable();
100-
}
101-
if (CollectionUtils.isEmpty(filePaths)) {
102-
return catalogTable;
103-
}
104-
switch (fileFormat) {
105-
case CSV:
106-
case TEXT:
107-
case JSON:
108-
case EXCEL:
109-
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
110-
return CatalogTableUtil.newCatalogTable(
111-
catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
112-
case ORC:
113-
case PARQUET:
114-
return CatalogTableUtil.newCatalogTable(
115-
catalogTable,
116-
readStrategy.getSeaTunnelRowTypeInfo(
117-
localFileHadoopConf, filePaths.get(0)));
118-
default:
119-
throw new FileConnectorException(
120-
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
121-
"SeaTunnel does not supported this file format: [" + fileFormat + "]");
122-
}
44+
super(readonlyConfig);
12345
}
12446
}

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
2222
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
23-
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
23+
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveHadoopConfig;
2424
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
2525
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
2626

27-
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
2827
import org.apache.hadoop.hive.metastore.api.Table;
2928
import org.apache.thrift.TException;
3029

@@ -44,8 +43,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
4443
private final ReadonlyConfig readonlyConfig;
4544

4645
public HiveSinkAggregatedCommitter(
47-
ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
48-
super(fileSystemUtils);
46+
ReadonlyConfig readonlyConfig, Table table, HiveHadoopConfig hadoopConf) {
47+
super(hadoopConf);
4948
this.readonlyConfig = readonlyConfig;
5049
this.dbName = table.getDbName();
5150
this.tableName = table.getTableName();

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2323

24-
public class BaseHiveOptions extends BaseSourceConfig {
24+
public class BaseHiveOptions extends BaseSourceConfigOptions {
2525

2626
public static final Option<String> TABLE_NAME =
2727
Options.key("table_name")

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java

+23-13
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
3737
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
3838
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
39-
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
4039
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
4140
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
4241
import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
@@ -71,31 +70,31 @@ public class HiveSink
7170
implements SeaTunnelSink<
7271
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
7372
SupportMultiTableSink {
74-
private final Table tableInformation;
73+
74+
// Since Table might contain some unserializable fields, we need to make it transient
75+
// And use getTableInformation to get the Table object
76+
private transient Table tableInformation;
7577
private final CatalogTable catalogTable;
7678
private final ReadonlyConfig readonlyConfig;
7779
private final HiveHadoopConfig hiveHadoopConfig;
78-
private final FileSystemUtils fileSystemUtils;
7980
private final FileSinkConfig fileSinkConfig;
8081
private final WriteStrategy writeStrategy;
8182
private String jobId;
8283

8384
public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
8485
this.readonlyConfig = readonlyConfig;
8586
this.catalogTable = catalogTable;
86-
this.tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
87-
this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, tableInformation);
88-
this.fileSinkConfig =
89-
generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
87+
this.tableInformation = getTableInformation();
88+
this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig);
89+
this.fileSinkConfig = generateFileSinkConfig(readonlyConfig, catalogTable);
9090
this.writeStrategy =
9191
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
92-
this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
9392
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
94-
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
9593
}
9694

9795
private FileSinkConfig generateFileSinkConfig(
98-
ReadonlyConfig readonlyConfig, Table tableInformation, CatalogTable catalogTable) {
96+
ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
97+
Table tableInformation = getTableInformation();
9998
Config pluginConfig = readonlyConfig.toConfig();
10099
List<String> sinkFields =
101100
tableInformation.getSd().getCols().stream()
@@ -171,7 +170,8 @@ public String getPluginName() {
171170
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
172171
createAggregatedCommitter() {
173172
return Optional.of(
174-
new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
173+
new HiveSinkAggregatedCommitter(
174+
readonlyConfig, getTableInformation(), hiveHadoopConfig));
175175
}
176176

177177
@Override
@@ -206,8 +206,8 @@ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
206206
return Optional.of(new DefaultSerializer<>());
207207
}
208208

209-
private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) {
210-
String hdfsLocation = tableInformation.getSd().getLocation();
209+
private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig) {
210+
String hdfsLocation = getTableInformation().getSd().getLocation();
211211
HiveHadoopConfig hiveHadoopConfig;
212212
try {
213213
URI uri = new URI(hdfsLocation);
@@ -235,6 +235,16 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
235235
readonlyConfig
236236
.getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
237237
.ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
238+
readonlyConfig
239+
.getOptional(HiveSourceOptions.REMOTE_USER)
240+
.ifPresent(hiveHadoopConfig::setRemoteUser);
238241
return hiveHadoopConfig;
239242
}
243+
244+
private Table getTableInformation() {
245+
if (tableInformation == null) {
246+
tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
247+
}
248+
return tableInformation;
249+
}
240250
}

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2828
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
2929
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
30-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
3130
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
3231
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
3332
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
@@ -80,7 +79,7 @@ private ReadonlyConfig generateCurrentReadonlyConfig(
8079
tableName -> {
8180
String replacedPath =
8281
replaceCatalogTableInPath(tableName, catalogTable);
83-
configMap.put(BaseSinkConfig.FILE_PATH.key(), replacedPath);
82+
configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath);
8483
});
8584

8685
return ReadonlyConfig.fromMap(new HashMap<>(configMap));

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.seatunnel.api.source.SupportParallelism;
2727
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2828
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
29-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
29+
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
3030
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
3131
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
3232
import org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;
@@ -50,7 +50,7 @@ public HiveSource(ReadonlyConfig readonlyConfig) {
5050

5151
@Override
5252
public String getPluginName() {
53-
return FileSystemType.LOCAL.getFileSystemPluginName();
53+
return HiveConstants.CONNECTOR_NAME;
5454
}
5555

5656
@Override

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
import org.apache.seatunnel.api.table.factory.Factory;
2525
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2626
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
27-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
28-
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
2928
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
3029
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
3130

@@ -49,16 +48,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
4948
@Override
5049
public OptionRule optionRule() {
5150
return OptionRule.builder()
52-
.required(HiveSourceOptions.TABLE_NAME)
53-
.required(HiveSourceOptions.METASTORE_URI)
51+
.optional(HiveSourceOptions.TABLE_NAME)
52+
.optional(HiveSourceOptions.METASTORE_URI)
5453
.optional(HiveSourceOptions.TABLE_CONFIGS)
5554
.optional(HiveSourceOptions.HIVE_SITE_PATH)
5655
.optional(
57-
HdfsSourceConfig.HDFS_SITE_PATH,
58-
HdfsSourceConfig.KERBEROS_PRINCIPAL,
59-
HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
60-
.optional(BaseSourceConfig.READ_PARTITIONS)
61-
.optional(BaseSourceConfig.READ_COLUMNS)
56+
HdfsSourceConfigOptions.HDFS_SITE_PATH,
57+
HdfsSourceConfigOptions.REMOTE_USER,
58+
HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
59+
HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
60+
.optional(HdfsSourceConfigOptions.READ_PARTITIONS)
61+
.optional(HdfsSourceConfigOptions.READ_COLUMNS)
6262
.build();
6363
}
6464

0 commit comments

Comments
 (0)