Skip to content

Commit 7d823b8

Browse files
committed
[flink] Rename clone_hive to clone and add procedure
1 parent 5a7e083 commit 7d823b8

File tree

15 files changed

+187
-77
lines changed

15 files changed

+187
-77
lines changed
Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
---
2-
title: "Clone From Hive"
2+
title: "Clone To Paimon"
33
weight: 5
44
type: docs
55
aliases:
6-
- /migration/clone-from-hive.html
6+
- /migration/clone.html
77
---
88
<!--
99
Licensed to the Apache Software Foundation (ASF) under one
@@ -24,27 +24,36 @@ specific language governing permissions and limitations
2424
under the License.
2525
-->
2626

27-
# Clone Hive Table
27+
# Clone To Paimon
2828

29-
Clone Hive Table supports cloning hive tables with parquet, orc and avro formats. The cloned table will be
30-
[append table]({{< ref "append-table/overview" >}}).
29+
Clone supports cloning tables to Paimon tables.
3130

3231
1. Clone is `OVERWRITE` semantic that will overwrite the partitions of the target table according to the data.
3332
2. Clone is reentrant, but it requires existing tables to contain all fields from the source table and have the
3433
same partition fields.
3534

35+
Currently, clone supports:
36+
37+
1. Clone Hive tables in Hive Catalog to Paimon Catalog, supports Parquet, ORC, Avro formats, target table will
38+
be an append table.
39+
40+
The following source tables are currently under development:
41+
1. Clone Hudi tables in Hive Catalog to Paimon Catalog, target table will be an append table.
42+
2. Clone Paimon tables to Paimon tables, target table can be a primary key table or an append table.
43+
3644
## Clone Hive Table
3745

3846
```bash
3947
<FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
40-
clone_hive \
48+
clone \
4149
--database default \
4250
--table hivetable \
4351
--catalog_conf metastore=hive \
4452
--catalog_conf uri=thrift://localhost:9088 \
4553
--target_database test \
4654
--target_table test_table \
4755
--target_catalog_conf warehouse=my_warehouse \
56+
--parallelism 10 \
4857
--where <filter_spec>
4958
```
5059

@@ -54,10 +63,11 @@ You can use filter spec to specify the filtering condition for the partition.
5463

5564
```bash
5665
<FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
57-
clone_hive \
66+
clone \
5867
--database default \
5968
--catalog_conf metastore=hive \
6069
--catalog_conf uri=thrift://localhost:9088 \
6170
--target_database test \
71+
--parallelism 10 \
6272
--target_catalog_conf warehouse=my_warehouse
6373
```

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import org.apache.paimon.catalog.CachingCatalog;
2222
import org.apache.paimon.catalog.Catalog;
2323
import org.apache.paimon.catalog.Identifier;
24-
import org.apache.paimon.flink.clone.hive.CloneFileInfo;
25-
import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
26-
import org.apache.paimon.flink.clone.hive.CommitTableOperator;
27-
import org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
28-
import org.apache.paimon.flink.clone.hive.DataFileInfo;
29-
import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
24+
import org.apache.paimon.flink.clone.CloneFileInfo;
25+
import org.apache.paimon.flink.clone.CloneFilesFunction;
26+
import org.apache.paimon.flink.clone.CloneUtils;
27+
import org.apache.paimon.flink.clone.CommitTableOperator;
28+
import org.apache.paimon.flink.clone.DataFileInfo;
29+
import org.apache.paimon.flink.clone.ListCloneFilesFunction;
3030
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
3131
import org.apache.paimon.hive.HiveCatalog;
3232

@@ -39,8 +39,8 @@
3939

4040
import java.util.Map;
4141

42-
/** Clone source files managed by HiveMetaStore and commit metas to construct Paimon table. */
43-
public class CloneHiveAction extends ActionBase {
42+
/** Clone source table to target table. */
43+
public class CloneAction extends ActionBase {
4444

4545
private final Map<String, String> sourceCatalogConfig;
4646
private final String sourceDatabase;
@@ -53,7 +53,7 @@ public class CloneHiveAction extends ActionBase {
5353
private final int parallelism;
5454
@Nullable private final String whereSql;
5555

56-
public CloneHiveAction(
56+
public CloneAction(
5757
String sourceDatabase,
5858
String sourceTableName,
5959
Map<String, String> sourceCatalogConfig,
@@ -90,7 +90,7 @@ public CloneHiveAction(
9090
public void build() throws Exception {
9191
// list source tables
9292
DataStream<Tuple2<Identifier, Identifier>> source =
93-
CloneHiveUtils.buildSource(
93+
CloneUtils.buildSource(
9494
sourceDatabase,
9595
sourceTableName,
9696
targetDatabase,
@@ -100,28 +100,27 @@ public void build() throws Exception {
100100

101101
DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
102102
FlinkStreamPartitioner.partition(
103-
source, new CloneHiveUtils.TableChannelComputer(), parallelism);
103+
source, new CloneUtils.TableChannelComputer(), parallelism);
104104

105105
// create target table, list files and group by <table, partition>
106106
DataStream<CloneFileInfo> files =
107107
partitionedSource
108108
.process(
109-
new ListHiveFilesFunction(
109+
new ListCloneFilesFunction(
110110
sourceCatalogConfig, targetCatalogConfig, whereSql))
111111
.name("List Files")
112112
.setParallelism(parallelism);
113113

114114
// copy files and commit
115115
DataStream<DataFileInfo> dataFile =
116116
files.rebalance()
117-
.process(
118-
new CopyHiveFilesFunction(sourceCatalogConfig, targetCatalogConfig))
117+
.process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
119118
.name("Copy Files")
120119
.setParallelism(parallelism);
121120

122121
DataStream<DataFileInfo> partitionedDataFile =
123122
FlinkStreamPartitioner.partition(
124-
dataFile, new CloneHiveUtils.DataFileChannelComputer(), parallelism);
123+
dataFile, new CloneUtils.DataFileChannelComputer(), parallelism);
125124

126125
DataStream<Long> committed =
127126
partitionedDataFile

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveActionFactory.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import java.util.Map;
2323
import java.util.Optional;
2424

25-
/** Factory to create {@link CloneHiveAction}. */
26-
public class CloneHiveActionFactory implements ActionFactory {
25+
/** Factory to create {@link CloneAction}. */
26+
public class CloneActionFactory implements ActionFactory {
2727

28-
private static final String IDENTIFIER = "clone_hive";
28+
private static final String IDENTIFIER = "clone";
2929
private static final String TARGET_WAREHOUSE = "target_warehouse";
3030
private static final String TARGET_DATABASE = "target_database";
3131
private static final String TARGET_TABLE = "target_table";
@@ -51,8 +51,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
5151

5252
String parallelism = params.get(PARALLELISM);
5353

54-
CloneHiveAction cloneHiveAction =
55-
new CloneHiveAction(
54+
CloneAction cloneAction =
55+
new CloneAction(
5656
params.get(DATABASE),
5757
params.get(TABLE),
5858
catalogConfig,
@@ -62,13 +62,13 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
6262
parallelism == null ? null : Integer.parseInt(parallelism),
6363
params.get(WHERE));
6464

65-
return Optional.of(cloneHiveAction);
65+
return Optional.of(cloneAction);
6666
}
6767

6868
@Override
6969
public void printHelp() {
7070
System.out.println(
71-
"Action \"clone_hive\" clones the source files and migrate them to paimon table.");
71+
"Action \"clone\" clones the source files and migrate them to paimon table.");
7272
System.out.println();
7373
}
7474
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;
@@ -36,14 +36,14 @@
3636
import java.util.HashMap;
3737
import java.util.Map;
3838

39-
/** Copy files for table. */
40-
public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo, DataFileInfo> {
39+
/** Clone files for table. */
40+
public class CloneFilesFunction extends CloneProcessFunction<CloneFileInfo, DataFileInfo> {
4141

4242
private static final long serialVersionUID = 1L;
4343

4444
private transient Map<Identifier, Map<BinaryRow, DataFilePathFactory>> pathFactoryMap;
4545

46-
public CopyHiveFilesFunction(
46+
public CloneFilesFunction(
4747
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
4848
super(sourceCatalogConfig, targetCatalogConfig);
4949
}
@@ -92,7 +92,16 @@ public void processElement(
9292
}
9393

9494
private DataFilePathFactory pathFactory(Identifier identifier, BinaryRow part) {
95-
FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
96-
return targetTable.store().pathFactory().createDataFilePathFactory(part, 0);
95+
return pathFactoryMap
96+
.computeIfAbsent(identifier, k -> new HashMap<>())
97+
.computeIfAbsent(
98+
part,
99+
k -> {
100+
FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
101+
return targetTable
102+
.store()
103+
.pathFactory()
104+
.createDataFilePathFactory(part, 0);
105+
});
97106
}
98107
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;
@@ -35,12 +35,12 @@
3535
import java.util.Map;
3636

3737
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
38-
import static org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatalog;
38+
import static org.apache.paimon.flink.clone.CloneUtils.getRootHiveCatalog;
3939

4040
/** Abstract function for copying tables. */
41-
public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
41+
public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> {
4242

43-
protected static final Logger LOG = LoggerFactory.getLogger(CopyProcessFunction.class);
43+
protected static final Logger LOG = LoggerFactory.getLogger(CloneProcessFunction.class);
4444

4545
protected final Map<String, String> sourceCatalogConfig;
4646
protected final Map<String, String> targetCatalogConfig;
@@ -51,7 +51,7 @@ public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
5151
protected transient Map<Identifier, Table> tableCache;
5252
protected transient DataFileMetaSerializer dataFileSerializer;
5353

54-
public CopyProcessFunction(
54+
public CloneProcessFunction(
5555
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
5656
this.sourceCatalogConfig = sourceCatalogConfig;
5757
this.targetCatalogConfig = targetCatalogConfig;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.DelegateCatalog;
2323
import org.apache.paimon.catalog.Identifier;
24-
import org.apache.paimon.flink.action.CloneHiveAction;
24+
import org.apache.paimon.flink.action.CloneAction;
2525
import org.apache.paimon.hive.HiveCatalog;
2626
import org.apache.paimon.hive.migrate.HiveCloneUtils;
2727
import org.apache.paimon.table.sink.ChannelComputer;
@@ -40,10 +40,10 @@
4040
import static org.apache.paimon.utils.Preconditions.checkArgument;
4141
import static org.apache.paimon.utils.Preconditions.checkState;
4242

43-
/** Utils for building {@link CloneHiveAction}. */
44-
public class CloneHiveUtils {
43+
/** Utils for building {@link CloneAction}. */
44+
public class CloneUtils {
4545

46-
private static final Logger LOG = LoggerFactory.getLogger(CloneHiveUtils.class);
46+
private static final Logger LOG = LoggerFactory.getLogger(CloneUtils.class);
4747

4848
public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
4949
String sourceDatabase,

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone.hive;
19+
package org.apache.paimon.flink.clone;
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.annotation.VisibleForTesting;
@@ -48,14 +48,14 @@
4848
import static org.apache.paimon.utils.Preconditions.checkState;
4949

5050
/** List files for table. */
51-
public class ListHiveFilesFunction
52-
extends CopyProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
51+
public class ListCloneFilesFunction
52+
extends CloneProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
5353

5454
private static final long serialVersionUID = 1L;
5555

5656
@Nullable private final String whereSql;
5757

58-
public ListHiveFilesFunction(
58+
public ListCloneFilesFunction(
5959
Map<String, String> sourceCatalogConfig,
6060
Map<String, String> targetCatalogConfig,
6161
@Nullable String whereSql) {

0 commit comments

Comments
 (0)