Flink: If IcebergSink writeParallelism is not specified, defaults to the input source parallelism #13260

Open: wants to merge 4 commits into base: main

Contributor

@rodmeneses rodmeneses commented Jun 6, 2025

Currently, if writeParallelism is not specified, IcebergSink defaults to the job parallelism.
Instead, it should default to the input source parallelism, which promotes operator chaining.
This PR makes that change, bringing IcebergSink to parity with FlinkSink.
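
To make the intended default concrete, here is a minimal, dependency-free sketch of the fallback rule described above. The class and method names (`WriteParallelismSketch`, `resolveWriteParallelism`, `parallelismMatches`) are hypothetical and are not the actual IcebergSink code; the sketch only assumes that an unset writeParallelism is represented as `null`. Equal parallelism is one of the conditions Flink requires before it can chain two operators, which is why falling back to the input parallelism helps.

```java
public final class WriteParallelismSketch {
  private WriteParallelismSketch() {}

  /**
   * Hypothetical helper, not the IcebergSink implementation: returns the
   * explicitly configured write parallelism, or falls back to the
   * parallelism of the input stream when none was configured (null).
   */
  public static int resolveWriteParallelism(
      Integer configuredWriteParallelism, int inputParallelism) {
    return configuredWriteParallelism != null
        ? configuredWriteParallelism
        : inputParallelism;
  }

  /**
   * Equal parallelism is a necessary (not sufficient) condition for
   * chaining the writer to its upstream operator.
   */
  public static boolean parallelismMatches(int inputParallelism, int writeParallelism) {
    return inputParallelism == writeParallelism;
  }

  public static void main(String[] args) {
    int inputParallelism = 1; // e.g. a non-parallel source

    // writeParallelism unset: the writer inherits the input parallelism.
    int defaulted = resolveWriteParallelism(null, inputParallelism);
    // writeParallelism set to 2: the explicit value wins.
    int explicit = resolveWriteParallelism(2, inputParallelism);

    System.out.println("unset -> " + defaulted
        + ", matches input: " + parallelismMatches(inputParallelism, defaulted));
    System.out.println("explicit 2 -> " + explicit
        + ", matches input: " + parallelismMatches(inputParallelism, explicit));
  }
}
```

With an input parallelism of 1, the unset case resolves to 1 and the explicit case resolves to 2, so a test that passes 2 can distinguish an honored explicit setting from the default.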

re: #12071 (comment)

cc: @stevenzwu @mxm @pvary @gyfora

@github-actions github-actions bot added the flink label Jun 6, 2025
@rodmeneses rodmeneses changed the title Flink: If IcebergSink writeParalellism is not specified, defaults to the input source paralellism Flink: If IcebergSink writeParallelism is not specified, defaults to the input source paralellism Jun 6, 2025
@rodmeneses rodmeneses force-pushed the sinkDefaultParallelism branch from 12447ac to e403b98 Compare June 6, 2025 18:23
@rodmeneses rodmeneses changed the title Flink: If IcebergSink writeParallelism is not specified, defaults to the input source paralellism Flink: If IcebergSink writeParallelism is not specified, defaults to the input source parallelism Jun 6, 2025
@rodmeneses rodmeneses force-pushed the sinkDefaultParallelism branch from ba49b0f to 08e3fab Compare June 11, 2025 16:09
@rodmeneses rodmeneses requested review from mxm and stevenzwu June 12, 2025 20:43

// since the sink write parallelism was null, it asserts that the default parallelism used was
// the input source parallelism
assertThat(sink.getTransformation().getParallelism()).isEqualTo(dataStream.getParallelism());
Contributor

@stevenzwu stevenzwu Jun 12, 2025

The sink has a multi-stage DAG. Does sink.getTransformation() return the writer operator? Or is the writer parallelism always the same as the transformation parallelism?

Contributor Author

Yes, I stepped through it in a debugger and could confirm it.

.tableLoader(tableLoader)
.tableSchema(SimpleDataUtil.FLINK_SCHEMA)
.distributionMode(DistributionMode.NONE)
.writeParallelism(parallelism)
Contributor

This parallelism could be the same as the input stream parallelism. We need to set it to a value different from the input stream parallelism.

Contributor Author

@rodmeneses rodmeneses Jun 12, 2025

The inputStream parallelism is always 1 and cannot easily be changed (it is a non-parallel source). So when the parallelism test template parameter is 2, the test asserts that the writeParallelism is actually 2 (and not 1, the parallelism of the inputSource).
