Skip to content

Commit 8da1f3e

Browse files
Skip commit when records offset < initial offset (knative-extensions#4348) (#1756)
* Skip recordNewOffset when records offset < initial offset * Run hack/update-codegen.sh * Make recordReceived a bit safer concurrency-wise * Fix: Remove redundant if clause Co-authored-by: Christoph Stäbler <[email protected]>
1 parent aebf463 commit 8da1f3e

File tree

1 file changed

+16
-3
lines changed
  • data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer

1 file changed

+16
-3
lines changed

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,12 @@ public PartitionRevokedHandler getPartitionRevokedHandler() {
102102
@Override
103103
public void recordReceived(final ConsumerRecord<?, ?> record) {
104104
final var tp = new TopicPartition(record.topic(), record.partition());
105-
if (!offsetTrackers.containsKey(tp)) {
106-
// Initialize offset tracker for the given record's topic/partition.
107-
offsetTrackers.putIfAbsent(tp, new OffsetTracker(record.offset()));
105+
final var offsetTracker = offsetTrackers.putIfAbsent(tp, new OffsetTracker(record.offset()));
106+
if (offsetTracker != null && record.offset() < offsetTracker.initialOffset) {
107+
logger.debug(
108+
"Received records offset ({}) is less than offsetTrackers initial offset ({})",
109+
record.offset(),
110+
offsetTracker.initialOffset);
108111
}
109112
}
110113

@@ -148,6 +151,16 @@ private void commit(final ConsumerRecord<?, ?> record) {
148151
// Note: it's not possible to commit offsets of partitions that this a particular consumer instance doesn't own.
149152
final var ot = this.offsetTrackers.get(new TopicPartition(record.topic(), record.partition()));
150153
if (ot != null) {
154+
if (record.offset() < ot.initialOffset) {
155+
logger.debug(
156+
"Ignoring commit for {}-{} offset {}, offset tracker already on {}",
157+
record.topic(),
158+
record.partition(),
159+
record.offset(),
160+
ot.initialOffset);
161+
return;
162+
}
163+
151164
ot.recordNewOffset(record.offset());
152165
}
153166
}

0 commit comments

Comments
 (0)