Skip to content

Commit 6e0c81d

Browse files
authored
[fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625)
1 parent 2346d0e commit 6e0c81d

File tree

1 file changed

+15
-11
lines changed
  • seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source

1 file changed

+15
-11
lines changed

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -242,17 +242,21 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
242242
Long offset = entry.getValue();
243243
try {
244244
if (messageQueue != null && offset != null) {
245-
consumerThreads
246-
.get(messageQueue)
247-
.getTasks()
248-
.put(
249-
consumer -> {
250-
if (this.metadata.isEnabledCommitCheckpoint()) {
251-
consumer.getOffsetStore()
252-
.updateOffset(messageQueue, offset, false);
253-
consumer.getOffsetStore().persist(messageQueue);
254-
}
255-
});
245+
RocketMqConsumerThread rocketMqConsumerThread =
246+
consumerThreads.get(messageQueue);
247+
if (rocketMqConsumerThread != null) {
248+
rocketMqConsumerThread
249+
.getTasks()
250+
.put(
251+
consumer -> {
252+
if (this.metadata.isEnabledCommitCheckpoint()) {
253+
consumer.getOffsetStore()
254+
.updateOffset(
255+
messageQueue, offset, false);
256+
consumer.getOffsetStore().persist(messageQueue);
257+
}
258+
});
259+
}
256260
}
257261
} catch (InterruptedException e) {
258262
log.error("commit offset failed", e);

0 commit comments

Comments
 (0)