[mqtt] Add portable MqttIO Read/Write transforms (revives #32385)#38493
Draft
tkaymak wants to merge 2 commits into
Draft
[mqtt] Add portable MqttIO Read/Write transforms (revives #32385)#38493tkaymak wants to merge 2 commits into
tkaymak wants to merge 2 commits into
Conversation
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Wires :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the SchemaTransforms are picked up by ExpansionService via @autoservice. Tests cover a read-with-timeout-no-data case and a write-then-read round trip against an embedded ActiveMQ broker. Batch only; the streaming case flagged on PR apache#32385 will be addressed in a follow-up. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds name overrides for the new MqttIO SchemaTransforms in standard_expansion_services.yaml so the generated Python wrappers follow the kafka-style naming (ReadFromMqtt / WriteToMqtt) and become available under apache_beam.io. Regenerates standard_external_transforms.yaml via :sdks:python:generateExternalTransformsConfig — the file now includes the mqtt_read:v1 and mqtt_write:v1 entries with their inferred Row schema for ConnectionConfiguration. Adds an I/Os entry in CHANGES.md for the upcoming 2.74.0 release.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Revives the approved diff from PR #32385 (
Add portable Mqtt source and sink transforms) and wires the new SchemaTransforms into Python cross-language wrapper generation.After this lands, Python users can do:
and reach
MqttIOover xlang, the same wayReadFromKafka/WriteToKafkawork today.How
Two commits:
[mqtt] Add SchemaTransform providers for MqttIO Read/WriteMqttIO.ConnectionConfigurationwith@DefaultSchema(AutoValueSchema.class)+@SchemaFieldDescriptionso it round-trips through Beam Schemas.MqttReadSchemaTransformProvider(beam:schematransform:org.apache.beam:mqtt_read:v1) andMqttWriteSchemaTransformProvider(beam:schematransform:org.apache.beam:mqtt_write:v1), both@AutoService-registered.MqttSchemaTransformProviderTestcovering the read-with-timeout-no-data case and a write-then-read round trip via an embedded ActiveMQ broker.:sdks:java:io:mqttinto:sdks:java:io:expansion-serviceso the providers are discoverable byExpansionService.[mqtt] Wire MqttIO into Python xlang wrapper generationsdks/standard_expansion_services.yamlso the generated wrappers use kafka-style naming (ReadFromMqtt/WriteToMqtt).sdks/standard_external_transforms.yamlvia:sdks:python:generateExternalTransformsConfig.CHANGES.mdfor 2.74.0.Notes vs. PR #32385
MqttIO#32668 API:MqttIO.Read<byte[]>/MqttIO.Write<byte[]>instead of the raw types in the original PR.ReadFromMqtt/WriteToMqtt(kafka-style) instead of the auto-derivedMqttRead/MqttWrite. Per @Abacn's roadmap comment about onboarding throughstandard_expansion_services.yaml.standard_external_transforms.yamlshape changed slightly (fieldsis now a list and Python types have a more compact representation) becausegen_xlang_wrappers.pyevolved since 2024-08. Our regenerated diff follows current master's format.topicis nowOptionalin the generated schema becauseMqttIO.ConnectionConfiguration#getTopic()was made@Nullableby PR #32668 (readWithMetadata).Scope
Batch only. The streaming-mode failure that @twosom flagged on the original PR (comment — batch worked, streaming did not) was never root-caused. That investigation is intentionally out of scope here and will be addressed in a dedicated follow-up PR.
Credits
Original work by @ahmedabu98 and @twosom on PR #32385; @damondouglas approved that PR before it went stale and auto-closed on 2025-10-14. This revives that change with the small adjustments above.
Verification
GRADLE_USER_HOME=/tmp/.gradle ./gradlew \ :sdks:java:io:mqtt:test \ :sdks:java:io:expansion-service:build \ :validateChangesAll pass locally.
Closes the gap from #32385 / addresses #21060 (Python MQTT IO).