1919import org .apache .kafka .clients .consumer .ConsumerRecord ;
2020import org .apache .kafka .clients .consumer .ConsumerRecords ;
2121import org .apache .kafka .clients .consumer .KafkaConsumer ;
22+ import org .apache .kafka .clients .consumer .OffsetOutOfRangeException ;
2223import org .apache .kafka .clients .producer .RecordMetadata ;
2324import org .apache .kafka .common .KafkaException ;
2425import org .apache .kafka .common .TopicPartition ;
26+ import org .apache .kafka .common .errors .WakeupException ;
2527import org .apache .kafka .common .header .Header ;
2628import org .apache .kafka .common .utils .Utils ;
2729import org .apache .kafka .connect .data .Schema ;
30+ import org .apache .kafka .connect .errors .ConnectException ;
2831import org .apache .kafka .connect .header .ConnectHeaders ;
2932import org .apache .kafka .connect .header .Headers ;
3033import org .apache .kafka .connect .source .SourceRecord ;
3134import org .apache .kafka .connect .source .SourceTask ;
32- // import org.apache.kafka.common.errors.OffsetOutOfRangeException;
3335
3436import org .slf4j .Logger ;
3537import org .slf4j .LoggerFactory ;
3638
3739import java .time .Duration ;
40+ import java .time .Instant ;
3841import java .util .ArrayList ;
3942import java .util .Collections ;
4043import java .util .List ;
5053public class MirrorSourceTask extends SourceTask {
5154
5255 private static final Logger log = LoggerFactory .getLogger (MirrorSourceTask .class );
53- private final java .util .Map <TopicPartition , Long > lastExpectedOffsets = new java .util .HashMap <>();
5456
5557 private KafkaConsumer <byte [], byte []> consumer ;
5658 private String sourceClusterAlias ;
@@ -118,126 +120,121 @@ public void stop() {
118120 try {
119121 consumerAccess .acquire ();
120122 } catch (InterruptedException e ) {
121- log .warn ("Interrupted waiting for access to consumer. Will try closing anyway." );
123+ log .warn ("Interrupted waiting for access to consumer. Will try closing anyway." );
122124 }
123125 Utils .closeQuietly (consumer , "source consumer" );
124126 Utils .closeQuietly (offsetSyncWriter , "offset sync writer" );
125127 Utils .closeQuietly (legacyMetrics , "metrics" );
126128 log .info ("Stopping {} took {} ms." , Thread .currentThread ().getName (), System .currentTimeMillis () - start );
127129 }
128-
130+
129131 @ Override
130132 public String version () {
131133 return new MirrorSourceConnector ().version ();
132134 }
133135
134136 @ Override
135137 public List <SourceRecord > poll () {
136- if (!consumerAccess .tryAcquire ()) return null ;
137-
138+ if (!consumerAccess .tryAcquire ()) {
139+ return null ;
140+ }
138141 if (stopping ) {
139- consumerAccess .release ();
140142 return null ;
141143 }
142-
143144 try {
144145 ConsumerRecords <byte [], byte []> records = consumer .poll (pollTimeout );
145- // Validate partitions AFTER poll
146- for (TopicPartition tp : consumer .assignment ()) {
147- long nextOffset = consumer .position (tp );
148- Map <TopicPartition , Long > beginningOffsets =
149- consumer .beginningOffsets (Collections .singleton (tp ));
150-
151- Map <TopicPartition , Long > endOffsets =
152- consumer .endOffsets (Collections .singleton (tp ));
153-
154- long beginningOffset = beginningOffsets .get (tp );
155- long endOffset = endOffsets .get (tp );
156-
157- verifyPartitionState (
158- tp ,
159- nextOffset ,
160- beginningOffset ,
161- endOffset
162- );
163- }
164-
165146 List <SourceRecord > sourceRecords = new ArrayList <>(records .count ());
166147 for (ConsumerRecord <byte [], byte []> record : records ) {
167- sourceRecords .add (convertRecord (record ));
148+ SourceRecord converted = convertRecord (record );
149+ sourceRecords .add (converted );
150+ TopicPartition topicPartition = new TopicPartition (converted .topic (), converted .kafkaPartition ());
151+ long age = System .currentTimeMillis () - record .timestamp ();
152+ long size = byteSize (record .value ());
153+ if (legacyMetrics != null ) {
154+ legacyMetrics .recordAge (topicPartition , age );
155+ legacyMetrics .recordBytes (topicPartition , size );
156+ }
157+ if (metrics != null ) {
158+ metrics .recordAge (topicPartition , age );
159+ metrics .recordBytes (topicPartition , size );
160+ }
168161 }
169- return sourceRecords .isEmpty () ? null : sourceRecords ;
170-
171- } catch (org .apache .kafka .common .errors .OffsetOutOfRangeException e ) {
172- log .error ("Source log truncation detected" , e );
173- throw new org .apache .kafka .connect .errors .ConnectException (
174- "Fail-Fast: Source log truncation detected." ,
175- e
176- );
177-
162+ if (sourceRecords .isEmpty ()) {
163+ // WorkerSourceTasks expects non-zero batch size
164+ return null ;
165+ } else {
166+ log .trace ("Polled {} records from {}." , sourceRecords .size (), records .partitions ());
167+ return sourceRecords ;
168+ }
169+ } catch (WakeupException e ) {
170+ return null ;
171+ } catch (OffsetOutOfRangeException e ) {
172+ // The source offset we're tracking is outside [beginningOffset, endOffset].
173+ // This is either a truncation (data we needed was purged) or a topic reset
174+ // (topic deleted+recreated). Handle them differently.
175+ handleOutOfRangeOffsets (e );
176+ return null ;
177+ } catch (KafkaException e ) {
178+ log .warn ("Failure during poll." , e );
179+ return null ;
180+ } catch (Throwable e ) {
181+ log .error ("Failure during poll." , e );
182+ // allow Connect to deal with the exception
183+ throw e ;
178184 } finally {
179185 consumerAccess .release ();
180186 }
181187 }
182188
183- // // Helper to keep poll() clean
184- // private List<SourceRecord> processRecords(ConsumerRecords<byte[], byte[]> records) {
185- // List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
186- // for (ConsumerRecord<byte[], byte[]> record : records) {
187- // sourceRecords.add(convertRecord(record));
188- // }
189- // return sourceRecords.isEmpty() ? null : sourceRecords;
190- // }
191-
192- /*
193- private void handleOffsetBreach(Set<TopicPartition> breachedPartitions) {
194- if (breachedPartitions == null || breachedPartitions.isEmpty()) return;
195-
196- // Query the cluster for the current log boundaries of the affected partitions
197- Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(breachedPartitions);
198- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(breachedPartitions);
199-
200- for (TopicPartition tp : breachedPartitions) {
201- long beginningOffset = beginningOffsets.getOrDefault(tp, 0L);
202- long endOffset = endOffsets.getOrDefault(tp, 0L);
203-
204- // Look up where our consumer was expecting to read from
205- long currentPosition;
206- try {
207- currentPosition = consumer.position(tp);
208- } catch (Exception e) {
209- // Fallback if the position cannot be fetched during a heavy breach state
210- currentPosition = -1;
211- }
212-
213- // =================================================================
214- // TASK 3: ADMINISTRATIVE RESET DETECTION (Topic Deletion & Recreation)
215- // =================================================================
216- // If the topic was reset, the log starts back at 0, but our
217- // tracking position is stranded in the future (past the new end offset).
218- if (beginningOffset == 0 && currentPosition > endOffset) {
219- log.warn("CRITICAL - Source topic reset detected for partition {}! (Current position: {}, Log End: {}). Automatically resubscribing from beginning offset (0).",
220- tp, currentPosition, endOffset); // Satisfies Task 3 logging requirements
221-
222- consumer.seek(tp, 0L); // Automatically aligns to offset 0
189+ /**
190+ * Triage an OffsetOutOfRangeException raised by the source consumer.
191+ *
192+ * <ul>
193+ * <li><b>Topic reset</b> (source topic was dropped and recreated): the end offset has
194+ * collapsed below our position and the beginning offset is back at 0. Re-seek
195+ * to 0 so replication can resume against the rebuilt topic.</li>
196+ * <li><b>Log truncation</b> (records we still needed were purged by retention or
197+ * admin delete-records): the beginning offset has moved past our position. There
198+ * is no safe way to recover the missing range, so fail-fast with a
199+ * {@link ConnectException}; vanilla MM2 would silently jump forward and create
200+ * an undetectable gap on the target cluster.</li>
201+ * </ul>
202+ */
203+ void handleOutOfRangeOffsets (OffsetOutOfRangeException e ) {
204+ Map <TopicPartition , Long > oorPositions = e .offsetOutOfRangePartitions ();
205+ Set <TopicPartition > partitions = oorPositions .keySet ();
206+ Map <TopicPartition , Long > beginningOffsets = consumer .beginningOffsets (partitions );
207+ Map <TopicPartition , Long > endOffsets = consumer .endOffsets (partitions );
208+
209+ for (TopicPartition tp : partitions ) {
210+ long position = oorPositions .get (tp );
211+ long begin = beginningOffsets .getOrDefault (tp , 0L );
212+ long end = endOffsets .getOrDefault (tp , 0L );
213+
214+ if (position > end && begin == 0L ) {
215+ log .warn ("Source topic reset detected at {} for partition {} "
216+ + "(position={}, beginningOffset={}, endOffset={}). "
217+ + "Re-seeking to offset 0 and resuming replication." ,
218+ Instant .now (), tp , position , begin , end );
219+ consumer .seek (tp , 0L );
223220 continue ;
224221 }
225222
226- // =================================================================
227- // TASK 2: LOG TRUNCATION DETECTION (Fail-Fast)
228- // =================================================================
229- // If the log start offset has moved past 0 and our expected position
230- // falls behind it, data was purged by retention before we could replicate it.
231- if (beginningOffset > 0 && currentPosition < beginningOffset) {
232- log.error("FATAL - Source log truncation detected for partition {}! Expected position {} is behind source log start offset {}. Failing fast.",
233- tp, currentPosition, beginningOffset); // Satisfies Task 2 logging requirements
234-
235- // Throw exception immediately to crash the container for visibility
236- throw new KafkaException("Source log truncation detected for " + tp + ". Failing fast to prevent silent data loss.");
223+ if (position < begin ) {
224+ String msg = String .format (
225+ "Source log truncation detected for partition %s! "
226+ + "Replication position %d is behind source log start offset %d. "
227+ + "%d records have been irrecoverably lost from the source cluster." ,
228+ tp , position , begin , begin - position );
229+ log .error (msg );
230+ throw new ConnectException (msg );
237231 }
232+
233+ log .error ("Unexpected out-of-range offset for {}: position={}, beginningOffset={}, endOffset={}. Failing task." ,
234+ tp , position , begin , end );
235+ throw new ConnectException ("Unexpected out-of-range offset for " + tp );
238236 }
239237 }
240- */
241238
242239 @ Override
243240 public void commitRecord (SourceRecord record , RecordMetadata metadata ) {
@@ -272,7 +269,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
272269 offsetSyncWriter .firePendingOffsetSyncs ();
273270 }
274271 }
275-
272+
276273 private Map <TopicPartition , Long > loadOffsets (Set <TopicPartition > topicPartitions ) {
277274 return topicPartitions .stream ().collect (Collectors .toMap (x -> x , this ::loadOffset ));
278275 }
@@ -283,20 +280,29 @@ private Long loadOffset(TopicPartition topicPartition) {
283280 return MirrorUtils .unwrapOffset (wrappedOffset );
284281 }
285282
283+ // visible for testing
286284 void initializeConsumer (Set <TopicPartition > taskTopicPartitions ) {
287285 Map <TopicPartition , Long > topicPartitionOffsets = loadOffsets (taskTopicPartitions );
288-
289- // Use standard assign, no listener here
290286 consumer .assign (topicPartitionOffsets .keySet ());
291-
287+ log .info ("Starting with {} previously uncommitted partitions." , topicPartitionOffsets .values ().stream ()
288+ .filter (this ::isUncommitted ).count ());
289+
292290 topicPartitionOffsets .forEach ((topicPartition , offset ) -> {
293- if (!isUncommitted (offset )) {
294- consumer .seek (topicPartition , offset + 1L );
291+ if (isUncommitted (offset )) {
292+ // No committed offset yet. Seek explicitly to the beginning so we don't depend
293+ // on consumer.auto.offset.reset, which we set to 'none' in mm2.properties so
294+ // truncation surfaces as OffsetOutOfRangeException instead of silent recovery.
295+ log .info ("No committed offset for {}, seeking to beginning." , topicPartition );
296+ consumer .seekToBeginning (Collections .singleton (topicPartition ));
297+ return ;
295298 }
299+ long nextOffsetToCommittedOffset = offset + 1L ;
300+ log .trace ("Seeking to offset {} for topicPartition: {}" , nextOffsetToCommittedOffset , topicPartition );
301+ consumer .seek (topicPartition , nextOffsetToCommittedOffset );
296302 });
297303 }
298304
299- // visible for testing
305+ // visible for testing
300306 SourceRecord convertRecord (ConsumerRecord <byte [], byte []> record ) {
301307 String targetTopic = formatRemoteTopic (record .topic ());
302308 Headers headers = convertHeaders (record );
@@ -332,20 +338,4 @@ private static int byteSize(byte[] bytes) {
332338 private boolean isUncommitted (Long offset ) {
333339 return offset == null || offset < 0 ;
334340 }
335-
336- private void verifyPartitionState (TopicPartition tp , long nextOffset , long beginningOffset , long endOffset ) {
337- // 1. True Log Truncation (Scenario 2: Data was chopped out from underneath MM2)
338- if (nextOffset < beginningOffset ) {
339- log .error ("CRITICAL: Source log truncation detected for {}! MM2 position is {}, but log starts at {}." ,
340- tp , nextOffset , beginningOffset );
341- throw new org .apache .kafka .connect .errors .ConnectException ("Fail-Fast: Hard log truncation detected." );
342- }
343-
344- // 2. True Topic Reset/Purge (Scenario 3: Topic was wiped clean, log reset back to 0)
345- if (beginningOffset == 0 && nextOffset > endOffset ) {
346- log .warn ("Detected intentional source topic purge/reset for {}. Re-aligning consumer position to 0L." , tp );
347- consumer .seekToBeginning (Collections .singleton (tp ));
348- }
349- }
350341}
351-
0 commit comments