Skip to content

Commit ab7148e

Browse files
code refactor
1 parent 1e5295a commit ab7148e

2 files changed

Lines changed: 69 additions & 38 deletions

File tree

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java

Lines changed: 67 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -29,13 +29,14 @@
2929
import org.apache.kafka.connect.header.Headers;
3030
import org.apache.kafka.connect.source.SourceRecord;
3131
import org.apache.kafka.connect.source.SourceTask;
32-
import org.apache.kafka.common.errors.OffsetOutOfRangeException; // Add this
32+
// import org.apache.kafka.common.errors.OffsetOutOfRangeException;
3333

3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

3737
import java.time.Duration;
3838
import java.util.ArrayList;
39+
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Set;
@@ -49,6 +50,7 @@
4950
public class MirrorSourceTask extends SourceTask {
5051

5152
private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
53+
private final java.util.Map<TopicPartition, Long> lastExpectedOffsets = new java.util.HashMap<>();
5254

5355
private KafkaConsumer<byte[], byte[]> consumer;
5456
private String sourceClusterAlias;
@@ -131,47 +133,63 @@ public String version() {
131133

132134
@Override
133135
public List<SourceRecord> poll() {
134-
if (!consumerAccess.tryAcquire()) {
135-
return null;
136-
}
136+
if (!consumerAccess.tryAcquire()) return null;
137+
137138
if (stopping) {
138139
consumerAccess.release();
139140
return null;
140141
}
142+
141143
try {
142-
// REMOVED: validateSourceTopicState() from here to save network overhead
143-
144144
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
145+
// Validate partitions AFTER poll
146+
for (TopicPartition tp : consumer.assignment()) {
147+
long nextOffset = consumer.position(tp);
148+
Map<TopicPartition, Long> beginningOffsets =
149+
consumer.beginningOffsets(Collections.singleton(tp));
150+
151+
Map<TopicPartition, Long> endOffsets =
152+
consumer.endOffsets(Collections.singleton(tp));
153+
154+
long beginningOffset = beginningOffsets.get(tp);
155+
long endOffset = endOffsets.get(tp);
156+
157+
verifyPartitionState(
158+
tp,
159+
nextOffset,
160+
beginningOffset,
161+
endOffset
162+
);
163+
}
145164

146165
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
147166
for (ConsumerRecord<byte[], byte[]> record : records) {
148-
SourceRecord converted = convertRecord(record);
149-
sourceRecords.add(converted);
150-
TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition());
151-
metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp());
152-
metrics.recordBytes(topicPartition, byteSize(record.value()));
167+
sourceRecords.add(convertRecord(record));
153168
}
154-
if (sourceRecords.isEmpty()) {
155-
return null;
156-
} else {
157-
return sourceRecords;
158-
}
159-
} catch (org.apache.kafka.common.errors.WakeupException e) {
160-
return null;
161-
} catch (OffsetOutOfRangeException e) {
162-
// =================================================================
163-
// RECOVERY & FAIL-FAST ROUTER ON EXCEPTION
164-
// =================================================================
165-
log.warn("Consumer offset out of bounds. Evaluating cluster state to differentiate truncation vs reset...");
166-
handleOffsetBreach(consumer.assignment());
167-
return null;
168-
} catch (KafkaException e) {
169-
throw e;
169+
return sourceRecords.isEmpty() ? null : sourceRecords;
170+
171+
} catch (org.apache.kafka.common.errors.OffsetOutOfRangeException e) {
172+
log.error("Source log truncation detected", e);
173+
throw new org.apache.kafka.connect.errors.ConnectException(
174+
"Fail-Fast: Source log truncation detected.",
175+
e
176+
);
177+
170178
} finally {
171179
consumerAccess.release();
172180
}
173181
}
174182

183+
// // Helper to keep poll() clean
184+
// private List<SourceRecord> processRecords(ConsumerRecords<byte[], byte[]> records) {
185+
// List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
186+
// for (ConsumerRecord<byte[], byte[]> record : records) {
187+
// sourceRecords.add(convertRecord(record));
188+
// }
189+
// return sourceRecords.isEmpty() ? null : sourceRecords;
190+
// }
191+
192+
/*
175193
private void handleOffsetBreach(Set<TopicPartition> breachedPartitions) {
176194
if (breachedPartitions == null || breachedPartitions.isEmpty()) return;
177195
@@ -219,6 +237,7 @@ private void handleOffsetBreach(Set<TopicPartition> breachedPartitions) {
219237
}
220238
}
221239
}
240+
*/
222241

223242
@Override
224243
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
@@ -264,22 +283,16 @@ private Long loadOffset(TopicPartition topicPartition) {
264283
return MirrorUtils.unwrapOffset(wrappedOffset);
265284
}
266285

267-
// visible for testing
268286
void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
269287
Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
288+
289+
// Use standard assign, no listener here
270290
consumer.assign(topicPartitionOffsets.keySet());
271-
log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
272-
.filter(this::isUncommitted).count());
273-
291+
274292
topicPartitionOffsets.forEach((topicPartition, offset) -> {
275-
// Do not call seek on partitions that don't have an existing offset committed.
276-
if (isUncommitted(offset)) {
277-
log.trace("Skipping seeking offset for topicPartition: {}", topicPartition);
278-
return;
293+
if (!isUncommitted(offset)) {
294+
consumer.seek(topicPartition, offset + 1L);
279295
}
280-
long nextOffsetToCommittedOffset = offset + 1L;
281-
log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition);
282-
consumer.seek(topicPartition, nextOffsetToCommittedOffset);
283296
});
284297
}
285298

@@ -319,4 +332,20 @@ private static int byteSize(byte[] bytes) {
319332
private boolean isUncommitted(Long offset) {
320333
return offset == null || offset < 0;
321334
}
335+
336+
private void verifyPartitionState(TopicPartition tp, long nextOffset, long beginningOffset, long endOffset) {
337+
// 1. True Log Truncation (Scenario 2: Data was chopped out from underneath MM2)
338+
if (nextOffset < beginningOffset) {
339+
log.error("CRITICAL: Source log truncation detected for {}! MM2 position is {}, but log starts at {}.",
340+
tp, nextOffset, beginningOffset);
341+
throw new org.apache.kafka.connect.errors.ConnectException("Fail-Fast: Hard log truncation detected.");
342+
}
343+
344+
// 2. True Topic Reset/Purge (Scenario 3: Topic was wiped clean, log reset back to 0)
345+
if (beginningOffset == 0 && nextOffset > endOffset) {
346+
log.warn("Detected intentional source topic purge/reset for {}. Re-aligning consumer position to 0L.", tp);
347+
consumer.seekToBeginning(Collections.singleton(tp));
348+
}
349+
}
322350
}
351+

mm2.properties

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,8 @@ primary->standby.emit.heartbeats.enabled=true
1313
primary->standby.emit.heartbeats.interval.seconds=5
1414

1515
primary->standby.sync.topic.acls.enabled=false
16+
# Make the internal replication consumer throw (instead of silently resetting its position) when its offset is out of range, e.g. after source log truncation
17+
primary->standby.consumer.auto.offset.reset = none
1618

1719
replication.factor=1
1820

0 commit comments

Comments
 (0)