Skip to content

Commit f91adde

Browse files
committed
[flink] Add support for cloning from Paimon tables in the clone action
1 parent 07a6f23 commit f91adde

15 files changed

+1001
-221
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,8 @@
1818

1919
package org.apache.paimon.flink.action;
2020

21-
import org.apache.paimon.catalog.CachingCatalog;
22-
import org.apache.paimon.catalog.Catalog;
23-
import org.apache.paimon.catalog.Identifier;
24-
import org.apache.paimon.flink.clone.CloneFileInfo;
25-
import org.apache.paimon.flink.clone.CloneFilesFunction;
26-
import org.apache.paimon.flink.clone.CloneUtils;
27-
import org.apache.paimon.flink.clone.CommitTableOperator;
28-
import org.apache.paimon.flink.clone.DataFileInfo;
29-
import org.apache.paimon.flink.clone.ListCloneFilesFunction;
30-
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
31-
import org.apache.paimon.hive.HiveCatalog;
32-
33-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
34-
import org.apache.flink.api.java.tuple.Tuple2;
35-
import org.apache.flink.streaming.api.datastream.DataStream;
36-
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
21+
import org.apache.paimon.flink.clone.CloneHiveTableUtils;
22+
import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
3723

3824
import javax.annotation.Nullable;
3925

@@ -54,6 +40,7 @@ public class CloneAction extends ActionBase {
5440
private final int parallelism;
5541
@Nullable private final String whereSql;
5642
@Nullable private final List<String> excludedTables;
43+
private final String cloneFrom;
5744

5845
public CloneAction(
5946
String sourceDatabase,
@@ -64,19 +51,10 @@ public CloneAction(
6451
Map<String, String> targetCatalogConfig,
6552
@Nullable Integer parallelism,
6653
@Nullable String whereSql,
67-
@Nullable List<String> excludedTables) {
54+
@Nullable List<String> excludedTables,
55+
String cloneFrom) {
6856
super(sourceCatalogConfig);
6957

70-
Catalog sourceCatalog = catalog;
71-
if (sourceCatalog instanceof CachingCatalog) {
72-
sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
73-
}
74-
if (!(sourceCatalog instanceof HiveCatalog)) {
75-
throw new UnsupportedOperationException(
76-
"Only support clone hive tables using HiveCatalog, but current source catalog is "
77-
+ sourceCatalog.getClass().getName());
78-
}
79-
8058
this.sourceDatabase = sourceDatabase;
8159
this.sourceTableName = sourceTableName;
8260
this.sourceCatalogConfig = sourceCatalogConfig;
@@ -88,58 +66,46 @@ public CloneAction(
8866
this.parallelism = parallelism == null ? env.getParallelism() : parallelism;
8967
this.whereSql = whereSql;
9068
this.excludedTables = excludedTables;
69+
this.cloneFrom = cloneFrom;
9170
}
9271

9372
@Override
9473
public void build() throws Exception {
95-
// list source tables
96-
DataStream<Tuple2<Identifier, Identifier>> source =
97-
CloneUtils.buildSource(
74+
switch (cloneFrom) {
75+
case "hive":
76+
CloneHiveTableUtils.build(
77+
env,
78+
catalog,
9879
sourceDatabase,
9980
sourceTableName,
81+
sourceCatalogConfig,
10082
targetDatabase,
10183
targetTableName,
84+
targetCatalogConfig,
85+
parallelism,
86+
whereSql,
87+
excludedTables);
88+
break;
89+
case "paimon":
90+
ClonePaimonTableUtils.build(
91+
env,
10292
catalog,
103-
excludedTables,
104-
env);
105-
106-
DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
107-
FlinkStreamPartitioner.partition(
108-
source, new CloneUtils.TableChannelComputer(), parallelism);
109-
110-
// create target table, list files and group by <table, partition>
111-
DataStream<CloneFileInfo> files =
112-
partitionedSource
113-
.process(
114-
new ListCloneFilesFunction(
115-
sourceCatalogConfig, targetCatalogConfig, whereSql))
116-
.name("List Files")
117-
.setParallelism(parallelism);
118-
119-
// copy files and commit
120-
DataStream<DataFileInfo> dataFile =
121-
files.rebalance()
122-
.process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
123-
.name("Copy Files")
124-
.setParallelism(parallelism);
125-
126-
DataStream<DataFileInfo> partitionedDataFile =
127-
FlinkStreamPartitioner.partition(
128-
dataFile, new CloneUtils.DataFileChannelComputer(), parallelism);
129-
130-
DataStream<Long> committed =
131-
partitionedDataFile
132-
.transform(
133-
"Commit table",
134-
BasicTypeInfo.LONG_TYPE_INFO,
135-
new CommitTableOperator(targetCatalogConfig))
136-
.setParallelism(parallelism);
137-
committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
93+
sourceDatabase,
94+
sourceTableName,
95+
sourceCatalogConfig,
96+
targetDatabase,
97+
targetTableName,
98+
targetCatalogConfig,
99+
parallelism,
100+
whereSql,
101+
excludedTables);
102+
break;
103+
}
138104
}
139105

140106
    /** Builds the clone topology ({@link #build()}) and executes it as a Flink job. */
    @Override
    public void run() throws Exception {
        build();
        execute("Clone job");
    }
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class CloneActionFactory implements ActionFactory {
3737
private static final String PARALLELISM = "parallelism";
3838
private static final String WHERE = "where";
3939
private static final String EXCLUDED_TABLES = "excluded_tables";
40+
private static final String CLONE_FROM = "clone_from";
4041

4142
@Override
4243
public String identifier() {
@@ -62,6 +63,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
6263
? null
6364
: Arrays.asList(StringUtils.split(excludedTablesStr, ","));
6465

66+
String cloneFrom = params.get(CLONE_FROM);
67+
if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
68+
cloneFrom = "hive";
69+
}
70+
6571
CloneAction cloneAction =
6672
new CloneAction(
6773
params.get(DATABASE),
@@ -72,7 +78,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
7278
targetCatalogConfig,
7379
parallelism == null ? null : Integer.parseInt(parallelism),
7480
params.get(WHERE),
75-
excludedTables);
81+
excludedTables,
82+
cloneFrom);
7683

7784
return Optional.of(cloneAction);
7885
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.clone;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.DelegateCatalog;
23+
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.flink.action.CloneAction;
25+
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
26+
import org.apache.paimon.hive.HiveCatalog;
27+
import org.apache.paimon.utils.StringUtils;
28+
29+
import org.apache.commons.collections.CollectionUtils;
30+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
31+
import org.apache.flink.api.java.tuple.Tuple2;
32+
import org.apache.flink.streaming.api.datastream.DataStream;
33+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
34+
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import javax.annotation.Nullable;
39+
40+
import java.util.ArrayList;
41+
import java.util.List;
42+
import java.util.Map;
43+
44+
import static org.apache.paimon.utils.Preconditions.checkArgument;
45+
import static org.apache.paimon.utils.Preconditions.checkState;
46+
47+
/** Utils for building {@link CloneAction} for append tables. */
48+
public class CloneHiveTableUtils {
49+
50+
private static final Logger LOG = LoggerFactory.getLogger(CloneHiveTableUtils.class);
51+
52+
public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
53+
String sourceDatabase,
54+
String sourceTableName,
55+
String targetDatabase,
56+
String targetTableName,
57+
Catalog sourceCatalog,
58+
@Nullable List<String> excludedTables,
59+
StreamExecutionEnvironment env)
60+
throws Exception {
61+
List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
62+
HiveCatalog hiveCatalog = getRootHiveCatalog(sourceCatalog);
63+
if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
64+
checkArgument(
65+
StringUtils.isNullOrWhitespaceOnly(sourceTableName),
66+
"sourceTableName must be blank when database is null.");
67+
checkArgument(
68+
StringUtils.isNullOrWhitespaceOnly(targetDatabase),
69+
"targetDatabase must be blank when clone all tables in a catalog.");
70+
checkArgument(
71+
StringUtils.isNullOrWhitespaceOnly(targetTableName),
72+
"targetTableName must be blank when clone all tables in a catalog.");
73+
74+
for (Identifier identifier :
75+
org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
76+
hiveCatalog, excludedTables)) {
77+
result.add(new Tuple2<>(identifier, identifier));
78+
}
79+
} else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
80+
checkArgument(
81+
!StringUtils.isNullOrWhitespaceOnly(targetDatabase),
82+
"targetDatabase must not be blank when clone all tables in a database.");
83+
checkArgument(
84+
StringUtils.isNullOrWhitespaceOnly(targetTableName),
85+
"targetTableName must be blank when clone all tables in a catalog.");
86+
87+
for (Identifier identifier :
88+
org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
89+
hiveCatalog, sourceDatabase, excludedTables)) {
90+
result.add(
91+
new Tuple2<>(
92+
identifier,
93+
Identifier.create(targetDatabase, identifier.getObjectName())));
94+
}
95+
} else {
96+
checkArgument(
97+
!StringUtils.isNullOrWhitespaceOnly(targetDatabase),
98+
"targetDatabase must not be blank when clone a table.");
99+
checkArgument(
100+
!StringUtils.isNullOrWhitespaceOnly(targetTableName),
101+
"targetTableName must not be blank when clone a table.");
102+
checkArgument(
103+
CollectionUtils.isEmpty(excludedTables),
104+
"excludedTables must be empty when clone a single table.");
105+
result.add(
106+
new Tuple2<>(
107+
Identifier.create(sourceDatabase, sourceTableName),
108+
Identifier.create(targetDatabase, targetTableName)));
109+
}
110+
111+
checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
112+
113+
if (LOG.isDebugEnabled()) {
114+
LOG.debug("The clone identifiers of source table and target table are: {}", result);
115+
}
116+
return env.fromCollection(result).forceNonParallel();
117+
}
118+
119+
public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
120+
Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
121+
checkArgument(
122+
rootCatalog instanceof HiveCatalog,
123+
"Only support HiveCatalog now but found %s.",
124+
rootCatalog.getClass().getName());
125+
return (HiveCatalog) rootCatalog;
126+
}
127+
128+
public static void build(
129+
StreamExecutionEnvironment env,
130+
Catalog sourceCatalog,
131+
String sourceDatabase,
132+
String sourceTableName,
133+
Map<String, String> sourceCatalogConfig,
134+
String targetDatabase,
135+
String targetTableName,
136+
Map<String, String> targetCatalogConfig,
137+
int parallelism,
138+
@Nullable String whereSql,
139+
@Nullable List<String> excludedTables)
140+
throws Exception {
141+
// list source tables
142+
DataStream<Tuple2<Identifier, Identifier>> source =
143+
buildSource(
144+
sourceDatabase,
145+
sourceTableName,
146+
targetDatabase,
147+
targetTableName,
148+
sourceCatalog,
149+
excludedTables,
150+
env);
151+
152+
DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
153+
FlinkStreamPartitioner.partition(
154+
source, new ShuffleIdentifierByTableComputer(), parallelism);
155+
156+
// create target table, list files and group by <table, partition>
157+
DataStream<CloneFileInfo> files =
158+
partitionedSource
159+
.process(
160+
new ListCloneFilesFunction(
161+
sourceCatalogConfig, targetCatalogConfig, whereSql))
162+
.name("List Files")
163+
.setParallelism(parallelism);
164+
165+
// copy files and commit
166+
DataStream<DataFileInfo> dataFile =
167+
files.rebalance()
168+
.process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
169+
.name("Copy Files")
170+
.setParallelism(parallelism);
171+
172+
DataStream<DataFileInfo> partitionedDataFile =
173+
FlinkStreamPartitioner.partition(
174+
dataFile, new ShuffleDataFileByTableComputer(), parallelism);
175+
176+
DataStream<Long> committed =
177+
partitionedDataFile
178+
.transform(
179+
"Commit table",
180+
BasicTypeInfo.LONG_TYPE_INFO,
181+
new CommitTableOperator(targetCatalogConfig))
182+
.setParallelism(parallelism);
183+
committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
184+
}
185+
}

0 commit comments

Comments
 (0)