
Commit 4d9287f

[Feature] Hive Source/Sink support multiple table (#5929)
1 parent 7b4e072 commit 4d9287f

File tree: 30 files changed, +1612 −468 lines


docs/en/connector-v2/sink/Hive.md

Lines changed: 51 additions & 1 deletion

@@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ### table_name [string]
 
-Target Hive table name eg: db1.table1
+Target Hive table name, e.g. `db1.table1`. If the source is in multiple-table mode, you can use `${database_name}.${table_name}` as the table name; `${database_name}` and `${table_name}` are replaced with the database and table name of the CatalogTable generated by the source.
 
 ### metastore_uri [string]
 
@@ -343,6 +343,56 @@ sink {
 }
 ```
 
+### Example 2
+
+We have multiple source tables like this:
+
+```bash
+create table test_1(
+)
+PARTITIONED BY (xx);
+
+create table test_2(
+)
+PARTITIONED BY (xx);
+...
+```
+
+We need to read data from these source tables and write it to other tables.
+
+The job config file can look like this:
+
+```
+env {
+  # You can set flink configuration here
+  parallelism = 3
+  job.name = "test_hive_source_to_hive"
+}
+
+source {
+  Hive {
+    tables_configs = [
+      {
+        table_name = "test_hive.test_1"
+        metastore_uri = "thrift://ctyun6:9083"
+      },
+      {
+        table_name = "test_hive.test_2"
+        metastore_uri = "thrift://ctyun7:9083"
+      }
+    ]
+  }
+}
+
+sink {
+  # Write each source table to the Hive table resolved from its CatalogTable
+  Hive {
+    table_name = "${database_name}.${table_name}"
+    metastore_uri = "thrift://ctyun7:9083"
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
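The `${database_name}.${table_name}` replacement documented above is plain string substitution against the CatalogTable coming from the source. A minimal Java sketch of the idea (an illustration only, not the connector's actual implementation; all names below are invented):

```java
// Sketch of the sink-side placeholder substitution described in the docs
// above. Illustrative only; the connector's real resolution logic may differ.
public class TableNamePlaceholderSketch {

    static String resolveTableName(String template, String database, String table) {
        // Each placeholder is replaced with the corresponding value taken
        // from the CatalogTable that the source produced.
        return template
                .replace("${database_name}", database)
                .replace("${table_name}", table);
    }

    public static void main(String[] args) {
        // Mirrors the sink config above: test_hive.test_1 and test_hive.test_2
        // each resolve to a target table of the same name.
        String resolved =
                resolveTableName("${database_name}.${table_name}", "test_hive", "test_1");
        System.out.println(resolved); // test_hive.test_1
    }
}
```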

docs/en/connector-v2/source/Hive.md

Lines changed: 21 additions & 4 deletions

@@ -59,10 +59,6 @@ Hive metastore uri
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
-### hive_site_path [string]
-
-The path of `hive-site.xml`, used to authentication hive metastore
-
 ### read_partitions [list]
 
 The target partitions that user want to read from hive table, if user does not set this parameter, it will read all the data from hive table.
@@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ## Example
 
+### Example 1: Single table
+
 ```bash
 
 Hive {
@@ -111,6 +109,25 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ```
 
+### Example 2: Multiple tables
+
+```bash
+
+Hive {
+  tables_configs = [
+    {
+      table_name = "default.seatunnel_orc_1"
+      metastore_uri = "thrift://namenode001:9083"
+    },
+    {
+      table_name = "default.seatunnel_orc_2"
+      metastore_uri = "thrift://namenode001:9083"
+    }
+  ]
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 39 additions & 0 deletions

@@ -42,6 +42,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Utils contains some common methods for construct CatalogTable. */
 @Slf4j
@@ -234,4 +236,41 @@ public static SeaTunnelRowType buildSimpleTextSchema() {
     public static CatalogTable buildSimpleTextTable() {
         return getCatalogTable("default", buildSimpleTextSchema());
     }
+
+    public static CatalogTable newCatalogTable(
+            CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+
+        Map<String, Column> columnMap =
+                tableSchema.getColumns().stream()
+                        .collect(Collectors.toMap(Column::getName, Function.identity()));
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+        List<Column> finalColumns = new ArrayList<>();
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column column = columnMap.get(fieldNames[i]);
+            if (column != null) {
+                finalColumns.add(column);
+            } else {
+                finalColumns.add(
+                        PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null));
+            }
+        }
+
+        TableSchema finalSchema =
+                TableSchema.builder()
+                        .columns(finalColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                finalSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
 }
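The new `newCatalogTable` helper rebuilds a table's schema around a given row type: fields already present in the original `TableSchema` keep their `Column` definition, and unknown fields are wrapped in fresh `PhysicalColumn`s. A minimal usage sketch, assuming compilation against `seatunnel-api` (the `extra` field name is invented for illustration):

```java
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public class NewCatalogTableSketch {
    public static void main(String[] args) {
        // A simple text table (single string field in its schema).
        CatalogTable base = CatalogTableUtil.buildSimpleTextTable();

        // A row type with one field from the base schema plus one field
        // ("extra") that is not in it; "extra" exists only for this sketch.
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"content", "extra"},
                        new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});

        // Fields matching by name keep their original Column; the rest are
        // wrapped in new PhysicalColumns, as the loop in newCatalogTable does.
        CatalogTable rebuilt = CatalogTableUtil.newCatalogTable(base, rowType);
        System.out.println(rebuilt.getTableSchema().getColumns());
    }
}
```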

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java

Lines changed: 10 additions & 12 deletions

@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.commit;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.thrift.TException;
@@ -33,33 +33,31 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
 @Slf4j
 public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
-    private final Config pluginConfig;
     private final String dbName;
     private final String tableName;
     private final boolean abortDropPartitionMetadata;
 
+    private final ReadonlyConfig readonlyConfig;
+
     public HiveSinkAggregatedCommitter(
-            Config pluginConfig, String dbName, String tableName, HadoopConf hadoopConf) {
+            ReadonlyConfig readonlyConfig, String dbName, String tableName, HadoopConf hadoopConf) {
         super(hadoopConf);
-        this.pluginConfig = pluginConfig;
+        this.readonlyConfig = readonlyConfig;
         this.dbName = dbName;
         this.tableName = tableName;
         this.abortDropPartitionMetadata =
-                pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
-                        ? pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
-                        : ABORT_DROP_PARTITION_METADATA.defaultValue();
+                readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
     }
 
     @Override
     public List<FileAggregatedCommitInfo> commit(
             List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+
         List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
         if (errorCommitInfos.isEmpty()) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
             try {
                 for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                     Map<String, List<String>> partitionDirAndValuesMap =
@@ -87,7 +85,7 @@ public List<FileAggregatedCommitInfo> commit(
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
         super.abort(aggregatedCommitInfos);
         if (abortDropPartitionMetadata) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
             for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                 Map<String, List<String>> partitionDirAndValuesMap =
                         aggregatedCommitInfo.getPartitionDirAndValuesMap();
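The migration from `Config` to `ReadonlyConfig` replaces the `hasPath`/`getBoolean`/`defaultValue` dance with a single typed `get` that falls back to the option's declared default. A self-contained sketch of that behavior, assuming `ReadonlyConfig.fromMap` from `seatunnel-api`; the option below is a stand-in defined for this demo, not the real `HiveSinkOptions.ABORT_DROP_PARTITION_METADATA`:

```java
import java.util.Collections;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

public class ReadonlyConfigDefaultSketch {

    // Demo stand-in mirroring the shape of ABORT_DROP_PARTITION_METADATA;
    // the default value here is arbitrary, chosen for the demo.
    static final Option<Boolean> ABORT_DROP =
            Options.key("abort_drop_partition_metadata")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Demo option for this sketch");

    public static void main(String[] args) {
        // No value configured: get(...) returns the declared default, which
        // previously took an explicit hasPath/getBoolean/defaultValue ternary.
        ReadonlyConfig empty = ReadonlyConfig.fromMap(Collections.emptyMap());
        System.out.println(empty.get(ABORT_DROP)); // true
    }
}
```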
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+
+public class BaseHiveOptions extends BaseSourceConfigOptions {
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive table name");
+
+    public static final Option<String> METASTORE_URI =
+            Options.key("metastore_uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive metastore uri");
+
+    public static final Option<String> HIVE_SITE_PATH =
+            Options.key("hive_site_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The path of hive-site.xml");
+}
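For completeness, here is how these options are read back through `ReadonlyConfig`; a minimal sketch, assuming `ReadonlyConfig.fromMap` and reusing values from the sink docs example above:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;

public class BaseHiveOptionsReadSketch {
    public static void main(String[] args) {
        // Raw key/value pairs, as they would appear in one tables_configs entry.
        Map<String, Object> raw = new HashMap<>();
        raw.put("table_name", "test_hive.test_1");
        raw.put("metastore_uri", "thrift://ctyun6:9083");

        ReadonlyConfig config = ReadonlyConfig.fromMap(raw);

        // Typed lookups against the options declared in BaseHiveOptions.
        System.out.println(config.get(BaseHiveOptions.TABLE_NAME));    // test_hive.test_1
        System.out.println(config.get(BaseHiveOptions.METASTORE_URI)); // thrift://ctyun6:9083
    }
}
```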

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java

Lines changed: 0 additions & 31 deletions

@@ -17,14 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -66,29 +60,4 @@ public class HiveConfig {
                     .noDefaultValue()
                     .withDescription(
                             "The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files");
-
-    public static final String TEXT_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.mapred.TextInputFormat";
-    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
-    public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
-    public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
-    public static final String ORC_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-
-    public static Pair<String[], Table> getTableInfo(Config config) {
-        String table = config.getString(TABLE_NAME.key());
-        String[] splits = table.split("\\.");
-        if (splits.length != 2) {
-            throw new RuntimeException("Please config " + TABLE_NAME + " as db.table format");
-        }
-        HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
-        Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
-        hiveMetaStoreProxy.close();
-        return Pair.of(splits, tableInformation);
-    }
 }
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+public class HiveConstants {
+
+    public static final String CONNECTOR_NAME = "Hive";
+
+    public static final String TEXT_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.mapred.TextInputFormat";
+    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+    public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+    public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+    public static final String ORC_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+    public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+}

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java

Lines changed: 3 additions & 1 deletion

@@ -23,7 +23,9 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode {
     GET_HDFS_NAMENODE_HOST_FAILED("HIVE-01", "Get name node host from table location failed"),
     INITIALIZE_HIVE_METASTORE_CLIENT_FAILED("HIVE-02", "Initialize hive metastore client failed"),
     GET_HIVE_TABLE_INFORMATION_FAILED(
-            "HIVE-03", "Get hive table information from hive metastore service failed");
+            "HIVE-03", "Get hive table information from hive metastore service failed"),
+    HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+    ;
 
     private final String code;
     private final String description;
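The new `HIVE-04` code covers table names that are not in `db.table` form, the check that the removed `HiveConfig.getTableInfo` used to do with a bare `RuntimeException`. A hedged sketch of how it might be raised, assuming the connector's usual `HiveConnectorException(errorCode, message)` constructor; the helper below is hypothetical:

```java
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;

public class TableNameCheckSketch {

    // Hypothetical helper: split "db.table" and reject anything else with
    // the new HIVE-04 error code instead of a bare RuntimeException.
    static String[] splitTableName(String fullName) {
        String[] parts = fullName.split("\\.");
        if (parts.length != 2) {
            throw new HiveConnectorException(
                    HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR,
                    "Expected table name in db.table format, got: " + fullName);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = splitTableName("test_hive.test_1");
        System.out.println(parts[0] + " / " + parts[1]); // test_hive / test_1
    }
}
```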
