Skip to content

Commit 8f81f7a

Browse files
committed
fix
1 parent f91adde commit 8f81f7a

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
package org.apache.paimon.flink.action;
2020

21+
import org.apache.paimon.catalog.CachingCatalog;
22+
import org.apache.paimon.catalog.Catalog;
2123
import org.apache.paimon.flink.clone.CloneHiveTableUtils;
2224
import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
25+
import org.apache.paimon.hive.HiveCatalog;
2326

2427
import javax.annotation.Nullable;
2528

@@ -55,6 +58,18 @@ public CloneAction(
5558
String cloneFrom) {
5659
super(sourceCatalogConfig);
5760

61+
if (cloneFrom.equalsIgnoreCase("hive")) {
62+
Catalog sourceCatalog = catalog;
63+
if (sourceCatalog instanceof CachingCatalog) {
64+
sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
65+
}
66+
if (!(sourceCatalog instanceof HiveCatalog)) {
67+
throw new UnsupportedOperationException(
68+
"Only support clone hive tables using HiveCatalog, but current source catalog is "
69+
+ sourceCatalog.getClass().getName());
70+
}
71+
}
72+
5873
this.sourceDatabase = sourceDatabase;
5974
this.sourceTableName = sourceTableName;
6075
this.sourceCatalogConfig = sourceCatalogConfig;

0 commit comments

Comments
 (0)