Skip to content

Commit 1e5295a

Browse files
committed
feat(mm2): implement fail-fast truncation detection and automatic topic reset handling
- Enhanced MirrorSourceTask with pre-flight and runtime boundary checks via handleOffsetBreach
- Added fatal exception throwing on log truncation to prevent silent data loss (Task 2)
- Added automatic consumer realignment to offset 0 upon administrative topic reset detection (Task 3)
- Created mm2.properties single-node cluster replication layout config
1 parent 1bcb231 commit 1e5295a

2 files changed

Lines changed: 90 additions & 21 deletions

File tree

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import org.apache.kafka.clients.producer.RecordMetadata;
2323
import org.apache.kafka.common.KafkaException;
2424
import org.apache.kafka.common.TopicPartition;
25-
import org.apache.kafka.common.errors.WakeupException;
2625
import org.apache.kafka.common.header.Header;
2726
import org.apache.kafka.common.utils.Utils;
2827
import org.apache.kafka.connect.data.Schema;
2928
import org.apache.kafka.connect.header.ConnectHeaders;
3029
import org.apache.kafka.connect.header.Headers;
3130
import org.apache.kafka.connect.source.SourceRecord;
3231
import org.apache.kafka.connect.source.SourceTask;
32+
import org.apache.kafka.common.errors.OffsetOutOfRangeException; // Add this
3333

3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
@@ -135,47 +135,91 @@ public List<SourceRecord> poll() {
135135
return null;
136136
}
137137
if (stopping) {
138+
consumerAccess.release();
138139
return null;
139140
}
140141
try {
142+
// REMOVED: validateSourceTopicState() from here to save network overhead
143+
141144
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
145+
142146
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
143147
for (ConsumerRecord<byte[], byte[]> record : records) {
144148
SourceRecord converted = convertRecord(record);
145149
sourceRecords.add(converted);
146150
TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition());
147-
long age = System.currentTimeMillis() - record.timestamp();
148-
long size = byteSize(record.value());
149-
if (legacyMetrics != null) {
150-
legacyMetrics.recordAge(topicPartition, age);
151-
legacyMetrics.recordBytes(topicPartition, size);
152-
}
153-
if (metrics != null) {
154-
metrics.recordAge(topicPartition, age);
155-
metrics.recordBytes(topicPartition, size);
156-
}
151+
metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp());
152+
metrics.recordBytes(topicPartition, byteSize(record.value()));
157153
}
158154
if (sourceRecords.isEmpty()) {
159-
// WorkerSourceTasks expects non-zero batch size
160155
return null;
161156
} else {
162-
log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
163157
return sourceRecords;
164158
}
165-
} catch (WakeupException e) {
159+
} catch (org.apache.kafka.common.errors.WakeupException e) {
166160
return null;
161+
} catch (OffsetOutOfRangeException e) {
162+
// =================================================================
163+
// RECOVERY & FAIL-FAST ROUTER ON EXCEPTION
164+
// =================================================================
165+
log.warn("Consumer offset out of bounds. Evaluating cluster state to differentiate truncation vs reset...");
166+
handleOffsetBreach(consumer.assignment());
167+
return null;
167168
} catch (KafkaException e) {
168-
log.warn("Failure during poll.", e);
169-
return null;
170-
} catch (Throwable e) {
171-
log.error("Failure during poll.", e);
172-
// allow Connect to deal with the exception
173-
throw e;
169+
throw e;
174170
} finally {
175171
consumerAccess.release();
176172
}
177173
}
178-
174+
175+
private void handleOffsetBreach(Set<TopicPartition> breachedPartitions) {
176+
if (breachedPartitions == null || breachedPartitions.isEmpty()) return;
177+
178+
// Query the cluster for the current log boundaries of the affected partitions
179+
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(breachedPartitions);
180+
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(breachedPartitions);
181+
182+
for (TopicPartition tp : breachedPartitions) {
183+
long beginningOffset = beginningOffsets.getOrDefault(tp, 0L);
184+
long endOffset = endOffsets.getOrDefault(tp, 0L);
185+
186+
// Look up where our consumer was expecting to read from
187+
long currentPosition;
188+
try {
189+
currentPosition = consumer.position(tp);
190+
} catch (Exception e) {
191+
// Fallback if the position cannot be fetched during a heavy breach state
192+
currentPosition = -1;
193+
}
194+
195+
// =================================================================
196+
// TASK 3: ADMINISTRATIVE RESET DETECTION (Topic Deletion & Recreation)
197+
// =================================================================
198+
// If the topic was reset, the log starts back at 0, but our
199+
// tracking position is stranded in the future (past the new end offset).
200+
if (beginningOffset == 0 && currentPosition > endOffset) {
201+
log.warn("CRITICAL - Source topic reset detected for partition {}! (Current position: {}, Log End: {}). Automatically resubscribing from beginning offset (0).",
202+
tp, currentPosition, endOffset); // Satisfies Task 3 logging requirements
203+
204+
consumer.seek(tp, 0L); // Automatically aligns to offset 0
205+
continue;
206+
}
207+
208+
// =================================================================
209+
// TASK 2: LOG TRUNCATION DETECTION (Fail-Fast)
210+
// =================================================================
211+
// If the log start offset has moved past 0 and our expected position
212+
// falls behind it, data was purged by retention before we could replicate it.
213+
if (beginningOffset > 0 && currentPosition < beginningOffset) {
214+
log.error("FATAL - Source log truncation detected for partition {}! Expected position {} is behind source log start offset {}. Failing fast.",
215+
tp, currentPosition, beginningOffset); // Satisfies Task 2 logging requirements
216+
217+
// Throw exception immediately to crash the container for visibility
218+
throw new KafkaException("Source log truncation detected for " + tp + ". Failing fast to prevent silent data loss.");
219+
}
220+
}
221+
}
222+
179223
@Override
180224
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
181225
if (stopping) {

mm2.properties

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
clusters=primary,standby
2+
3+
primary.bootstrap.servers=primary:9092
4+
standby.bootstrap.servers=standby:9094
5+
6+
primary->standby.enabled=true
7+
primary->standby.topics=commit-log
8+
9+
primary->standby.emit.checkpoints.enabled=true
10+
primary->standby.emit.checkpoints.interval.seconds=5
11+
12+
primary->standby.emit.heartbeats.enabled=true
13+
primary->standby.emit.heartbeats.interval.seconds=5
14+
15+
primary->standby.sync.topic.acls.enabled=false
16+
17+
replication.factor=1
18+
19+
offset-syncs.topic.replication.factor=1
20+
checkpoints.topic.replication.factor=1
21+
heartbeats.topic.replication.factor=1
22+
23+
offset.storage.replication.factor=1
24+
status.storage.replication.factor=1
25+
config.storage.replication.factor=1

0 commit comments

Comments
 (0)