Skip to content

Commit f67f272

Browse files
authored
[Bug][Kafka] kafka reads repeatedly (#8465)
1 parent e18bfea commit f67f272

File tree

1 file changed

+3
-3
lines changed
  • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source

1 file changed

+3
-3
lines changed

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,6 @@ public void emitRecord(
6767
} else {
6868
deserializationSchema.deserialize(consumerRecord.value(), outputCollector);
6969
}
70-
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
71-
// for the next run
72-
splitState.setCurrentOffset(consumerRecord.offset() + 1);
7370
} catch (Exception e) {
7471
if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
7572
logger.warn(
@@ -79,6 +76,9 @@ public void emitRecord(
7976
throw e;
8077
}
8178
}
79+
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
80+
// for the next run
81+
splitState.setCurrentOffset(consumerRecord.offset() + 1);
8282
}
8383

8484
private static class OutputCollector<T> implements Collector<T> {

0 commit comments

Comments
 (0)