[FLINK-36235][Stream] Ignore emitting null rowData when deserialized message fails #124

Open
arvindKandpal-ksolves wants to merge 1 commit into apache:main from arvindKandpal-ksolves:FLINK-36235
@arvindKandpal-ksolves

Purpose of the change

This PR fixes FLINK-36235.
Currently, when a message fails to deserialize and returns null, the PulsarDeserializationSchemaWrapper emits this null value downstream. This violates Flink's DeserializationSchema contract (where a null return value means "drop this record") and can cause NullPointerExceptions in downstream operators. This PR adds a simple null-check guard to safely drop these corrupted or null records.

Brief change log

  • Added an if (instance != null) check before emitting the record in PulsarDeserializationSchemaWrapper#deserialize.
  • Added a regression test wrapperDropsNullDeserializedRecord and a CountingCollector helper class in PulsarDeserializationSchemaTest to verify that null records are correctly dropped without errors.
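The guard described above can be sketched in isolation. This is a minimal illustration, not the connector's actual code: `NullGuardSketch`, its nested `Collector` interface (a one-method stand-in for `org.apache.flink.util.Collector`), and the `Function`-based inner deserializer are all assumptions made so the example runs without Flink or Pulsar on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch of the null-check guard; every type here is a simplified stand-in, not the real API. */
class NullGuardSketch {

    /** Stand-in for org.apache.flink.util.Collector, reduced to a single method. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical wrapper method: delegates to an inner deserializer and forwards only non-null results. */
    static <T> void deserialize(byte[] message, Function<byte[], T> inner, Collector<T> out) {
        T instance = inner.apply(message);
        // A null result means "drop this record", so nothing is emitted downstream.
        if (instance != null) {
            out.collect(instance);
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        Collector<String> out = emitted::add;

        // Hypothetical inner deserializer: treats an empty payload as undecodable and returns null.
        Function<byte[], String> inner =
                bytes -> bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);

        deserialize("ok".getBytes(StandardCharsets.UTF_8), inner, out);
        deserialize(new byte[0], inner, out);

        System.out.println(emitted); // prints [ok]
    }
}
```

Only the first call reaches the collector; the second is dropped without an exception, which is the behavior the regression test is meant to pin down.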

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests in PulsarDeserializationSchemaTest.java to explicitly test and verify the filtering behavior when the inner deserializer returns null.
  • Verified locally using mvn -pl flink-connector-pulsar test.

Significant changes

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@boring-cyborg

boring-cyborg Bot commented May 20, 2026

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@arvindKandpal-ksolves
Author

Hi @featzhang, can you review this patch?

@featzhang
Member

The change itself is correct and matches the default deserialize(byte[], Collector) implementation in DeserializationSchema (flink-core), which drops null returns by contract. Two things worth folding into this PR or a follow-up:

  1. The same pattern exists in three other wrappers in the same package and is not addressed here:

    • PulsarSchemaWrapper.java:68
    • PulsarTypeInformationWrapper.java:61
    • GenericRecordDeserializationSchema.java:41

    They share the exact issue (out.collect(instance) without a null check). Since the JIRA describes a contract bug, fixing only one wrapper leaves the same footgun for the other code paths.

  2. Behavior change for the table connector is worth calling out in the PR description. In non-upsert mode, a null value previously reached PulsarRowDataConverter#emitRow and threw DeserializationException("Invalid null value received in non-upsert mode..."). After this PR, the wrapper drops it silently. The new behavior is fine and aligns with Kafka, but it should not be hidden in the changelog — users currently relying on the exception to detect bad data will lose records silently. A LOG.debug on the dropped record would help operability.
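One way to cover both points with a single piece of code is a collector decorator that drops nulls and records each drop. The sketch below only illustrates that idea under stated assumptions: `DropLoggingSketch`, `NullDroppingCollector`, and `droppedCount()` are hypothetical names, the nested `Collector` is a stand-in for Flink's interface, and `java.util.logging` at `Level.FINE` substitutes for the `LOG.debug` call so the example has no external dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch of a shared null-dropping collector; all names are hypothetical. */
class DropLoggingSketch {

    /** Stand-in for org.apache.flink.util.Collector, reduced to a single method. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Forwards non-null records to a delegate; counts and logs the null ones instead of emitting them. */
    static final class NullDroppingCollector<T> implements Collector<T> {
        private static final Logger LOG = Logger.getLogger(NullDroppingCollector.class.getName());

        private final Collector<T> delegate;
        private long dropped;

        NullDroppingCollector(Collector<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            if (record == null) {
                dropped++;
                // FINE is used here as the java.util.logging counterpart of a debug-level message.
                LOG.log(Level.FINE, "Dropping null deserialized record ({0} so far)", dropped);
                return;
            }
            delegate.collect(record);
        }

        long droppedCount() {
            return dropped;
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        Collector<String> downstream = emitted::add;
        NullDroppingCollector<String> out = new NullDroppingCollector<>(downstream);

        out.collect("a");
        out.collect(null);
        out.collect("b");

        System.out.println(emitted + " dropped=" + out.droppedCount()); // prints [a, b] dropped=1
    }
}
```

Wrapping the collector once would give every wrapper the same behavior and a single place to surface dropped records, at the cost of one extra object per call site. Whether that fits the connector's structure is a judgment for the maintainers.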

}

/** Collector that records every {@link #collect} invocation, including nulls. */
private static class CountingCollector<T> implements Collector<T> {
Member

Nit: org.apache.flink.api.common.functions.util.ListCollector would do the same job (collect into a List<T>) without introducing a new test helper. Not blocking.
