-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Fix][Connector-V2] Fix RocketMQ parallism read on work well #9777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
|
Hi, The CI is failing due to a formatting issue (Spotless check). |
|
cc @Carl-Zhou-CN as well. |
|
@xxyykkxx hi, it seems that the changes don't just work on the flink engine |
yes,just because this issue can be intuitively identified through Flink monitoring |
I changed the title. Please continue review this PR @Carl-Zhou-CN . Thanks. |
Sure, this is what I should do |
| Supplier<RocketMQPartitionSplitReader> splitReader = | ||
| () -> new RocketMQPartitionSplitReader(this.metadata, readerContext); | ||
|
|
||
| RocketMQSourceFetcherManager kafkaSourceFetcherManager = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| RocketMQSourceFetcherManager kafkaSourceFetcherManager = | |
| RocketMQSourceFetcherManager rocketMQSourceFetcherManager = |
|
|
||
| return new RocketMqSourceReader( | ||
| elementsQueue, | ||
| kafkaSourceFetcherManager, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| kafkaSourceFetcherManager, | |
| rocketMQSourceFetcherManager, |
| outputCollector.output = collector; | ||
| reportMetrics(consumerRecord); | ||
| deserializationSchema.deserialize(consumerRecord.getBody(), outputCollector); | ||
| // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset | |
| // consumerRecord.offset + 1 is the offset commit to RocketMQ and also the start offset |
| return recordsBySplits; | ||
| } | ||
| try { | ||
| messageExts = consumer.poll(pollTimeOut); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a parameter to control pollTimeOut?
|
|
||
| /** | ||
| * @author 02211659 bianxiang | ||
| * @date 2025-08-19 10:06:05 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete,we have git
| public class RocketMqRecordEmitter | ||
| implements RecordEmitter<MessageExt, SeaTunnelRow, RocketMQPartitionSplitState> { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(RocketMqRecordEmitter.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
| // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many | ||
| // `AsyncAppender-Dispatcher-Thread` | ||
| System.setProperty("rocketmq.client.logUseSlf4j", "true"); | ||
| initialRocketMQConsumer(config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better if we reused the implementation within RocketMqAdminUtil here?
| Thread thread = new Thread(runnable); | ||
| thread.setDaemon(true); | ||
| thread.setName("RocketMq-messageQueue-dynamic-discovery"); | ||
| thread.setName("kafka-partition-dynamic-discovery"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming errors
| // initialization scenario | ||
| if (offset <= -2) { | ||
| throw new RuntimeException( | ||
| "An error occurred while fetching offset,please check up server's log"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will continue to maintain RocketMqConnectorException better?
| break; | ||
| default: | ||
| throw new IllegalArgumentException( | ||
| "current startMode is not supported" + metadata.getStartMode()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 7 days if no further activity occurs. |
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide