Skip to content

Commit beebcb5

Browse files
committed
[flink] Postpone mode should support 'partition.sink-strategy'
1 parent b0bb00f commit beebcb5

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.paimon.table.BucketMode;
3131
import org.apache.paimon.table.FileStoreTable;
3232
import org.apache.paimon.table.Table;
33+
import org.apache.paimon.table.sink.ChannelComputer;
3334

3435
import org.apache.flink.api.common.functions.MapFunction;
3536
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -291,8 +292,14 @@ protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
291292
}
292293

293294
private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
294-
DataStream<InternalRow> partitioned =
295-
partition(input, new PostponeBucketChannelComputer(table.schema()), parallelism);
295+
ChannelComputer<InternalRow> channelComputer;
296+
if (!table.partitionKeys().isEmpty()
297+
&& table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
298+
channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
299+
} else {
300+
channelComputer = new PostponeBucketChannelComputer(table.schema());
301+
}
302+
DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
296303
FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null);
297304
return sink.sinkFrom(partitioned);
298305
}

0 commit comments

Comments
 (0)