@fcfangcc (Contributor) commented Oct 9, 2025

  1. sink.compaction.commit.parallelism is not exposed to users.
  2. Logic error in the compaction partitionCustom call: ( bucket % numPartitions ) may be negative, which fails with:
java.lang.ArrayIndexOutOfBoundsException: Array index out of range: -1
at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:290)
at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:156)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.emit(CommitterOperator.java:199)

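In Java, integer division truncates toward zero (-5 / 4 = -1), so the remainder takes the sign of the dividend: -5 % 4 = -5 - (4 * (-1)) = -5 + 4 = -1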
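
The arithmetic above can be checked with a small standalone sketch. The class name RemainderDemo and the plain int array standing in for the output channels are made up for illustration; this is not the Flink code path from the stack trace, only the same kind of out-of-range index.

```java
public class RemainderDemo {
    // Java's % truncates toward zero, so the result takes the sign of the dividend.
    public static int naiveIndex(int bucket, int numPartitions) {
        return bucket % numPartitions;
    }

    // Returns true if indexing an array of numPartitions slots with the naive index throws.
    public static boolean indexFails(int bucket, int numPartitions) {
        int[] channels = new int[numPartitions]; // hypothetical stand-in for output channels
        try {
            channels[naiveIndex(bucket, numPartitions)]++;
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(naiveIndex(-5, 4)); // -1
        System.out.println(indexFails(-5, 4)); // true
        System.out.println(indexFails(5, 4));  // false
    }
}
```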

@yuxiqian (Member) left a comment

Thanks for @fcfangcc's nice work! Just left some comments.

  DataStream<CommittableMessage<WriteResultWrapper>> keyedStream =
          committableMessageDataStream.partitionCustom(
-                 (bucket, numPartitions) -> bucket % numPartitions,
+                 this::partitionCustom,

Can we just write Math::floorMod here?
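
A minimal sketch of what that suggestion could look like, assuming the partitioner has the shape (key, numPartitions) -> int. KeyPartitioner below is a hypothetical stand-in with the same shape as Flink's Partitioner<Integer>, used only so the example compiles without Flink on the classpath; the class and field names are likewise invented for illustration.

```java
public class FloorModPartitionerDemo {
    // Hypothetical stand-in mirroring the shape of Flink's Partitioner<K>.
    @FunctionalInterface
    public interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Math.floorMod(int, int) returns a result with the sign of the divisor,
    // so for numPartitions > 0 the index is always in [0, numPartitions).
    public static final KeyPartitioner<Integer> BY_BUCKET = Math::floorMod;

    public static void main(String[] args) {
        System.out.println(BY_BUCKET.partition(-5, 4)); // 3
        System.out.println(BY_BUCKET.partition(5, 4));  // 1
        System.out.println(BY_BUCKET.partition(-8, 4)); // 0
    }
}
```

With a method reference there is no need for a separate helper method, and a negative bucket value maps to a valid channel index instead of -1.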

options.add(IcebergDataSinkOptions.PARTITION_KEY);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);

Seems we're lacking a test case to cover all supported options in IcebergDataSinkFactoryTest. Would you like to add one?
