diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index cd6b9b01eedf3..cc0ebef8d98dc 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -26,6 +27,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; @@ -35,7 +37,9 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -116,14 +120,14 @@ public void stop() { try { consumerAccess.acquire(); } catch (InterruptedException e) { - log.warn("Interrupted waiting for access to consumer. Will try closing anyway."); + log.warn("Interrupted waiting for access to consumer. Will try closing anyway."); } Utils.closeQuietly(consumer, "source consumer"); Utils.closeQuietly(offsetSyncWriter, "offset sync writer"); Utils.closeQuietly(legacyMetrics, "metrics"); log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start); } - + @Override public String version() { return new MirrorSourceConnector().version(); @@ -164,6 +168,12 @@ public List poll() { } } catch (WakeupException e) { return null; + } catch (OffsetOutOfRangeException e) { + // The source offset we're tracking is outside [beginningOffset, endOffset]. + // This is either a truncation (data we needed was purged) or a topic reset + // (topic deleted+recreated). Handle them differently. + handleOutOfRangeOffsets(e); + return null; } catch (KafkaException e) { log.warn("Failure during poll.", e); return null; @@ -175,7 +185,57 @@ public List poll() { consumerAccess.release(); } } - + + /** + * Triage an OffsetOutOfRangeException raised by the source consumer. + * + * + */ + void handleOutOfRangeOffsets(OffsetOutOfRangeException e) { + Map oorPositions = e.offsetOutOfRangePartitions(); + Set partitions = oorPositions.keySet(); + Map beginningOffsets = consumer.beginningOffsets(partitions); + Map endOffsets = consumer.endOffsets(partitions); + + for (TopicPartition tp : partitions) { + long position = oorPositions.get(tp); + long begin = beginningOffsets.getOrDefault(tp, 0L); + long end = endOffsets.getOrDefault(tp, 0L); + + if (position > end && begin == 0L) { + log.warn("Source topic reset detected at {} for partition {} " + + "(position={}, beginningOffset={}, endOffset={}). " + + "Re-seeking to offset 0 and resuming replication.", + Instant.now(), tp, position, begin, end); + consumer.seek(tp, 0L); + continue; + } + + if (position < begin) { + String msg = String.format( + "Source log truncation detected for partition %s! " + + "Replication position %d is behind source log start offset %d. " + + "%d records have been irrecoverably lost from the source cluster.", + tp, position, begin, begin - position); + log.error(msg); + throw new ConnectException(msg); + } + + log.error("Unexpected out-of-range offset for {}: position={}, beginningOffset={}, endOffset={}. Failing task.", + tp, position, begin, end); + throw new ConnectException("Unexpected out-of-range offset for " + tp); + } + } + @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { if (stopping) { @@ -209,7 +269,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { offsetSyncWriter.firePendingOffsetSyncs(); } } - + private Map loadOffsets(Set topicPartitions) { return topicPartitions.stream().collect(Collectors.toMap(x -> x, this::loadOffset)); } @@ -228,9 +288,12 @@ void initializeConsumer(Set taskTopicPartitions) { .filter(this::isUncommitted).count()); topicPartitionOffsets.forEach((topicPartition, offset) -> { - // Do not call seek on partitions that don't have an existing offset committed. if (isUncommitted(offset)) { - log.trace("Skipping seeking offset for topicPartition: {}", topicPartition); + // No committed offset yet. Seek explicitly to the beginning so we don't depend + // on consumer.auto.offset.reset, which we set to 'none' in mm2.properties so + // truncation surfaces as OffsetOutOfRangeException instead of silent recovery. + log.info("No committed offset for {}, seeking to beginning.", topicPartition); + consumer.seekToBeginning(Collections.singleton(topicPartition)); return; } long nextOffsetToCommittedOffset = offset + 1L; @@ -239,7 +302,7 @@ void initializeConsumer(Set taskTopicPartitions) { }); } - // visible for testing + // visible for testing SourceRecord convertRecord(ConsumerRecord record) { String targetTopic = formatRemoteTopic(record.topic()); Headers headers = convertHeaders(record);