
Commit c9cad62

Hive support multiple table

1 parent 6d71069 commit c9cad62

File tree: 28 files changed, +1618 −418 lines


Diff for: docs/en/connector-v2/sink/Hive.md (+51 −1)

@@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ### table_name [string]
 
-Target Hive table name eg: db1.table1
+Target Hive table name, e.g. `db1.table1`. If the source is in multiple-table mode, you can use `${database_name}.${table_name}` to generate the table name; the placeholders are replaced with the values from the CatalogTable generated by the source.
 
 ### metastore_uri [string]
 
@@ -343,6 +343,56 @@ sink {
 }
 ```
 
+### Example 2
+
+We have multiple source tables like this:
+
+```bash
+create table test_1(
+)
+PARTITIONED BY (xx);
+
+create table test_2(
+)
+PARTITIONED BY (xx);
+...
+```
+
+We need to read data from these source tables and write it to other tables.
+
+The job config file can look like this:
+
+```
+env {
+  # You can set flink configuration here
+  parallelism = 3
+  job.name = "test_hive_source_to_hive"
+}
+
+source {
+  Hive {
+    tables_configs = [
+      {
+        table_name = "test_hive.test_1"
+        metastore_uri = "thrift://ctyun6:9083"
+      },
+      {
+        table_name = "test_hive.test_2"
+        metastore_uri = "thrift://ctyun7:9083"
+      }
+    ]
+  }
+}
+
+sink {
+  # write the data from each source table to the Hive table resolved from its name
+  Hive {
+    table_name = "${database_name}.${table_name}"
+    metastore_uri = "thrift://ctyun7:9083"
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
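A note on the `${database_name}.${table_name}` template introduced above: conceptually it is a per-table string substitution driven by the identifier of each CatalogTable coming from the source. A minimal sketch of the idea, using a simplified `TableId` stand-in rather than the connector's real classes:

```java
// Hedged sketch: TableId is a stand-in for the identifier carried by each
// CatalogTable generated by the source, not the connector's actual API.
final class TableId {
    final String databaseName;
    final String tableName;

    TableId(String databaseName, String tableName) {
        this.databaseName = databaseName;
        this.tableName = tableName;
    }
}

final class TableNameResolver {
    // Replaces the ${database_name} and ${table_name} placeholders with the
    // values from the source table's identifier.
    static String resolve(String template, TableId id) {
        return template
                .replace("${database_name}", id.databaseName)
                .replace("${table_name}", id.tableName);
    }

    public static void main(String[] args) {
        // With the example job above, rows read from test_hive.test_1 are
        // written to the Hive table resolved as "test_hive.test_1".
        System.out.println(
                resolve("${database_name}.${table_name}", new TableId("test_hive", "test_1")));
    }
}
```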

Diff for: docs/en/connector-v2/source/Hive.md (+21 −4)

@@ -59,10 +59,6 @@ Hive metastore uri
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
-### hive_site_path [string]
-
-The path of `hive-site.xml`, used to authentication hive metastore
-
 ### read_partitions [list]
 
 The target partitions that the user wants to read from the hive table; if this parameter is not set, all the data in the table is read.
@@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ## Example
 
+### Example 1: Single table
+
 ```bash
 
 Hive {
@@ -111,6 +109,25 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ```
 
+### Example 2: Multiple tables
+
+```bash
+
+Hive {
+  tables_configs = [
+    {
+      table_name = "default.seatunnel_orc_1"
+      metastore_uri = "thrift://namenode001:9083"
+    },
+    {
+      table_name = "default.seatunnel_orc_2"
+      metastore_uri = "thrift://namenode001:9083"
+    }
+  ]
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
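Each entry in `tables_configs` above is an independent table read with its own `table_name` and `metastore_uri`. A rough sketch of how such a list fans out into per-table reads (the record and loop below are illustrative assumptions, not SeaTunnel classes):

```java
import java.util.List;

// Illustrative stand-in for one tables_configs entry.
record HiveTableConfig(String tableName, String metastoreUri) {}

final class TablesConfigsSketch {
    public static void main(String[] args) {
        List<HiveTableConfig> tablesConfigs =
                List.of(
                        new HiveTableConfig("default.seatunnel_orc_1", "thrift://namenode001:9083"),
                        new HiveTableConfig("default.seatunnel_orc_2", "thrift://namenode001:9083"));
        // Each entry would become its own table read, as in the example above.
        for (HiveTableConfig config : tablesConfigs) {
            String[] parts = config.tableName().split("\\.", 2);
            System.out.printf("read %s.%s via %s%n", parts[0], parts[1], config.metastoreUri());
        }
    }
}
```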

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java (+39)

@@ -42,6 +42,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Utils contains some common methods for construct CatalogTable. */
 @Slf4j
@@ -234,4 +236,41 @@ public static SeaTunnelRowType buildSimpleTextSchema() {
     public static CatalogTable buildSimpleTextTable() {
         return getCatalogTable("default", buildSimpleTextSchema());
     }
+
+    public static CatalogTable newCatalogTable(
+            CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+
+        Map<String, Column> columnMap =
+                tableSchema.getColumns().stream()
+                        .collect(Collectors.toMap(Column::getName, Function.identity()));
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+        List<Column> finalColumns = new ArrayList<>();
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column column = columnMap.get(fieldNames[i]);
+            if (column != null) {
+                finalColumns.add(column);
+            } else {
+                finalColumns.add(
+                        PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null));
+            }
+        }
+
+        TableSchema finalSchema =
+                TableSchema.builder()
+                        .columns(finalColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                finalSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
 }
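The new `newCatalogTable` helper reconciles a detected row type with an existing catalog schema: fields whose names match keep their catalog `Column`, and any field the catalog does not know is wrapped in a synthesized `PhysicalColumn`. A hedged usage sketch (the types and calls follow the diff above; the sample field names are invented):

```java
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

final class NewCatalogTableSketch {
    // catalogTable: the schema declared in config (or the simple text fallback).
    // The row type stands in for what a read strategy detected in the files.
    static CatalogTable align(CatalogTable catalogTable) {
        SeaTunnelRowType detected =
                new SeaTunnelRowType(
                        new String[] {"id", "name", "extra_field"},
                        new SeaTunnelDataType<?>[] {
                            BasicType.LONG_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
                        });
        // "id" and "name" keep their catalog columns; "extra_field" is absent
        // from the catalog, so a PhysicalColumn is synthesized for it.
        return CatalogTableUtil.newCatalogTable(catalogTable, detected);
    }
}
```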

Diff for: seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java (+80 −2)

@@ -18,13 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 
 import lombok.Getter;
 
+import java.io.Serializable;
+import java.util.List;
+
 @Getter
 public class LocalFileSourceConfig extends BaseFileSourceConfig {
 
@@ -41,6 +48,77 @@ public String getPluginName() {
     }
 
     public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
-        super(readonlyConfig);
+        validateConfig(readonlyConfig);
+        this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
+        this.localFileHadoopConf = new LocalFileHadoopConf();
+        this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
+        this.filePaths = parseFilePaths(readonlyConfig);
+        this.catalogTable = parseCatalogTable(readonlyConfig);
+    }
+
+    private void validateConfig(ReadonlyConfig readonlyConfig) {
+        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            FileSystemType.LOCAL.getFileSystemPluginName(),
+                            PluginType.SOURCE,
+                            LocalFileSourceOptions.FILE_PATH + " is required"));
+        }
+        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            FileSystemType.LOCAL.getFileSystemPluginName(),
+                            PluginType.SOURCE,
+                            LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
+        }
+    }
+
+    private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
+        String rootPath = null;
+        try {
+            rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
+            return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
+        } catch (Exception ex) {
+            String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
+        }
+    }
+
+    private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
+        final CatalogTable catalogTable;
+        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+            catalogTable =
+                    CatalogTableUtil.buildWithConfig(
+                            FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
+        } else {
+            catalogTable = CatalogTableUtil.buildSimpleTextTable();
+        }
+        if (CollectionUtils.isEmpty(filePaths)) {
+            return catalogTable;
+        }
+        switch (fileFormat) {
+            case CSV:
+            case TEXT:
+            case JSON:
+            case EXCEL:
+                readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+                return CatalogTableUtil.newCatalogTable(
+                        catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
+            case ORC:
+            case PARQUET:
+                return CatalogTableUtil.newCatalogTable(
+                        catalogTable,
+                        readStrategy.getSeaTunnelRowTypeInfo(
+                                localFileHadoopConf, filePaths.get(0)));
+            default:
+                throw new FileConnectorException(
+                        FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+                        "SeaTunnel does not supported this file format: [" + fileFormat + "]");
+        }
     }
 }
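The `parseCatalogTable` switch above encodes a single rule: for row-based formats (CSV, TEXT, JSON, EXCEL) the configured schema drives the row type and the read strategy refines it, while self-describing columnar formats (ORC, PARQUET) have their schema read from the first file. A stripped-down sketch of that dispatch, with the SeaTunnel types replaced by plain strings for illustration:

```java
// Simplified illustration of the schema-resolution rule in parseCatalogTable;
// the real code returns a CatalogTable rather than a description string.
enum FileFormat { CSV, TEXT, JSON, EXCEL, ORC, PARQUET }

final class SchemaResolutionSketch {
    static String schemaSource(FileFormat format) {
        switch (format) {
            case CSV:
            case TEXT:
            case JSON:
            case EXCEL:
                // Row-based formats: start from the declared schema, then let the
                // read strategy report the actual row type it will produce.
                return "declared schema, refined by the read strategy";
            case ORC:
            case PARQUET:
                // Columnar formats carry their schema; read it from the first file.
                return "schema read from the first file in filePaths";
            default:
                throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }
}
```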

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java (+16 −16)

@@ -17,13 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.commit;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -33,33 +35,31 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
 @Slf4j
 public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
-    private final Config pluginConfig;
     private final String dbName;
     private final String tableName;
     private final boolean abortDropPartitionMetadata;
 
+    private final ReadonlyConfig readonlyConfig;
+
     public HiveSinkAggregatedCommitter(
-            Config pluginConfig, String dbName, String tableName, HadoopConf hadoopConf) {
-        super(hadoopConf);
-        this.pluginConfig = pluginConfig;
-        this.dbName = dbName;
-        this.tableName = tableName;
+            ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
+        super(fileSystemUtils);
+        this.readonlyConfig = readonlyConfig;
+        this.dbName = table.getDbName();
+        this.tableName = table.getTableName();
         this.abortDropPartitionMetadata =
-                pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
-                        ? pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
-                        : ABORT_DROP_PARTITION_METADATA.defaultValue();
+                readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
     }
 
     @Override
     public List<FileAggregatedCommitInfo> commit(
             List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+
         List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
         if (errorCommitInfos.isEmpty()) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
             try {
                 for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                     Map<String, List<String>> partitionDirAndValuesMap =
@@ -87,7 +87,7 @@ public List<FileAggregatedCommitInfo> commit(
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
         super.abort(aggregatedCommitInfos);
         if (abortDropPartitionMetadata) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
             for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                 Map<String, List<String>> partitionDirAndValuesMap =
                         aggregatedCommitInfo.getPartitionDirAndValuesMap();
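The committer change above is mostly a migration from raw typesafe `Config` lookups to `ReadonlyConfig` with typed `Option`s, which folds the old hasPath/getBoolean/defaultValue dance into one call. A self-contained sketch of why that works, using minimal stand-ins rather than the real SeaTunnel classes:

```java
import java.util.Map;
import java.util.Optional;

// Minimal stand-ins: the real Option/ReadonlyConfig live in seatunnel-api.
final class MiniOption<T> {
    final String key;
    final T defaultValue;

    MiniOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

final class MiniReadonlyConfig {
    private final Map<String, Object> values;

    MiniReadonlyConfig(Map<String, Object> values) {
        this.values = values;
    }

    // The default lives on the option, so callers never re-implement the
    // hasPath(...) ? getBoolean(...) : defaultValue() pattern.
    @SuppressWarnings("unchecked")
    <T> T get(MiniOption<T> option) {
        return (T) Optional.ofNullable(values.get(option.key)).orElse(option.defaultValue);
    }

    public static void main(String[] args) {
        // The default value here is illustrative, not the connector's actual one.
        MiniOption<Boolean> abortDrop = new MiniOption<>("abort_drop_partition_metadata", true);
        MiniReadonlyConfig config = new MiniReadonlyConfig(Map.of());
        System.out.println(config.get(abortDrop)); // true, supplied by the default
    }
}
```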
Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java (new file, +43)

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+
+public class BaseHiveOptions extends BaseSourceConfig {
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive table name");
+
+    public static final Option<String> METASTORE_URI =
+            Options.key("metastore_uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive metastore uri");
+
+    public static final Option<String> HIVE_SITE_PATH =
+            Options.key("hive_site_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The path of hive-site.xml");
+}
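The options declared here are consumed through `ReadonlyConfig`, the same way the committer above reads `HiveSinkOptions.ABORT_DROP_PARTITION_METADATA`. A short usage sketch under that assumption (`get`/`getOptional` appear in this commit's own code; the surrounding method is invented):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;

final class BaseHiveOptionsUsage {
    // Hypothetical consumer: resolves the three options declared above.
    static void configure(ReadonlyConfig config) {
        String tableName = config.get(BaseHiveOptions.TABLE_NAME);
        String metastoreUri = config.get(BaseHiveOptions.METASTORE_URI);
        // hive_site_path has no default, so treat it as optional.
        config.getOptional(BaseHiveOptions.HIVE_SITE_PATH)
                .ifPresent(path -> System.out.println("Loading hive-site.xml from " + path));
        System.out.println("Hive table " + tableName + " via " + metastoreUri);
    }
}
```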

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java (+2 −1)

@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
@@ -91,4 +92,4 @@ public static Pair<String[], Table> getTableInfo(Config config) {
         hiveMetaStoreProxy.close();
         return Pair.of(splits, tableInformation);
     }
-}
\ No newline at end of file
+}
