Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -35,7 +37,9 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -116,14 +120,14 @@ public void stop() {
try {
consumerAccess.acquire();
} catch (InterruptedException e) {
log.warn("Interrupted waiting for access to consumer. Will try closing anyway.");
log.warn("Interrupted waiting for access to consumer. Will try closing anyway.");
}
Utils.closeQuietly(consumer, "source consumer");
Utils.closeQuietly(offsetSyncWriter, "offset sync writer");
Utils.closeQuietly(legacyMetrics, "metrics");
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
}

@Override
public String version() {
return new MirrorSourceConnector().version();
Expand Down Expand Up @@ -164,6 +168,12 @@ public List<SourceRecord> poll() {
}
} catch (WakeupException e) {
return null;
} catch (OffsetOutOfRangeException e) {
// The source offset we're tracking is outside [beginningOffset, endOffset].
// This is either a truncation (data we needed was purged) or a topic reset
// (topic deleted+recreated). Handle them differently.
handleOutOfRangeOffsets(e);
return null;
} catch (KafkaException e) {
log.warn("Failure during poll.", e);
return null;
Expand All @@ -175,7 +185,57 @@ public List<SourceRecord> poll() {
consumerAccess.release();
}
}


/**
 * Triage an {@link OffsetOutOfRangeException} raised by the source consumer.
 *
 * <p>For every partition reported by the exception, the consumer's current beginning and
 * end offsets are looked up and the partition is classified as one of:
 *
 * <ul>
 *   <li><b>Topic reset</b> (source topic was dropped and recreated): the end offset has
 *       collapsed below our position and the beginning offset is back at 0. Re-seek
 *       to 0 so replication can resume against the rebuilt topic.</li>
 *   <li><b>Log truncation</b> (records we still needed were purged by retention or
 *       admin delete-records): the beginning offset has moved past our position. There
 *       is no safe way to recover the missing range, so fail fast with a
 *       {@link ConnectException} rather than silently jumping forward and leaving an
 *       undetectable gap on the target cluster.</li>
 *   <li><b>Anything else</b> (including a partition whose log bounds could not be
 *       determined): fail the task, since guessing could either skip or duplicate data.</li>
 * </ul>
 *
 * <p>Partitions are handled in iteration order; the first partition that is not a topic
 * reset aborts processing by throwing.
 *
 * @param e the exception thrown by the source consumer; its
 *          {@code offsetOutOfRangePartitions()} supplies the affected partitions and the
 *          positions that were rejected. It is attached as the cause of any
 *          {@link ConnectException} thrown here.
 * @throws ConnectException if any affected partition was truncated, has unknown log
 *                          bounds, or is out of range for an unrecognised reason
 */
void handleOutOfRangeOffsets(OffsetOutOfRangeException e) {
    Map<TopicPartition, Long> oorPositions = e.offsetOutOfRangePartitions();
    Set<TopicPartition> partitions = oorPositions.keySet();
    Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

    for (Map.Entry<TopicPartition, Long> entry : oorPositions.entrySet()) {
        TopicPartition tp = entry.getKey();
        long position = entry.getValue();
        Long knownBegin = beginningOffsets.get(tp);
        Long knownEnd = endOffsets.get(tp);

        // Do not substitute 0 for a missing bound: begin=0/end=0 would satisfy the
        // "topic reset" test below for any positive position and rewind the partition
        // to offset 0, re-replicating everything. Without both bounds we cannot classify.
        if (knownBegin == null || knownEnd == null) {
            String msg = String.format(
                    "Unable to determine source log bounds for partition %s "
                            + "(position=%d, beginningOffset=%s, endOffset=%s).",
                    tp, position, knownBegin, knownEnd);
            log.error(msg);
            throw new ConnectException(msg, e);
        }

        long begin = knownBegin;
        long end = knownEnd;

        if (position > end && begin == 0L) {
            log.warn("Source topic reset detected at {} for partition {} "
                            + "(position={}, beginningOffset={}, endOffset={}). "
                            + "Re-seeking to offset 0 and resuming replication.",
                    Instant.now(), tp, position, begin, end);
            consumer.seek(tp, 0L);
            continue;
        }

        if (position < begin) {
            String msg = String.format(
                    "Source log truncation detected for partition %s! "
                            + "Replication position %d is behind source log start offset %d. "
                            + "%d records have been irrecoverably lost from the source cluster.",
                    tp, position, begin, begin - position);
            log.error(msg);
            // Keep the consumer's exception as the cause so the failure can be traced.
            throw new ConnectException(msg, e);
        }

        log.error("Unexpected out-of-range offset for {}: position={}, beginningOffset={}, endOffset={}. Failing task.",
                tp, position, begin, end);
        throw new ConnectException("Unexpected out-of-range offset for " + tp, e);
    }
}

@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
if (stopping) {
Expand Down Expand Up @@ -209,7 +269,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
offsetSyncWriter.firePendingOffsetSyncs();
}
}

/**
 * Builds a map from each of the given topic partitions to the value that
 * {@code loadOffset} returns for it.
 *
 * @param topicPartitions the partitions to look up
 * @return a map keyed by partition, holding the result of {@code loadOffset} for that partition
 */
private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
    return topicPartitions
            .stream()
            .collect(Collectors.toMap(
                    partition -> partition,
                    partition -> loadOffset(partition)));
}
Expand All @@ -228,9 +288,12 @@ void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
.filter(this::isUncommitted).count());

topicPartitionOffsets.forEach((topicPartition, offset) -> {
// Do not call seek on partitions that don't have an existing offset committed.
if (isUncommitted(offset)) {
log.trace("Skipping seeking offset for topicPartition: {}", topicPartition);
// No committed offset yet. Seek explicitly to the beginning so we don't depend
// on consumer.auto.offset.reset, which we set to 'none' in mm2.properties so
// truncation surfaces as OffsetOutOfRangeException instead of silent recovery.
log.info("No committed offset for {}, seeking to beginning.", topicPartition);
consumer.seekToBeginning(Collections.singleton(topicPartition));
return;
}
long nextOffsetToCommittedOffset = offset + 1L;
Expand All @@ -239,7 +302,7 @@ void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
});
}

// visible for testing
// visible for testing
SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> record) {
String targetTopic = formatRemoteTopic(record.topic());
Headers headers = convertHeaders(record);
Expand Down
Loading