Skip to content

[improve] rocketmq options #9251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

liunaijie
Copy link
Member

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@nielifeng nielifeng requested a review from Copilot April 30, 2025 02:34
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request refactors RocketMQ connectors to use new configuration options (RocketMqSourceOptions and RocketMqSinkOptions) and aligns the source/sink implementations with the updated table API. Key changes include replacing legacy configuration keys with the new options, refactoring preparation methods using ReadonlyConfig, and adding createSource/createSink methods for better integration with the table connector APIs.

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.

Show a summary per file
File Description
connector-rocketmq/source/RocketMqSourceFactory.java Updated to use RocketMqSourceOptions and added createSource implementation.
connector-rocketmq/source/RocketMqSource.java Refactored config validation, initialization, and deserialization setup using ReadonlyConfig.
connector-rocketmq/sink/RocketMqSinkFactory.java Updated to use RocketMqSinkOptions and introduced createSink method.
connector-rocketmq/sink/RocketMqSink.java Refactored initialization logic to use ReadonlyConfig and CatalogTable; streamlined producer metadata configuration.
connector-rocketmq/config/RocketMqSourceOptions.java Migrated legacy consumer configuration to RocketMqSourceOptions with improved defaults.
connector-rocketmq/config/RocketMqSinkOptions.java Migrated legacy producer configuration to RocketMqSinkOptions and set a default producer group.
connector-rocketmq/config/RocketMqBaseOptions.java Consolidated common configuration options and set defaults for field delimiter and format.
seatunnel-ci-tools/ConnectorOptionCheckTest.java Removed RocketMqSinkOptions and RocketMqSourceOptions from the whitelist, indicating a change in test coverage expectations.
Comments suppressed due to low confidence (3)

seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java:63

  • If the configuration value for FORMAT is not provided in uppercase, SchemaFormat.valueOf() may throw an exception. Consider normalizing the input (e.g., using toUpperCase()) or validating the config value to ensure it matches the enum naming.
producerMetadata.setFormat(SchemaFormat.valueOf(pluginConfig.get(RocketMqSinkOptions.FORMAT)));

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java:203

  • [nitpick] The removal of RocketMqSinkOptions from the whitelist should be intentionally verified to ensure that these options are now covered by tests. Please add a clarifying comment or update the test documentation if this removal is expected.
whiteList.add("RocketMqSinkOptions");

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java:205

  • [nitpick] Ensure that the removal of RocketMqSourceOptions from the whitelist is intentional and that configuration validations for these options are adequately covered by other tests. Adding a note for future maintainers is recommended.
whiteList.add("RocketMqSourceOptions");

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant