[FLINK-39697] Bump Flink version to 2.2.1 #123

Open
vernedeng wants to merge 2 commits into apache:main from vernedeng:feature/upgrade-flink-2.2.1

@vernedeng

Purpose of the change

Upgrade Flink dependency from 1.20.3 to 2.2.1, adapting the Pulsar connector to Flink 2.x breaking API changes while preserving existing behavior and test coverage.

Brief change log

  • Upgrade flink.version from 1.20.3 to 2.2.1.
  • Migrate Sink API: TwoPhaseCommittingSink → Sink + SupportsCommitter; PrecommittingSinkWriter → CommittingSinkWriter; InitContext → WriterInitContext.
  • Migrate Source API: remove FutureCompletingBlockingQueue from PulsarSourceFetcherManager and PulsarSourceReader (the queue is now managed internally by the framework).
  • Migrate Serialization API: ExecutionConfig → SerializerConfig in TypeInformation.createSerializer(), PulsarSourceBuilder, and PulsarTypeInformationWrapper.
  • Migrate TypeSerializerSnapshot.resolveSchemaCompatibility() to new signature.
  • Migrate PulsarSinkContextImpl to use WriterInitContext.getTaskInfo() for subtask metadata.
  • Add TableDataTypeUtils to replace removed DataTypeUtils.stripRowPrefix() and renameRowFields().
  • Migrate test classes to use legacy.SourceFunction and legacy.SinkFunction package paths.
  • Migrate PulsarWriterTest.MockInitContext to implement WriterInitContext with TaskInfo interface.
  • Remove Scala (scala-reflect, scala-library) and Kryo dependencies (no longer used in Flink 2.x).
  • Exclude flink-table-planner-loader from flink-table-test-utils to fix executor instantiation conflict.
  • Use ${scala.binary.version} property for flink-table-planner artifact references.
  • Skip japicmp check (no valid reference version for first Flink 2.x-based release).
  • Update CI matrix to test against Flink 2.2.1.

Verifying this change

This change is already covered by existing tests:

  • PulsarSinkITCase verifies sink functionality with all DeliveryGuarantee modes (NONE, AT_LEAST_ONCE, EXACTLY_ONCE).
  • PulsarTableITCase verifies Table API source/sink with multiple formats (JSON, Avro, CSV).
  • PulsarTableOptionsTest verifies table option validation logic.
  • PulsarWriterTest verifies writer unit behavior with mocked contexts.
  • PulsarSourceReader and PulsarSourceFetcherManager changes are covered by existing source integration tests.

Significant changes

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@boring-cyborg

boring-cyborg Bot commented May 18, 2026

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Member

@featzhang featzhang left a comment

Thanks for the contribution! I've left some comments on specific lines.

<version>${flink.version}</version>
<scope>test</scope>
</dependency>

Duplicate flink-table-planner dependency. Remove one.

*
* @param dataType The row data type whose field names should be stripped.
* @param prefix The prefix to remove from each field name.
* @return A new data type with the prefix removed from field names.

New utility class lacks unit tests. Add TableDataTypeUtilsTest to verify stripRowPrefix() and renameRowFields().
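
Such a test could start from the field-name logic alone. The sketch below is a stand-in, not the PR's code: it models a row type as a plain list of field names instead of Flink's DataType, and the class RowFieldNames and both method bodies are hypothetical. It assumes the prefix is removed only from names that start with it, and that renaming needs exactly one new name per field.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in: a row type is modeled as a list of field names, not a Flink DataType. */
final class RowFieldNames {

    private RowFieldNames() {}

    /** Removes the prefix from every field name that starts with it; other names stay unchanged. */
    public static List<String> stripRowPrefix(List<String> fieldNames, String prefix) {
        List<String> result = new ArrayList<>(fieldNames.size());
        for (String name : fieldNames) {
            result.add(name.startsWith(prefix) ? name.substring(prefix.length()) : name);
        }
        return result;
    }

    /** Replaces the field names positionally; both lists must have the same size. */
    public static List<String> renameRowFields(List<String> fieldNames, List<String> newNames) {
        if (fieldNames.size() != newNames.size()) {
            throw new IllegalArgumentException(
                    "Expected " + fieldNames.size() + " field names but got " + newNames.size());
        }
        return new ArrayList<>(newNames);
    }
}
```

A TableDataTypeUtilsTest would likely want the same three cases against the real DataType-based methods: a mixed list of prefixed and unprefixed fields, a positional rename, and a size mismatch.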

  public PulsarSourceFetcherManager(
-         FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
          Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier,
          Configuration configuration) {

Constructor signature change is correct for Flink 2.x. Consider adding Javadoc to note the elementsQueue parameter removal.

  @Override
  public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-         TypeSerializer<T> newSerializer) {
+         TypeSerializerSnapshot<T> oldSerializerSnapshot) {

Always returning compatibleAsIs() may be too permissive. Document the serializer compatibility assumptions or add validation logic.
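
One shape the validation could take is sketched below. It is standalone and uses no Flink types: Compatibility, SnapshotInfo, and resolve are hypothetical stand-ins for TypeSerializerSchemaCompatibility, the old TypeSerializerSnapshot, and resolveSchemaCompatibility(). It also assumes the snapshot records a class name and a format version, which this PR does not confirm.

```java
import java.util.Objects;

/** Hypothetical stand-in for a snapshot compatibility check; no Flink types are used. */
final class SnapshotCompatibilitySketch {

    private SnapshotCompatibilitySketch() {}

    /** Stand-in for the outcomes that TypeSerializerSchemaCompatibility can express. */
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Assumed snapshot contents: the serialized class name and a format version. */
    static final class SnapshotInfo {
        final String className;
        final int formatVersion;

        SnapshotInfo(String className, int formatVersion) {
            this.className = Objects.requireNonNull(className);
            this.formatVersion = formatVersion;
        }
    }

    /** Compares the restored (old) snapshot with the one describing the current serializer. */
    static Compatibility resolve(SnapshotInfo oldSnapshot, SnapshotInfo newSnapshot) {
        Objects.requireNonNull(newSnapshot);
        // A missing snapshot or a different serialized type cannot be read safely.
        if (oldSnapshot == null || !oldSnapshot.className.equals(newSnapshot.className)) {
            return Compatibility.INCOMPATIBLE;
        }
        if (oldSnapshot.formatVersion == newSnapshot.formatVersion) {
            return Compatibility.COMPATIBLE_AS_IS;
        }
        // An older format can be rewritten; a newer one is unknown to this serializer.
        return oldSnapshot.formatVersion < newSnapshot.formatVersion
                ? Compatibility.COMPATIBLE_AFTER_MIGRATION
                : Compatibility.INCOMPATIBLE;
    }
}
```

If the serializer truly has no configuration that can differ between versions, documenting that assumption next to compatibleAsIs() may be enough; the point of the sketch is only that the old snapshot gets inspected before the answer is given.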

Comment thread pom.xml

<japicmp.skip>false</japicmp.skip>
<japicmp.referenceVersion>3.0.0-1.16</japicmp.referenceVersion>
<japicmp.skip>true</japicmp.skip>

japicmp.skip=true is correct for the first Flink 2.x release. Re-enable in subsequent versions to track API compatibility.

  matrix:
-   flink: [ 1.20.3 ]
-   jdk: [ '8, 11, 17' ]
+   flink: [ 2.2.1 ]

JDK 8 removed correctly per Flink 2.x requirements. Update README.md to document minimum JDK 11 requirement.

2 participants