Skip to content

Commit daf37ab

Browse files
committed
fix
1 parent 8f81f7a commit daf37ab

File tree

21 files changed

+129
-58
lines changed

21 files changed

+129
-58
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class CloneAction extends ActionBase {
4444
@Nullable private final String whereSql;
4545
@Nullable private final List<String> excludedTables;
4646
private final String cloneFrom;
47+
@Nullable private final Integer bucket;
4748

4849
public CloneAction(
4950
String sourceDatabase,
@@ -55,8 +56,10 @@ public CloneAction(
5556
@Nullable Integer parallelism,
5657
@Nullable String whereSql,
5758
@Nullable List<String> excludedTables,
58-
String cloneFrom) {
59+
String cloneFrom,
60+
@Nullable Integer bucket) {
5961
super(sourceCatalogConfig);
62+
this.bucket = bucket;
6063

6164
if (cloneFrom.equalsIgnoreCase("hive")) {
6265
Catalog sourceCatalog = catalog;
@@ -113,7 +116,8 @@ public void build() throws Exception {
113116
targetCatalogConfig,
114117
parallelism,
115118
whereSql,
116-
excludedTables);
119+
excludedTables,
120+
bucket);
117121
break;
118122
}
119123
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class CloneActionFactory implements ActionFactory {
3838
private static final String WHERE = "where";
3939
private static final String EXCLUDED_TABLES = "excluded_tables";
4040
private static final String CLONE_FROM = "clone_from";
41+
private static final String BUCKET = "bucket";
4142

4243
@Override
4344
public String identifier() {
@@ -68,6 +69,12 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
6869
cloneFrom = "hive";
6970
}
7071

72+
String bucketString = params.get(BUCKET);
73+
Integer bucket = null;
74+
if (bucketString != null) {
75+
bucket = Integer.valueOf(bucketString);
76+
}
77+
7178
CloneAction cloneAction =
7279
new CloneAction(
7380
params.get(DATABASE),
@@ -79,7 +86,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
7986
parallelism == null ? null : Integer.parseInt(parallelism),
8087
params.get(WHERE),
8188
excludedTables,
82-
cloneFrom);
89+
cloneFrom,
90+
bucket);
8391

8492
return Optional.of(cloneAction);
8593
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
import org.apache.paimon.catalog.DelegateCatalog;
2323
import org.apache.paimon.catalog.Identifier;
2424
import org.apache.paimon.flink.action.CloneAction;
25+
import org.apache.paimon.flink.clone.files.CloneFileInfo;
26+
import org.apache.paimon.flink.clone.files.CloneFilesFunction;
27+
import org.apache.paimon.flink.clone.files.CommitTableOperator;
28+
import org.apache.paimon.flink.clone.files.DataFileInfo;
29+
import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
30+
import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
2531
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
2632
import org.apache.paimon.hive.HiveCatalog;
2733
import org.apache.paimon.utils.StringUtils;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;
2323
import org.apache.paimon.flink.action.CloneAction;
24+
import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
25+
import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
26+
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
27+
import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
28+
import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
29+
import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
2430
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
2531
import org.apache.paimon.utils.StringUtils;
2632

@@ -123,7 +129,8 @@ public static void build(
123129
Map<String, String> targetCatalogConfig,
124130
int parallelism,
125131
@Nullable String whereSql,
126-
@Nullable List<String> excludedTables)
132+
@Nullable List<String> excludedTables,
133+
@Nullable Integer bucket)
127134
throws Exception {
128135
// list source tables
129136
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -144,7 +151,7 @@ public static void build(
144151
partitionedSource
145152
.process(
146153
new ListCloneSplitsFunction(
147-
sourceCatalogConfig, targetCatalogConfig, whereSql))
154+
sourceCatalogConfig, targetCatalogConfig, whereSql, bucket))
148155
.name("List Files")
149156
.setParallelism(parallelism);
150157

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone;
19+
package org.apache.paimon.flink.clone.files;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone;
19+
package org.apache.paimon.flink.clone.files;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneProcessFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone;
19+
package org.apache.paimon.flink.clone.files;
2020

2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CommitTableOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone;
19+
package org.apache.paimon.flink.clone.files;
2020

2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java renamed to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.clone;
19+
package org.apache.paimon.flink.clone.files;
2020

2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.data.BinaryRow;

0 commit comments

Comments
 (0)