Skip to content

Commit 8d8ced0

Browse files
committed
Resolve conflicts
1 parent c9cad62 commit 8d8ced0

File tree

18 files changed

+264
-320
lines changed

18 files changed

+264
-320
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java

+2-80
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,13 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22-
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
23-
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24-
import org.apache.seatunnel.common.constants.PluginType;
25-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
2622
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2723
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2824
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
2925

3026
import lombok.Getter;
3127

32-
import java.io.Serializable;
33-
import java.util.List;
34-
3528
@Getter
3629
public class LocalFileSourceConfig extends BaseFileSourceConfig {
3730

@@ -48,77 +41,6 @@ public String getPluginName() {
4841
}
4942

5043
public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
51-
validateConfig(readonlyConfig);
52-
this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
53-
this.localFileHadoopConf = new LocalFileHadoopConf();
54-
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
55-
this.filePaths = parseFilePaths(readonlyConfig);
56-
this.catalogTable = parseCatalogTable(readonlyConfig);
57-
}
58-
59-
private void validateConfig(ReadonlyConfig readonlyConfig) {
60-
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
61-
throw new FileConnectorException(
62-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
63-
String.format(
64-
"PluginName: %s, PluginType: %s, Message: %s",
65-
FileSystemType.LOCAL.getFileSystemPluginName(),
66-
PluginType.SOURCE,
67-
LocalFileSourceOptions.FILE_PATH + " is required"));
68-
}
69-
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
70-
throw new FileConnectorException(
71-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
72-
String.format(
73-
"PluginName: %s, PluginType: %s, Message: %s",
74-
FileSystemType.LOCAL.getFileSystemPluginName(),
75-
PluginType.SOURCE,
76-
LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
77-
}
78-
}
79-
80-
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
81-
String rootPath = null;
82-
try {
83-
rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
84-
return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
85-
} catch (Exception ex) {
86-
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
87-
throw new FileConnectorException(
88-
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
89-
}
90-
}
91-
92-
private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
93-
final CatalogTable catalogTable;
94-
if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
95-
catalogTable =
96-
CatalogTableUtil.buildWithConfig(
97-
FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
98-
} else {
99-
catalogTable = CatalogTableUtil.buildSimpleTextTable();
100-
}
101-
if (CollectionUtils.isEmpty(filePaths)) {
102-
return catalogTable;
103-
}
104-
switch (fileFormat) {
105-
case CSV:
106-
case TEXT:
107-
case JSON:
108-
case EXCEL:
109-
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
110-
return CatalogTableUtil.newCatalogTable(
111-
catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
112-
case ORC:
113-
case PARQUET:
114-
return CatalogTableUtil.newCatalogTable(
115-
catalogTable,
116-
readStrategy.getSeaTunnelRowTypeInfo(
117-
localFileHadoopConf, filePaths.get(0)));
118-
default:
119-
throw new FileConnectorException(
120-
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
121-
"SeaTunnel does not supported this file format: [" + fileFormat + "]");
122-
}
44+
super(readonlyConfig);
12345
}
12446
}

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@
1818
package org.apache.seatunnel.connectors.seatunnel.hive.commit;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2122
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
2223
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
23-
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
2424
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
2525
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
2626

27-
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
28-
import org.apache.hadoop.hive.metastore.api.Table;
2927
import org.apache.thrift.TException;
3028

3129
import lombok.extern.slf4j.Slf4j;
@@ -44,11 +42,11 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
4442
private final ReadonlyConfig readonlyConfig;
4543

4644
public HiveSinkAggregatedCommitter(
47-
ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
48-
super(fileSystemUtils);
45+
ReadonlyConfig readonlyConfig, String dbName, String tableName, HadoopConf hadoopConf) {
46+
super(hadoopConf);
4947
this.readonlyConfig = readonlyConfig;
50-
this.dbName = table.getDbName();
51-
this.tableName = table.getTableName();
48+
this.dbName = dbName;
49+
this.tableName = tableName;
5250
this.abortDropPartitionMetadata =
5351
readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
5452
}

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2323

24-
public class BaseHiveOptions extends BaseSourceConfig {
24+
public class BaseHiveOptions extends BaseSourceConfigOptions {
2525

2626
public static final Option<String> TABLE_NAME =
2727
Options.key("table_name")

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java

+1-33
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
/*
32
* Licensed to the Apache Software Foundation (ASF) under one or more
43
* contributor license agreements. See the NOTICE file distributed with
@@ -18,14 +17,8 @@
1817

1918
package org.apache.seatunnel.connectors.seatunnel.hive.config;
2019

21-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
22-
2320
import org.apache.seatunnel.api.configuration.Option;
2421
import org.apache.seatunnel.api.configuration.Options;
25-
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
26-
27-
import org.apache.commons.lang3.tuple.Pair;
28-
import org.apache.hadoop.hive.metastore.api.Table;
2922

3023
import java.util.HashMap;
3124
import java.util.Map;
@@ -67,29 +60,4 @@ public class HiveConfig {
6760
.noDefaultValue()
6861
.withDescription(
6962
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files");
70-
71-
public static final String TEXT_INPUT_FORMAT_CLASSNAME =
72-
"org.apache.hadoop.mapred.TextInputFormat";
73-
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
74-
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
75-
public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
76-
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
77-
public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
78-
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
79-
public static final String ORC_INPUT_FORMAT_CLASSNAME =
80-
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
81-
public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
82-
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
83-
84-
public static Pair<String[], Table> getTableInfo(Config config) {
85-
String table = config.getString(TABLE_NAME.key());
86-
String[] splits = table.split("\\.");
87-
if (splits.length != 2) {
88-
throw new RuntimeException("Please config " + TABLE_NAME + " as db.table format");
89-
}
90-
HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
91-
Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
92-
hiveMetaStoreProxy.close();
93-
return Pair.of(splits, tableInformation);
94-
}
95-
}
63+
}

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveHadoopConfig.java

-40
This file was deleted.

0 commit comments

Comments
 (0)