[FLINK-39696][runtime] Use source partition for distributed flush events#4400

Merged: yuxiqian merged 1 commit into apache:master from taoran92:FLINK-39696 on May 18, 2026

taoran92 commented May 18, 2026

This closes FLINK-39696.

What is the purpose of the change

This PR fixes inconsistent source identity propagation in the distributed schema evolution flow.

In distributed mode, SchemaOperator forwards schema change requests to the coordinator with the original upstream source partition via SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent).
However, when emitting the downstream FlushEvent, it previously used the current SchemaOperator subtask id instead of the original source partition.

As a result, the same schema change could be identified by different ids in different parts of the pipeline:

  • SchemaChangeRequest / coordinator deduplication use the original source partition
  • FlushEvent / downstream flush handling use the SchemaOperator subtask id

This makes the flush lineage inconsistent with the schema-change lineage in a distributed topology. In particular, duplicated broadcast branches of the same upstream schema change cannot be consistently aligned by the original source partition.
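
The mismatch described above can be illustrated with a small standalone sketch. It does not use the real Flink CDC classes: `Request` and both helper methods are hypothetical stand-ins, and the partition and subtask numbers are made up. It only shows that when each broadcast branch stamps its own operator subtask id, one upstream schema change surfaces under several ids, none of which has to match the source partition the coordinator deduplicates on.

```java
import java.util.Set;
import java.util.TreeSet;

public class FlushLineageMismatchSketch {

    /** Hypothetical stand-in for a schema change request; only the source partition is modeled. */
    static final class Request {
        final int sourcePartition;

        Request(int sourcePartition) {
            this.sourcePartition = sourcePartition;
        }
    }

    /** Previous behavior: every branch tags its flush with the id of the subtask handling it. */
    static Set<Integer> flushIdsFromOperatorSubtasks(Request request, int[] operatorSubtasks) {
        Set<Integer> ids = new TreeSet<>();
        for (int subtask : operatorSubtasks) {
            ids.add(subtask); // the request's source partition is ignored
        }
        return ids;
    }

    /** Behavior after this change: every branch tags its flush with the original source partition. */
    static Set<Integer> flushIdsFromSourcePartition(Request request, int[] operatorSubtasks) {
        Set<Integer> ids = new TreeSet<>();
        for (int ignored : operatorSubtasks) {
            ids.add(request.sourcePartition);
        }
        return ids;
    }

    public static void main(String[] args) {
        Request request = new Request(3);   // assumed: change originates from source partition 3
        int[] branches = {0, 1};            // assumed: broadcast to operator subtasks 0 and 1

        System.out.println(flushIdsFromOperatorSubtasks(request, branches)); // prints [0, 1]
        System.out.println(flushIdsFromSourcePartition(request, branches));  // prints [3]
    }
}
```

With the source partition as the id, both branches collapse to a single identity that matches the one carried by the request.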

Brief change log

  • Use schemaChangeRequest.getSourceSubTaskId() instead of SchemaOperator's subTaskId when constructing distributed FlushEvent
  • Add a regression test in SchemaEvolveTest to verify that FlushEvent keeps the original source partition even when the SchemaOperator subtask id is different
  • Extend DistributedEventOperatorTestHarness so tests can inject a custom operator subtask index and reliably reproduce this scenario
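
As a rough sketch of the first two items, assuming heavily simplified stand-ins for the real classes: only `getSourceSubTaskId()` on the request is named in this PR; the constructors, the `FlushEvent` accessor, and `SchemaOperatorSketch` are hypothetical. The operator's subtask index is passed in through the constructor, in the spirit of the harness change, so the two ids can be made to differ on purpose.

```java
public class FlushEventFixSketch {

    /** Simplified stand-in for the request forwarded to the coordinator. */
    static final class SchemaChangeRequest {
        private final int sourceSubTaskId;
        private final int sinkSubTaskId;

        SchemaChangeRequest(int sourceSubTaskId, int sinkSubTaskId) {
            this.sourceSubTaskId = sourceSubTaskId;
            this.sinkSubTaskId = sinkSubTaskId;
        }

        int getSourceSubTaskId() {
            return sourceSubTaskId;
        }

        int getSinkSubTaskId() {
            return sinkSubTaskId;
        }
    }

    /** Simplified stand-in for the flush event emitted downstream. */
    static final class FlushEvent {
        private final int sourceSubTaskId;

        FlushEvent(int sourceSubTaskId) {
            this.sourceSubTaskId = sourceSubTaskId;
        }

        int getSourceSubTaskId() {
            return sourceSubTaskId;
        }
    }

    /** Hypothetical operator with an injectable subtask index. */
    static final class SchemaOperatorSketch {
        private final int subTaskId;

        SchemaOperatorSketch(int subTaskId) {
            this.subTaskId = subTaskId;
        }

        FlushEvent buildFlushEvent(SchemaChangeRequest request) {
            // Behavior described as "before": new FlushEvent(subTaskId)
            // Behavior after this change: reuse the id carried by the request.
            return new FlushEvent(request.getSourceSubTaskId());
        }
    }

    public static void main(String[] args) {
        // Assumed values: source partition 2, operator subtask index 5.
        SchemaOperatorSketch operator = new SchemaOperatorSketch(5);
        FlushEvent event = operator.buildFlushEvent(new SchemaChangeRequest(2, 0));
        System.out.println(event.getSourceSubTaskId()); // prints 2
    }
}
```

Choosing an operator subtask index that differs from the source partition is what makes the check meaningful: if both were equal, the old and new behavior would be indistinguishable.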

Verifying this change

This change is verified by unit tests:

  • SchemaEvolveTest#testFlushEventUsesSourcePartitionInsteadOfSchemaOperatorSubtask

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e. is any changed class annotated with @Public(@PublicEvolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no

Documentation

Does this pull request introduce a new feature? no

If yes, how is the feature documented? not applicable

@yuxiqian yuxiqian self-requested a review May 18, 2026 03:30

@yuxiqian yuxiqian left a comment

LGTM. We may add an E2E case later, after we have a distributed source.

@yuxiqian yuxiqian merged commit 3e1211e into apache:master May 18, 2026
34 of 38 checks passed