Skip to content

Commit d8aebc5

Browse files
committed
fix
1 parent 5870f9d commit d8aebc5

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.table.sink.ChannelComputer;
2828
import org.apache.paimon.table.source.DataSplit;
2929
import org.apache.paimon.table.source.EndOfScanException;
30+
import org.apache.paimon.table.source.IncrementalSplit;
3031
import org.apache.paimon.table.source.SnapshotNotExistPlan;
3132
import org.apache.paimon.table.source.StreamTableScan;
3233
import org.apache.paimon.table.source.TableScan;
@@ -305,20 +306,39 @@ protected synchronized void assignSplits() {
305306
}
306307

307308
protected int assignSuggestedTask(FileStoreSourceSplit split) {
308-
DataSplit dataSplit = ((DataSplit) split.split());
309+
if (split.split() instanceof DataSplit) {
310+
return assignSuggestedTask((DataSplit) split.split());
311+
} else {
312+
return assignSuggestedTask((IncrementalSplit) split.split());
313+
}
314+
}
315+
316+
protected int assignSuggestedTask(DataSplit split) {
309317
int parallelism = context.currentParallelism();
310318

311319
int bucketId;
312-
if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
320+
if (split.bucket() == BucketMode.POSTPONE_BUCKET) {
313321
bucketId =
314-
PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
322+
PostponeBucketFileStoreWrite.getWriteId(split.dataFiles().get(0).fileName())
315323
% parallelism;
316324
} else {
317-
bucketId = dataSplit.bucket();
325+
bucketId = split.bucket();
326+
}
327+
328+
if (shuffleBucketWithPartition) {
329+
return ChannelComputer.select(split.partition(), bucketId, parallelism);
330+
} else {
331+
return ChannelComputer.select(bucketId, parallelism);
318332
}
333+
}
334+
335+
protected int assignSuggestedTask(IncrementalSplit split) {
336+
int parallelism = context.currentParallelism();
319337

338+
// TODO how to deal with postpone bucket?
339+
int bucketId = split.bucket();
320340
if (shuffleBucketWithPartition) {
321-
return ChannelComputer.select(dataSplit.partition(), bucketId, parallelism);
341+
return ChannelComputer.select(split.partition(), bucketId, parallelism);
322342
} else {
323343
return ChannelComputer.select(bucketId, parallelism);
324344
}

0 commit comments

Comments
 (0)