Port x-lang support to kinesis v2 #25416
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
@xinbinhuang Many thanks for working on this! Ping @chamikaramj to help with x-lang questions.
Thanks @xinbinhuang !
@chamikaramj can you provide more details? Pls correct me if I'm wrong below.
Do I need to add the expansion service Gradle config: https://github.com/apache/beam/tree/master/sdks/java/io/kinesis/expansion-service?
Yes, the expansion service is needed to run the Java side.
What would be the proper input type to KinesisIO.Write? I'm not sure how Python types translate to x-lang transforms. I hard-coded it to byte[] for now.
byte[] seems ok for both reading and writing, though that makes it hard to use decent, content-based partitioning (see the comment on random partitioning). For writing, a tuple type such as KV would make a lot more sense, since that allows users to pass a partition key. Have a look at the external KafkaIO as an example.
Similarly, the record metadata might be required for certain use cases when reading. As far as I know, that requires a schema for KinesisRecord, but I'm not sure of the contracts or how this is supposed to be done. In any case, just returning the binary data seems to be ok in the read case.
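To illustrate why a keyed input type helps with content-based partitioning, here is a minimal plain-Java sketch. It relies on the documented Kinesis behavior of mapping a partition key to a 128-bit hash key via MD5. Everything else is an assumption made for illustration: `Map.Entry<String, byte[]>` stands in for a `KV` of partition key and payload, the class and method names are hypothetical, and shards are assumed to split the hash key space into equal ranges.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

/** Hypothetical sketch, not part of Beam or the AWS SDK. */
class PartitionKeySketch {
  // Kinesis hash keys are 128-bit unsigned integers.
  static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

  /** MD5 of the partition key, interpreted as an unsigned 128-bit integer. */
  static BigInteger hashKey(String partitionKey) {
    try {
      byte[] md5 =
          MessageDigest.getInstance("MD5")
              .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, md5);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Index of the shard owning the record, assuming equally sized shard ranges. */
  static int shardFor(Map.Entry<String, byte[]> record, int shards) {
    BigInteger width = KEY_SPACE.divide(BigInteger.valueOf(shards));
    return hashKey(record.getKey())
        .divide(width)
        .min(BigInteger.valueOf(shards - 1))
        .intValue();
  }
}
```

With a plain byte[] input there is no key to hash, so records with related content cannot be routed to the same shard; with a keyed input, two records sharing a partition key always land on the same shard in this model.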
Why does KinesisIO implement its own KinesisIO.Write.Result instead of using the provided PDone? They look the same in terms of implementation.
This is a Beam convention to improve the evolution of write APIs:
The top-level I/O class will provide a static method to start constructing an I/O.Write transform. This returns a PTransform with a single input PCollection, and a Write.Result output.
For instance, this allows adding more advanced error handling later, e.g. a mode that returns failed records rather than failing fast.
Some fields are removed (i.e. partition_key, verify_certificate), and mechanisms are changed (i.e. partitioning). What's the general policy for Beam on introducing breaking changes? Do we just break it at the code level and release it with a major version, or do we introduce a v2 version and deprecate the old one to allow a gradual transition?
This is a tough one... I suppose it would be possible to silently switch the implementation on the Java side by carefully setting defaults and ignoring obsolete settings (such as the old partitioning key), with appropriate warnings added. On the other hand, if types are also changed, it's impossible not to break anything. That would require adding a v2, deprecating the previous one, and removing it after 3+ releases.
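The "silently switch and warn" option could look roughly like the sketch below. This is a hypothetical configuration bean written for illustration only: the class name, the setters, and the warning texts are assumptions, and a real implementation would log through the project's logger rather than collect strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a configuration that tolerates obsolete settings. */
class LegacyWriteConfigSketch {
  private String streamName;
  private final List<String> warnings = new ArrayList<>();

  void setStreamName(String streamName) {
    this.streamName = streamName;
  }

  // Obsolete setting: accepted so existing callers keep working, but ignored.
  void setPartitionKey(String partitionKey) {
    if (partitionKey != null) {
      warnings.add("partition_key is obsolete and will be ignored");
    }
  }

  // Obsolete setting: accepted so existing callers keep working, but ignored.
  void setVerifyCertificate(Boolean verifyCertificate) {
    if (verifyCertificate != null) {
      warnings.add("verify_certificate is obsolete and will be ignored");
    }
  }

  String getStreamName() {
    return streamName;
  }

  List<String> getWarnings() {
    return warnings;
  }
}
```

This only covers settings that can be dropped without changing types. As noted above, once element types change, a separate v2 with a deprecation period is the only non-breaking path.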
configuration.awsSecretKey,
configuration.region,
configuration.serviceEndpoint))
.withPartitioner(KinesisPartitioner.explicitRandomPartitioner(configuration.shards));
Not sure what strategy is best here. Using the explicitRandomPartitioner only works well if the number of shards is fixed: if shards get split, the new ones wouldn't receive any data. On the other hand, a completely random strategy (generating new random partition keys each time) breaks record aggregation. If re-sharding is (potentially) used, users should probably configure shards as the max number of expected shards.
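The effect described in this comment can be reproduced with a simplified model. The sketch assumes the partitioner picks among a fixed set of explicit hash keys spread evenly over the 128-bit key space, and that every shard covers an equally sized range; it is not Beam's implementation, and all names are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of explicit hash keys versus the actual shard layout. */
class ExplicitHashKeySketch {
  static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

  /** One hash key per configured shard, at the start of each equal range. */
  static List<BigInteger> precomputedHashKeys(int configuredShards) {
    BigInteger width = KEY_SPACE.divide(BigInteger.valueOf(configuredShards));
    List<BigInteger> keys = new ArrayList<>();
    for (int i = 0; i < configuredShards; i++) {
      keys.add(width.multiply(BigInteger.valueOf(i)));
    }
    return keys;
  }

  /** Index of the shard whose range contains the hash key. */
  static int owningShard(BigInteger hashKey, int actualShards) {
    BigInteger width = KEY_SPACE.divide(BigInteger.valueOf(actualShards));
    return hashKey.divide(width).min(BigInteger.valueOf(actualShards - 1)).intValue();
  }

  /** Shards that can receive data at all, given the configured key set. */
  static Set<Integer> shardsReached(int configuredShards, int actualShards) {
    Set<Integer> reached = new TreeSet<>();
    for (BigInteger key : precomputedHashKeys(configuredShards)) {
      reached.add(owningShard(key, actualShards));
    }
    return reached;
  }
}
```

In this model, `shardsReached(2, 4)` contains only shards 0 and 2, so half of the shards stay idle after a split. `shardsReached(4, 2)` covers both shards, which is why configuring the maximum expected shard count is the safer choice.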
String streamName;
@Nullable String awsAccessKey;
@Nullable String awsSecretKey;
Region region;
Region can be nullable as well; it will try to fall back to the AWS default region of the environment.
this.region = Region.of(region);
}

public void setServiceEndpoint(@Nullable String serviceEndpoint) {
The param shouldn't be nullable; that would cause an NPE.
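A minimal sketch of the two nullability remarks, under stated assumptions: the setter body is not shown in the excerpt, so the sketch assumes it converts the string with `URI.create`, which throws a `NullPointerException` for a null argument. The class name is hypothetical and the region is kept as a plain string to avoid depending on the AWS SDK.

```java
import java.net.URI;

/** Hypothetical configuration sketch; not the class from this PR. */
class ExternalConfigSketch {
  private String region; // null means: fall back to the environment's default region
  private URI serviceEndpoint; // null means: use the default AWS endpoint

  void setRegion(String region) {
    // Store as-is; resolving a null region is deferred to client creation.
    this.region = region;
  }

  void setServiceEndpoint(String serviceEndpoint) {
    // URI.create(null) would throw a NullPointerException, so guard explicitly.
    this.serviceEndpoint = serviceEndpoint == null ? null : URI.create(serviceEndpoint);
  }

  String getRegion() {
    return region;
  }

  URI getServiceEndpoint() {
    return serviceEndpoint;
  }
}
```

Guarding inside the setter is one option. The alternative implied by the comment is to drop `@Nullable` from the parameter and keep only the field nullable, so that the setter is never called with null.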
@Override
public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
Configuration configuration) {
KinesisIO.Write<byte[]> writeTransform =
The writer requires a serializer, in this case a trivial one: byte[] -> byte[]
}

@Override
public PTransform<PBegin, PCollection<KinesisRecord>> buildExternal(
The output needs to be byte[]. You can just add a MapElements to extract data from the record.
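The suggested mapping step is sketched below in plain Java so that it runs without Beam on the classpath. `Record` is a hypothetical stand-in for KinesisRecord, and the stream `map` call plays the role that a MapElements step would play after the read transform.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: drop record metadata and keep only the payload bytes. */
class RecordToBytesSketch {

  /** Stand-in for a record that carries metadata next to its payload. */
  static final class Record {
    final String partitionKey;
    final String sequenceNumber;
    final byte[] data;

    Record(String partitionKey, String sequenceNumber, byte[] data) {
      this.partitionKey = partitionKey;
      this.sequenceNumber = sequenceNumber;
      this.data = data;
    }
  }

  /** Element-wise mapping from record to payload, in input order. */
  static List<byte[]> extractData(List<Record> records) {
    return records.stream().map(r -> r.data).collect(Collectors.toList());
  }
}
```

The trade-off is the one raised earlier in the thread: the partition key and sequence number are lost on the Python side unless a schema for the record is introduced.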
Kind ping @xinbinhuang
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
Hi @xinbinhuang, kind ping on this
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
cc: @aromanenko-dev @mosche
fixes: #23570
I've implemented the Java part and have some questions I'd like to clear up before adding the Python part:
Java questions:
Do I need to add the expansion service Gradle config: https://github.com/apache/beam/tree/master/sdks/java/io/kinesis/expansion-service?
What would be the proper input type to KinesisIO.Write? I'm not sure how Python types translate to x-lang transforms. I hard-coded it to byte[] for now.
Why does KinesisIO implement its own KinesisIO.Write.Result instead of using the provided PDone? They look the same in terms of implementation.
Python questions:
Some fields are removed (i.e. partition_key, verify_certificate), and mechanisms are changed (i.e. partitioning). What's the general policy for Beam on introducing breaking changes? Do we just break it at the code level and release it with a major version, or do we introduce a v2 version and deprecate the old one to allow a gradual transition?