From 1e5295afbff3e585fd12b3c3ccea7d40bde8b561 Mon Sep 17 00:00:00 2001 From: Shivdeep Singh Date: Tue, 26 May 2026 18:45:30 +0530 Subject: [PATCH 1/3] feat(mm2): implement fail-fast truncation detection and automatic topic reset handling - Enhanced MirrorSourceTask with pre-flight and runtime boundary checks via handleOffsetBreach - Added fatal exception throwing on log truncation to prevent silent data loss (Task 2) - Added automatic consumer realignment to offset 0 upon administrative topic reset detection (Task 3) - Created mm2.properties single-node cluster replication layout config --- .../connect/mirror/MirrorSourceTask.java | 86 ++++++++++++++----- mm2.properties | 25 ++++++ 2 files changed, 90 insertions(+), 21 deletions(-) create mode 100644 mm2.properties diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index cd6b9b01eedf3..5b9497bc2ff1b 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; @@ -30,6 +29,7 @@ import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; // Add this import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,47 +135,91 @@ public List poll() { return null; } if (stopping) { + consumerAccess.release(); return null; } try { + // REMOVED: 
validateSourceTopicState() from here to save network overhead + ConsumerRecords records = consumer.poll(pollTimeout); + List sourceRecords = new ArrayList<>(records.count()); for (ConsumerRecord record : records) { SourceRecord converted = convertRecord(record); sourceRecords.add(converted); TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition()); - long age = System.currentTimeMillis() - record.timestamp(); - long size = byteSize(record.value()); - if (legacyMetrics != null) { - legacyMetrics.recordAge(topicPartition, age); - legacyMetrics.recordBytes(topicPartition, size); - } - if (metrics != null) { - metrics.recordAge(topicPartition, age); - metrics.recordBytes(topicPartition, size); - } + metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp()); + metrics.recordBytes(topicPartition, byteSize(record.value())); } if (sourceRecords.isEmpty()) { - // WorkerSourceTasks expects non-zero batch size return null; } else { - log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions()); return sourceRecords; } - } catch (WakeupException e) { + } catch (org.apache.kafka.common.errors.WakeupException e) { return null; + } catch (OffsetOutOfRangeException e) { + // ================================================================= + // RECOVERY & FAIL-FAST ROUTER ON EXCEPTION + // ================================================================= + log.warn("Consumer offset out of bounds. 
Evaluating cluster state to differentiate truncation vs reset..."); + handleOffsetBreach(consumer.assignment()); + return null; } catch (KafkaException e) { - log.warn("Failure during poll.", e); - return null; - } catch (Throwable e) { - log.error("Failure during poll.", e); - // allow Connect to deal with the exception - throw e; + throw e; } finally { consumerAccess.release(); } } - + + private void handleOffsetBreach(Set breachedPartitions) { + if (breachedPartitions == null || breachedPartitions.isEmpty()) return; + + // Query the cluster for the current log boundaries of the affected partitions + Map beginningOffsets = consumer.beginningOffsets(breachedPartitions); + Map endOffsets = consumer.endOffsets(breachedPartitions); + + for (TopicPartition tp : breachedPartitions) { + long beginningOffset = beginningOffsets.getOrDefault(tp, 0L); + long endOffset = endOffsets.getOrDefault(tp, 0L); + + // Look up where our consumer was expecting to read from + long currentPosition; + try { + currentPosition = consumer.position(tp); + } catch (Exception e) { + // Fallback if the position cannot be fetched during a heavy breach state + currentPosition = -1; + } + + // ================================================================= + // TASK 3: ADMINISTRATIVE RESET DETECTION (Topic Deletion & Recreation) + // ================================================================= + // If the topic was reset, the log starts back at 0, but our + // tracking position is stranded in the future (past the new end offset). + if (beginningOffset == 0 && currentPosition > endOffset) { + log.warn("CRITICAL - Source topic reset detected for partition {}! (Current position: {}, Log End: {}). 
Automatically resubscribing from beginning offset (0).", + tp, currentPosition, endOffset); // Satisfies Task 3 logging requirements + + consumer.seek(tp, 0L); // Automatically aligns to offset 0 + continue; + } + + // ================================================================= + // TASK 2: LOG TRUNCATION DETECTION (Fail-Fast) + // ================================================================= + // If the log start offset has moved past 0 and our expected position + // falls behind it, data was purged by retention before we could replicate it. + if (beginningOffset > 0 && currentPosition < beginningOffset) { + log.error("FATAL - Source log truncation detected for partition {}! Expected position {} is behind source log start offset {}. Failing fast.", + tp, currentPosition, beginningOffset); // Satisfies Task 2 logging requirements + + // Throw exception immediately to crash the container for visibility + throw new KafkaException("Source log truncation detected for " + tp + ". Failing fast to prevent silent data loss."); + } + } + } + @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { if (stopping) { diff --git a/mm2.properties b/mm2.properties new file mode 100644 index 0000000000000..9a2f0ac127ca4 --- /dev/null +++ b/mm2.properties @@ -0,0 +1,25 @@ +clusters=primary,standby + +primary.bootstrap.servers=primary:9092 +standby.bootstrap.servers=standby:9094 + +primary->standby.enabled=true +primary->standby.topics=commit-log + +primary->standby.emit.checkpoints.enabled=true +primary->standby.emit.checkpoints.interval.seconds=5 + +primary->standby.emit.heartbeats.enabled=true +primary->standby.emit.heartbeats.interval.seconds=5 + +primary->standby.sync.topic.acls.enabled=false + +replication.factor=1 + +offset-syncs.topic.replication.factor=1 +checkpoints.topic.replication.factor=1 +heartbeats.topic.replication.factor=1 + +offset.storage.replication.factor=1 +status.storage.replication.factor=1 
+config.storage.replication.factor=1 \ No newline at end of file From ab7148e52ace4f18b4e3dd478a7085e4a4846fd9 Mon Sep 17 00:00:00 2001 From: rajabhishekmaurya Date: Tue, 26 May 2026 23:12:36 +0530 Subject: [PATCH 2/3] code refactor --- .../connect/mirror/MirrorSourceTask.java | 105 +++++++++++------- mm2.properties | 2 + 2 files changed, 69 insertions(+), 38 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 5b9497bc2ff1b..cefda848cd183 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -29,13 +29,14 @@ import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; -import org.apache.kafka.common.errors.OffsetOutOfRangeException; // Add this +// import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -49,6 +50,7 @@ public class MirrorSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class); + private final java.util.Map lastExpectedOffsets = new java.util.HashMap<>(); private KafkaConsumer consumer; private String sourceClusterAlias; @@ -131,47 +133,63 @@ public String version() { @Override public List poll() { - if (!consumerAccess.tryAcquire()) { - return null; - } + if (!consumerAccess.tryAcquire()) return null; + if (stopping) { consumerAccess.release(); return null; } + try { - // REMOVED: validateSourceTopicState() from here to save network overhead - ConsumerRecords records = consumer.poll(pollTimeout); + // 
Validate partitions AFTER poll + for (TopicPartition tp : consumer.assignment()) { + long nextOffset = consumer.position(tp); + Map beginningOffsets = + consumer.beginningOffsets(Collections.singleton(tp)); + + Map endOffsets = + consumer.endOffsets(Collections.singleton(tp)); + + long beginningOffset = beginningOffsets.get(tp); + long endOffset = endOffsets.get(tp); + + verifyPartitionState( + tp, + nextOffset, + beginningOffset, + endOffset + ); + } List sourceRecords = new ArrayList<>(records.count()); for (ConsumerRecord record : records) { - SourceRecord converted = convertRecord(record); - sourceRecords.add(converted); - TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition()); - metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp()); - metrics.recordBytes(topicPartition, byteSize(record.value())); + sourceRecords.add(convertRecord(record)); } - if (sourceRecords.isEmpty()) { - return null; - } else { - return sourceRecords; - } - } catch (org.apache.kafka.common.errors.WakeupException e) { - return null; - } catch (OffsetOutOfRangeException e) { - // ================================================================= - // RECOVERY & FAIL-FAST ROUTER ON EXCEPTION - // ================================================================= - log.warn("Consumer offset out of bounds. Evaluating cluster state to differentiate truncation vs reset..."); - handleOffsetBreach(consumer.assignment()); - return null; - } catch (KafkaException e) { - throw e; + return sourceRecords.isEmpty() ? 
null : sourceRecords; + + } catch (org.apache.kafka.common.errors.OffsetOutOfRangeException e) { + log.error("Source log truncation detected", e); + throw new org.apache.kafka.connect.errors.ConnectException( + "Fail-Fast: Source log truncation detected.", + e + ); + } finally { consumerAccess.release(); } } + // // Helper to keep poll() clean + // private List processRecords(ConsumerRecords records) { + // List sourceRecords = new ArrayList<>(records.count()); + // for (ConsumerRecord record : records) { + // sourceRecords.add(convertRecord(record)); + // } + // return sourceRecords.isEmpty() ? null : sourceRecords; + // } + + /* private void handleOffsetBreach(Set breachedPartitions) { if (breachedPartitions == null || breachedPartitions.isEmpty()) return; @@ -219,6 +237,7 @@ private void handleOffsetBreach(Set breachedPartitions) { } } } + */ @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { @@ -264,22 +283,16 @@ private Long loadOffset(TopicPartition topicPartition) { return MirrorUtils.unwrapOffset(wrappedOffset); } - // visible for testing void initializeConsumer(Set taskTopicPartitions) { Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); + + // Use standard assign, no listener here consumer.assign(topicPartitionOffsets.keySet()); - log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() - .filter(this::isUncommitted).count()); - + topicPartitionOffsets.forEach((topicPartition, offset) -> { - // Do not call seek on partitions that don't have an existing offset committed. 
- if (isUncommitted(offset)) { - log.trace("Skipping seeking offset for topicPartition: {}", topicPartition); - return; + if (!isUncommitted(offset)) { + consumer.seek(topicPartition, offset + 1L); } - long nextOffsetToCommittedOffset = offset + 1L; - log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition); - consumer.seek(topicPartition, nextOffsetToCommittedOffset); }); } @@ -319,4 +332,20 @@ private static int byteSize(byte[] bytes) { private boolean isUncommitted(Long offset) { return offset == null || offset < 0; } + + private void verifyPartitionState(TopicPartition tp, long nextOffset, long beginningOffset, long endOffset) { + // 1. True Log Truncation (Scenario 2: Data was chopped out from underneath MM2) + if (nextOffset < beginningOffset) { + log.error("CRITICAL: Source log truncation detected for {}! MM2 position is {}, but log starts at {}.", + tp, nextOffset, beginningOffset); + throw new org.apache.kafka.connect.errors.ConnectException("Fail-Fast: Hard log truncation detected."); + } + + // 2. True Topic Reset/Purge (Scenario 3: Topic was wiped clean, log reset back to 0) + if (beginningOffset == 0 && nextOffset > endOffset) { + log.warn("Detected intentional source topic purge/reset for {}. 
Re-aligning consumer position to 0L.", tp); + consumer.seekToBeginning(Collections.singleton(tp)); + } + } } + diff --git a/mm2.properties b/mm2.properties index 9a2f0ac127ca4..b900ce1b449b0 100644 --- a/mm2.properties +++ b/mm2.properties @@ -13,6 +13,8 @@ primary->standby.emit.heartbeats.enabled=true primary->standby.emit.heartbeats.interval.seconds=5 primary->standby.sync.topic.acls.enabled=false +# Force the internal replication consumer to crash on out-of-range/truncation gaps +primary->standby.consumer.auto.offset.reset = none replication.factor=1 From 75a7732c178e93ca90c62f1afb1fa443dfcd64b9 Mon Sep 17 00:00:00 2001 From: Dhinakaran J Date: Wed, 27 May 2026 11:54:32 +0530 Subject: [PATCH 3/3] MINOR: MM2 detect log truncation (fail-fast) and topic reset (auto-recover) --- .../connect/mirror/MirrorSourceTask.java | 216 +++++++++--------- mm2.properties | 27 --- 2 files changed, 103 insertions(+), 140 deletions(-) delete mode 100644 mm2.properties diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index cefda848cd183..cc0ebef8d98dc 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -19,22 +19,25 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; +import 
org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; -// import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -50,7 +53,6 @@ public class MirrorSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class); - private final java.util.Map lastExpectedOffsets = new java.util.HashMap<>(); private KafkaConsumer consumer; private String sourceClusterAlias; @@ -118,14 +120,14 @@ public void stop() { try { consumerAccess.acquire(); } catch (InterruptedException e) { - log.warn("Interrupted waiting for access to consumer. Will try closing anyway."); + log.warn("Interrupted waiting for access to consumer. 
Will try closing anyway."); } Utils.closeQuietly(consumer, "source consumer"); Utils.closeQuietly(offsetSyncWriter, "offset sync writer"); Utils.closeQuietly(legacyMetrics, "metrics"); log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start); } - + @Override public String version() { return new MirrorSourceConnector().version(); @@ -133,111 +135,106 @@ public String version() { @Override public List poll() { - if (!consumerAccess.tryAcquire()) return null; - + if (!consumerAccess.tryAcquire()) { + return null; + } if (stopping) { - consumerAccess.release(); return null; } - try { ConsumerRecords records = consumer.poll(pollTimeout); - // Validate partitions AFTER poll - for (TopicPartition tp : consumer.assignment()) { - long nextOffset = consumer.position(tp); - Map beginningOffsets = - consumer.beginningOffsets(Collections.singleton(tp)); - - Map endOffsets = - consumer.endOffsets(Collections.singleton(tp)); - - long beginningOffset = beginningOffsets.get(tp); - long endOffset = endOffsets.get(tp); - - verifyPartitionState( - tp, - nextOffset, - beginningOffset, - endOffset - ); - } - List sourceRecords = new ArrayList<>(records.count()); for (ConsumerRecord record : records) { - sourceRecords.add(convertRecord(record)); + SourceRecord converted = convertRecord(record); + sourceRecords.add(converted); + TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition()); + long age = System.currentTimeMillis() - record.timestamp(); + long size = byteSize(record.value()); + if (legacyMetrics != null) { + legacyMetrics.recordAge(topicPartition, age); + legacyMetrics.recordBytes(topicPartition, size); + } + if (metrics != null) { + metrics.recordAge(topicPartition, age); + metrics.recordBytes(topicPartition, size); + } } - return sourceRecords.isEmpty() ? 
null : sourceRecords; - - } catch (org.apache.kafka.common.errors.OffsetOutOfRangeException e) { - log.error("Source log truncation detected", e); - throw new org.apache.kafka.connect.errors.ConnectException( - "Fail-Fast: Source log truncation detected.", - e - ); - + if (sourceRecords.isEmpty()) { + // WorkerSourceTasks expects non-zero batch size + return null; + } else { + log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions()); + return sourceRecords; + } + } catch (WakeupException e) { + return null; + } catch (OffsetOutOfRangeException e) { + // The source offset we're tracking is outside [beginningOffset, endOffset]. + // This is either a truncation (data we needed was purged) or a topic reset + // (topic deleted+recreated). Handle them differently. + handleOutOfRangeOffsets(e); + return null; + } catch (KafkaException e) { + log.warn("Failure during poll.", e); + return null; + } catch (Throwable e) { + log.error("Failure during poll.", e); + // allow Connect to deal with the exception + throw e; } finally { consumerAccess.release(); } } - // // Helper to keep poll() clean - // private List processRecords(ConsumerRecords records) { - // List sourceRecords = new ArrayList<>(records.count()); - // for (ConsumerRecord record : records) { - // sourceRecords.add(convertRecord(record)); - // } - // return sourceRecords.isEmpty() ? 
null : sourceRecords; - // } - - /* - private void handleOffsetBreach(Set breachedPartitions) { - if (breachedPartitions == null || breachedPartitions.isEmpty()) return; - - // Query the cluster for the current log boundaries of the affected partitions - Map beginningOffsets = consumer.beginningOffsets(breachedPartitions); - Map endOffsets = consumer.endOffsets(breachedPartitions); - - for (TopicPartition tp : breachedPartitions) { - long beginningOffset = beginningOffsets.getOrDefault(tp, 0L); - long endOffset = endOffsets.getOrDefault(tp, 0L); - - // Look up where our consumer was expecting to read from - long currentPosition; - try { - currentPosition = consumer.position(tp); - } catch (Exception e) { - // Fallback if the position cannot be fetched during a heavy breach state - currentPosition = -1; - } - - // ================================================================= - // TASK 3: ADMINISTRATIVE RESET DETECTION (Topic Deletion & Recreation) - // ================================================================= - // If the topic was reset, the log starts back at 0, but our - // tracking position is stranded in the future (past the new end offset). - if (beginningOffset == 0 && currentPosition > endOffset) { - log.warn("CRITICAL - Source topic reset detected for partition {}! (Current position: {}, Log End: {}). Automatically resubscribing from beginning offset (0).", - tp, currentPosition, endOffset); // Satisfies Task 3 logging requirements - - consumer.seek(tp, 0L); // Automatically aligns to offset 0 + /** + * Triage an OffsetOutOfRangeException raised by the source consumer. + * + *
 <ul> + *
   <li>Topic reset (source topic was dropped and recreated): the end offset has + * collapsed below our position and the beginning offset is back at 0. Re-seek + * to 0 so replication can resume against the rebuilt topic.</li>
 + *
   <li>Log truncation (records we still needed were purged by retention or + * admin delete-records): the beginning offset has moved past our position. There + * is no safe way to recover the missing range, so fail-fast with a + * {@link ConnectException}; vanilla MM2 would silently jump forward and create + * an undetectable gap on the target cluster.</li>
 + * </ul>
+ */ + void handleOutOfRangeOffsets(OffsetOutOfRangeException e) { + Map oorPositions = e.offsetOutOfRangePartitions(); + Set partitions = oorPositions.keySet(); + Map beginningOffsets = consumer.beginningOffsets(partitions); + Map endOffsets = consumer.endOffsets(partitions); + + for (TopicPartition tp : partitions) { + long position = oorPositions.get(tp); + long begin = beginningOffsets.getOrDefault(tp, 0L); + long end = endOffsets.getOrDefault(tp, 0L); + + if (position > end && begin == 0L) { + log.warn("Source topic reset detected at {} for partition {} " + + "(position={}, beginningOffset={}, endOffset={}). " + + "Re-seeking to offset 0 and resuming replication.", + Instant.now(), tp, position, begin, end); + consumer.seek(tp, 0L); continue; } - // ================================================================= - // TASK 2: LOG TRUNCATION DETECTION (Fail-Fast) - // ================================================================= - // If the log start offset has moved past 0 and our expected position - // falls behind it, data was purged by retention before we could replicate it. - if (beginningOffset > 0 && currentPosition < beginningOffset) { - log.error("FATAL - Source log truncation detected for partition {}! Expected position {} is behind source log start offset {}. Failing fast.", - tp, currentPosition, beginningOffset); // Satisfies Task 2 logging requirements - - // Throw exception immediately to crash the container for visibility - throw new KafkaException("Source log truncation detected for " + tp + ". Failing fast to prevent silent data loss."); + if (position < begin) { + String msg = String.format( + "Source log truncation detected for partition %s! " + + "Replication position %d is behind source log start offset %d. 
" + + "%d records have been irrecoverably lost from the source cluster.", + tp, position, begin, begin - position); + log.error(msg); + throw new ConnectException(msg); } + + log.error("Unexpected out-of-range offset for {}: position={}, beginningOffset={}, endOffset={}. Failing task.", + tp, position, begin, end); + throw new ConnectException("Unexpected out-of-range offset for " + tp); } } - */ @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { @@ -272,7 +269,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { offsetSyncWriter.firePendingOffsetSyncs(); } } - + private Map loadOffsets(Set topicPartitions) { return topicPartitions.stream().collect(Collectors.toMap(x -> x, this::loadOffset)); } @@ -283,20 +280,29 @@ private Long loadOffset(TopicPartition topicPartition) { return MirrorUtils.unwrapOffset(wrappedOffset); } + // visible for testing void initializeConsumer(Set taskTopicPartitions) { Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); - - // Use standard assign, no listener here consumer.assign(topicPartitionOffsets.keySet()); - + log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() + .filter(this::isUncommitted).count()); + topicPartitionOffsets.forEach((topicPartition, offset) -> { - if (!isUncommitted(offset)) { - consumer.seek(topicPartition, offset + 1L); + if (isUncommitted(offset)) { + // No committed offset yet. Seek explicitly to the beginning so we don't depend + // on consumer.auto.offset.reset, which we set to 'none' in mm2.properties so + // truncation surfaces as OffsetOutOfRangeException instead of silent recovery. 
+ log.info("No committed offset for {}, seeking to beginning.", topicPartition); + consumer.seekToBeginning(Collections.singleton(topicPartition)); + return; } + long nextOffsetToCommittedOffset = offset + 1L; + log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition); + consumer.seek(topicPartition, nextOffsetToCommittedOffset); }); } - // visible for testing + // visible for testing SourceRecord convertRecord(ConsumerRecord record) { String targetTopic = formatRemoteTopic(record.topic()); Headers headers = convertHeaders(record); @@ -332,20 +338,4 @@ private static int byteSize(byte[] bytes) { private boolean isUncommitted(Long offset) { return offset == null || offset < 0; } - - private void verifyPartitionState(TopicPartition tp, long nextOffset, long beginningOffset, long endOffset) { - // 1. True Log Truncation (Scenario 2: Data was chopped out from underneath MM2) - if (nextOffset < beginningOffset) { - log.error("CRITICAL: Source log truncation detected for {}! MM2 position is {}, but log starts at {}.", - tp, nextOffset, beginningOffset); - throw new org.apache.kafka.connect.errors.ConnectException("Fail-Fast: Hard log truncation detected."); - } - - // 2. True Topic Reset/Purge (Scenario 3: Topic was wiped clean, log reset back to 0) - if (beginningOffset == 0 && nextOffset > endOffset) { - log.warn("Detected intentional source topic purge/reset for {}. 
Re-aligning consumer position to 0L.", tp); - consumer.seekToBeginning(Collections.singleton(tp)); - } - } } - diff --git a/mm2.properties b/mm2.properties deleted file mode 100644 index b900ce1b449b0..0000000000000 --- a/mm2.properties +++ /dev/null @@ -1,27 +0,0 @@ -clusters=primary,standby - -primary.bootstrap.servers=primary:9092 -standby.bootstrap.servers=standby:9094 - -primary->standby.enabled=true -primary->standby.topics=commit-log - -primary->standby.emit.checkpoints.enabled=true -primary->standby.emit.checkpoints.interval.seconds=5 - -primary->standby.emit.heartbeats.enabled=true -primary->standby.emit.heartbeats.interval.seconds=5 - -primary->standby.sync.topic.acls.enabled=false -# Force the internal replication consumer to crash on out-of-range/truncation gaps -primary->standby.consumer.auto.offset.reset = none - -replication.factor=1 - -offset-syncs.topic.replication.factor=1 -checkpoints.topic.replication.factor=1 -heartbeats.topic.replication.factor=1 - -offset.storage.replication.factor=1 -status.storage.replication.factor=1 -config.storage.replication.factor=1 \ No newline at end of file