Skip to content

Commit 3920369

Browse files
committed
fix
1 parent a41e5c4 commit 3920369

File tree

5 files changed

+35
-67
lines changed

5 files changed

+35
-67
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818

1919
package org.apache.paimon.flink.action;
2020

21-
import org.apache.paimon.catalog.CachingCatalog;
22-
import org.apache.paimon.catalog.Catalog;
2321
import org.apache.paimon.flink.clone.CloneHiveTableUtils;
2422
import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
25-
import org.apache.paimon.hive.HiveCatalog;
2623

2724
import javax.annotation.Nullable;
2825

@@ -43,7 +40,6 @@ public class CloneAction extends ActionBase {
4340
private final int parallelism;
4441
@Nullable private final String whereSql;
4542
@Nullable private final List<String> excludedTables;
46-
private final String cloneFrom;
4743

4844
public CloneAction(
4945
String sourceDatabase,
@@ -54,22 +50,9 @@ public CloneAction(
5450
Map<String, String> targetCatalogConfig,
5551
@Nullable Integer parallelism,
5652
@Nullable String whereSql,
57-
@Nullable List<String> excludedTables,
58-
String cloneFrom) {
53+
@Nullable List<String> excludedTables) {
5954
super(sourceCatalogConfig);
6055

61-
if (cloneFrom.equalsIgnoreCase("hive")) {
62-
Catalog sourceCatalog = catalog;
63-
if (sourceCatalog instanceof CachingCatalog) {
64-
sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
65-
}
66-
if (!(sourceCatalog instanceof HiveCatalog)) {
67-
throw new UnsupportedOperationException(
68-
"Only support clone hive tables using HiveCatalog, but current source catalog is "
69-
+ sourceCatalog.getClass().getName());
70-
}
71-
}
72-
7356
this.sourceDatabase = sourceDatabase;
7457
this.sourceTableName = sourceTableName;
7558
this.sourceCatalogConfig = sourceCatalogConfig;
@@ -81,40 +64,36 @@ public CloneAction(
8164
this.parallelism = parallelism == null ? env.getParallelism() : parallelism;
8265
this.whereSql = whereSql;
8366
this.excludedTables = excludedTables;
84-
this.cloneFrom = cloneFrom;
8567
}
8668

8769
@Override
8870
public void build() throws Exception {
89-
switch (cloneFrom) {
90-
case "hive":
91-
CloneHiveTableUtils.build(
92-
env,
93-
catalog,
94-
sourceDatabase,
95-
sourceTableName,
96-
sourceCatalogConfig,
97-
targetDatabase,
98-
targetTableName,
99-
targetCatalogConfig,
100-
parallelism,
101-
whereSql,
102-
excludedTables);
103-
break;
104-
case "paimon":
105-
ClonePaimonTableUtils.build(
106-
env,
107-
catalog,
108-
sourceDatabase,
109-
sourceTableName,
110-
sourceCatalogConfig,
111-
targetDatabase,
112-
targetTableName,
113-
targetCatalogConfig,
114-
parallelism,
115-
whereSql,
116-
excludedTables);
117-
break;
71+
if (CloneHiveTableUtils.isRootHiveCatalog(catalog)) {
72+
CloneHiveTableUtils.build(
73+
env,
74+
catalog,
75+
sourceDatabase,
76+
sourceTableName,
77+
sourceCatalogConfig,
78+
targetDatabase,
79+
targetTableName,
80+
targetCatalogConfig,
81+
parallelism,
82+
whereSql,
83+
excludedTables);
84+
} else {
85+
ClonePaimonTableUtils.build(
86+
env,
87+
catalog,
88+
sourceDatabase,
89+
sourceTableName,
90+
sourceCatalogConfig,
91+
targetDatabase,
92+
targetTableName,
93+
targetCatalogConfig,
94+
parallelism,
95+
whereSql,
96+
excludedTables);
11897
}
11998
}
12099

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public class CloneActionFactory implements ActionFactory {
3737
private static final String PARALLELISM = "parallelism";
3838
private static final String WHERE = "where";
3939
private static final String EXCLUDED_TABLES = "excluded_tables";
40-
private static final String CLONE_FROM = "clone_from";
4140

4241
@Override
4342
public String identifier() {
@@ -63,11 +62,6 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
6362
? null
6463
: Arrays.asList(StringUtils.split(excludedTablesStr, ","));
6564

66-
String cloneFrom = params.get(CLONE_FROM);
67-
if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
68-
cloneFrom = "hive";
69-
}
70-
7165
CloneAction cloneAction =
7266
new CloneAction(
7367
params.get(DATABASE),
@@ -78,8 +72,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
7872
targetCatalogConfig,
7973
parallelism == null ? null : Integer.parseInt(parallelism),
8074
params.get(WHERE),
81-
excludedTables,
82-
cloneFrom);
75+
excludedTables);
8376

8477
return Optional.of(cloneAction);
8578
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
122122
return env.fromCollection(result).forceNonParallel();
123123
}
124124

125+
public static boolean isRootHiveCatalog(Catalog catalog) {
126+
return DelegateCatalog.rootCatalog(catalog) instanceof HiveCatalog;
127+
}
128+
125129
public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
126130
Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
127131
checkArgument(

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ public class CloneProcedure extends ProcedureBase {
6161
@ArgumentHint(
6262
name = "excluded_tables",
6363
type = @DataTypeHint("STRING"),
64-
isOptional = true),
65-
@ArgumentHint(
66-
name = "clone_from",
67-
type = @DataTypeHint("STRING"),
6864
isOptional = true)
6965
})
7066
public String[] call(
@@ -77,8 +73,7 @@ public String[] call(
7773
String targetCatalogConfigStr,
7874
Integer parallelism,
7975
String where,
80-
String excludedTablesStr,
81-
String cloneFrom)
76+
String excludedTablesStr)
8277
throws Exception {
8378
Map<String, String> sourceCatalogConfig =
8479
new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -101,8 +96,7 @@ public String[] call(
10196
targetCatalogConfig,
10297
parallelism,
10398
where,
104-
excludedTables,
105-
cloneFrom);
99+
excludedTables);
106100
return execute(procedureContext, action, "Clone Job");
107101
}
108102

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,7 @@ public void testCloneBucketedAppendFromPaimon() throws Exception {
143143
"--target_table",
144144
"target",
145145
"--target_catalog_conf",
146-
"warehouse=" + warehouse2,
147-
"--clone_from",
148-
"paimon")
146+
"warehouse=" + warehouse2)
149147
.run();
150148

151149
sql(tEnv, "CALL catalog2.sys.compact(`table` => 'default.target')");

0 commit comments

Comments
 (0)