Skip to content

Commit b377469

Browse files
committed
[flink] Make clone hive tables production-ready
1 parent e5ac693 commit b377469

File tree

13 files changed

+825
-495
lines changed

13 files changed

+825
-495
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
---
2+
title: "Clone From Hive"
3+
weight: 5
4+
type: docs
5+
aliases:
6+
- /migration/clone-from-hive.html
7+
---
8+
<!--
9+
Licensed to the Apache Software Foundation (ASF) under one
10+
or more contributor license agreements. See the NOTICE file
11+
distributed with this work for additional information
12+
regarding copyright ownership. The ASF licenses this file
13+
to you under the Apache License, Version 2.0 (the
14+
"License"); you may not use this file except in compliance
15+
with the License. You may obtain a copy of the License at
16+
17+
http://www.apache.org/licenses/LICENSE-2.0
18+
19+
Unless required by applicable law or agreed to in writing,
20+
software distributed under the License is distributed on an
21+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22+
KIND, either express or implied. See the License for the
23+
specific language governing permissions and limitations
24+
under the License.
25+
-->
26+
27+
# Hive Table Clone
28+
29+
Apache Hive tables stored in the ORC, Parquet, or Avro file format can be cloned to Paimon. The cloned table will be an
30+
[append table]({{< ref "append-table/overview" >}}).
31+
32+
## Clone Hive Table
33+
34+
```bash
35+
<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
36+
clone_hive \
37+
--database default \
38+
--table hivetable \
39+
--catalog_conf metastore=hive \
40+
--catalog_conf uri=thrift://localhost:9088 \
41+
--target_database test \
42+
--target_table test_table \
43+
--target_catalog_conf warehouse=my_warehouse
44+
```
45+
46+
## Clone Hive Database
47+
48+
```bash
49+
<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
50+
clone_hive \
51+
--database default \
52+
--catalog_conf metastore=hive \
53+
--catalog_conf uri=thrift://localhost:9088 \
54+
--target_database test \
55+
--target_catalog_conf warehouse=my_warehouse
56+
```

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
package org.apache.paimon.flink.action;
2020

2121
import org.apache.paimon.catalog.Identifier;
22-
import org.apache.paimon.flink.clone.CloneHiveUtils;
22+
import org.apache.paimon.flink.clone.hive.CloneFileInfo;
23+
import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
24+
import org.apache.paimon.flink.clone.hive.CommitTableOperator;
25+
import org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
26+
import org.apache.paimon.flink.clone.hive.DataFileInfo;
27+
import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
2328
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
2429
import org.apache.paimon.options.CatalogOptions;
2530

31+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
2632
import org.apache.flink.api.java.tuple.Tuple2;
2733
import org.apache.flink.streaming.api.datastream.DataStream;
2834
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
2935

3036
import javax.annotation.Nullable;
3137

32-
import java.util.List;
3338
import java.util.Map;
3439

3540
/** Clone source files managed by HiveMetaStore and commit metas to construct Paimon table. */
@@ -91,23 +96,33 @@ public void build() throws Exception {
9196
source, new CloneHiveUtils.TableChannelComputer(), parallelism);
9297

9398
// create target table, list files and group by <table, partition>
94-
DataStream<List<CloneHiveUtils.CloneFilesInfo>> files =
99+
DataStream<CloneFileInfo> files =
95100
partitionedSource
96101
.process(
97-
CloneHiveUtils.createTargetTableAndListFilesFunction(
102+
new ListHiveFilesFunction(
98103
sourceCatalogConfig, targetCatalogConfig, whereSql))
99104
.name("List Files")
100105
.setParallelism(parallelism);
101106

102107
// copy files and commit
103-
DataStream<Void> committed =
104-
files.forward()
108+
DataStream<DataFileInfo> dataFile =
109+
files.rebalance()
105110
.process(
106-
CloneHiveUtils.copyAndCommitFunction(
107-
sourceCatalogConfig, targetCatalogConfig))
108-
.name("Copy and Commit")
111+
new CopyHiveFilesFunction(sourceCatalogConfig, targetCatalogConfig))
112+
.name("Copy Files")
109113
.setParallelism(parallelism);
110114

115+
DataStream<DataFileInfo> partitionedDataFile =
116+
FlinkStreamPartitioner.partition(
117+
dataFile, new CloneHiveUtils.DataFileChannelComputer(), parallelism);
118+
119+
DataStream<Long> committed =
120+
partitionedDataFile
121+
.transform(
122+
"Commit table",
123+
BasicTypeInfo.LONG_TYPE_INFO,
124+
new CommitTableOperator(targetCatalogConfig))
125+
.setParallelism(parallelism);
111126
committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
112127
}
113128

0 commit comments

Comments
 (0)