Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG FIX : fix-ReadFromKafkaViaSDF-bug-shall-set-coder #34505

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

yyfhust
Copy link
Contributor

@yyfhust yyfhust commented Apr 2, 2025

Fix for #34506

This PR fixes a bug when using KafkaIO with a customized Deserializer that extends Deserializer<Row>. We built this Deserializer to deserialize byte arrays into Beam Rows with a schema we specified.

When using KafkaIO, we use withValueDeserializerAndCoder.

It runs without any issues with legacy Kafka IO ReadFromKafkaViaUnbounded. However, it fails when using ReadFromKafkaViaSDF. The error originates here.

Screenshot 2025-04-01 at 9 05 09 PM

This happens because currently ReadFromKafkaViaSDF does not set the coder even if we explicitly provide both the deserializer and the coder using withValueDeserializerAndCoder. Since no coder is explicitly set, Beam infers the type from the deserializer (

private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
return (getValueCoder() != null)
? getValueCoder()
: Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);
). This is typically not an issue when using Beam's built-in deserializers:

Screenshot 2025-04-01 at 11 12 48 PM

However, if we use a customized deserializer, such as foo_bar_Deserializer implements Deserializer<Row>, Beam will be unable to infer the coder and will throw an error.

  • Legacy KafkaIO (ReadFromKafkaViaUnbounded) sets both the deserializer and the coder based on the input.
  • KafkaIO implemented using SDF (ReadFromKafkaViaSDF) currently does not set the coder explicitly. It does not pass the coder and use it, instead relying on inferring the coder from the Deserializer, which will throw an error as there is no coder for Row in the registry for obvious reasons.

To resolve this issue, we need to explicitly set the coder based on the input.

At the entry of KafkaIO, coders are resolved here:

Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
(either from user input with withValueDeserializerAndCoder, or Beam infers the coder based on type from coderRegistry).

This coder is passed to both ReadFromKafkaViaUnbounded and ReadFromKafkaViaSDF.

  • ReadFromKafkaViaUnbounded uses the coder
  • ReadFromKafkaViaSDF does not use the coder

This PR addresses the issue for ReadFromKafkaViaSDF.



Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@yyfhust yyfhust marked this pull request as draft April 2, 2025 00:59
@yyfhust yyfhust changed the title <Draft> fix-ReadFromKafkaViaSDF-bug-shall-set-coder <WIP> fix-ReadFromKafkaViaSDF-bug-shall-set-coder Apr 2, 2025
@yyfhust yyfhust changed the title <WIP> fix-ReadFromKafkaViaSDF-bug-shall-set-coder <WIP> BUG FIX : fix-ReadFromKafkaViaSDF-bug-shall-set-coder Apr 2, 2025
@yyfhust yyfhust changed the title <WIP> BUG FIX : fix-ReadFromKafkaViaSDF-bug-shall-set-coder BUG FIX : fix-ReadFromKafkaViaSDF-bug-shall-set-coder Apr 2, 2025
@yyfhust yyfhust marked this pull request as ready for review April 2, 2025 02:59
Copy link
Contributor

github-actions bot commented Apr 2, 2025

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @chamikaramj for label java.
R: @Abacn for label build.
R: @sjvanrossum for label kafka.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Comment on lines +1787 to +1790
.withKeyDeserializerProviderAndCoder(
kafkaRead.getKeyDeserializerProvider(), keyCoder)
.withValueDeserializerProviderAndCoder(
kafkaRead.getValueDeserializerProvider(), valueCoder)
Copy link
Contributor Author

@yyfhust yyfhust Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyCoder and valueCoder must be non-nullable. they are resolved here

Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);

either from user input or infer from coderegistry

Copy link
Contributor Author

@yyfhust yyfhust Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is at the entry of kafka IO, and it will 100% return a coder :

private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
return (getValueCoder() != null)
? getValueCoder()
: Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);

if coder is given, it will return the coder specified by user. If not, it will attempt to retrieve a coder from registry , which only has coder for build-in deserializer.

@liferoad liferoad requested review from scwhittle and johnjcasey April 2, 2025 16:42
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Would it be possible to extend the test coverage? I think you could modify a simpler test like testKafkaIOWriteWithErrorHandler in KafkaIOIT.java to have a custom deserializer (perhaps just a no-op subclass of StringSerializer for example) without a registered coder. That would help prevent regressions as well.

@yyfhust
Copy link
Contributor Author

yyfhust commented Apr 3, 2025

Thanks! Would it be possible to extend the test coverage? I think you could modify a simpler test like testKafkaIOWriteWithErrorHandler in KafkaIOIT.java to have a custom deserializer (perhaps just a no-op subclass of StringSerializer for example) without a registered coder. That would help prevent regressions as well.

Yes, will do. Actually I was waiting for someone to point me to the place where I can write unit/integration tests 🤣 . Thanks. 🙇‍♂️

@scwhittle
Copy link
Contributor

scwhittle commented Apr 3, 2025 via email

@yyfhust
Copy link
Contributor Author

yyfhust commented Apr 3, 2025

stop reviewer notifications

Copy link
Contributor

github-actions bot commented Apr 3, 2025

Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment assign set of reviewers

@yyfhust
Copy link
Contributor Author

yyfhust commented Apr 3, 2025

I think they are still there just grouped together differently.

On Thu, Apr 3, 2025, 4:23 PM johnjcasey @.> wrote: This change is technically not backwards compatible, because it removes the old, but public facing, methods on ReadSourceDescriptors. Can you keep those methods? — Reply to this email directly, view it on GitHub <#34505 (comment)>, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABBZZTHDTLTLNN4CEW6GX7D2XU75PAVCNFSM6AAAAAB2IH2E76VHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDONZVHE3DINRSGM . You are receiving this because your review was requested.Message ID: @.> [image: johnjcasey]johnjcasey left a comment (apache/beam#34505) <#34505 (comment)> This change is technically not backwards compatible, because it removes the old, but public facing, methods on ReadSourceDescriptors. Can you keep those methods? — Reply to this email directly, view it on GitHub <#34505 (comment)>, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABBZZTHDTLTLNN4CEW6GX7D2XU75PAVCNFSM6AAAAAB2IH2E76VHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDONZVHE3DINRSGM . You are receiving this because your review was requested.Message ID: @.***>

sorry i do not get what do you mean here ; would you mind giving more details @scwhittle 🙇‍♂️ Thanks!!

@yyfhust
Copy link
Contributor Author

yyfhust commented Apr 3, 2025

ssign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants