MINOR: MM2 surface log-truncation and topic-reset as actionable events in MirrorSourceTask#22380
Open
rajabhishekmaurya wants to merge 3 commits into
Open
MINOR: MM2 surface log-truncation and topic-reset as actionable events in MirrorSourceTask#22380rajabhishekmaurya wants to merge 3 commits into
rajabhishekmaurya wants to merge 3 commits into
Conversation
…ic reset handling - Enhanced MirrorSourceTask with pre-flight and runtime boundary checks via handleOffsetBreach - Added fatal exception throwing on log truncation to prevent silent data loss (Task 2) - Added automatic consumer realignment to offset 0 upon administrative topic reset detection (Task 3) - Created mm2.properties single-node cluster replication layout config
rajabhishekmaurya
added a commit
to rajabhishekmaurya/kafka-mirrormaker-challenge
that referenced
this pull request
May 27, 2026
rajabhishekmaurya
added a commit
to rajabhishekmaurya/kafka-mirrormaker-challenge
that referenced
this pull request
May 29, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
MirrorSourceTaskcurrently relies on the source consumer'sauto.offset.resetpolicy to recover from anyOffsetOutOfRangeException.In production that means the two most damaging real-world failure modes are
indistinguishable from a healthy stream:
replication position (retention policy,
kafka-delete-records.sh, etc.).With
auto.offset.reset=earliestthe consumer silently jumps forward; thetarget cluster ends up with a gap that no metric or log line reflects.
maintenance. The consumer's stored position is now beyond the new
endOffset, so it either throws repeatedly or, again withearliest,silently jumps forward.
This change makes
MirrorSourceTaskrecognise both situations and reactdifferently.
Changes
A new
catch (OffsetOutOfRangeException e)branch inpoll()dispatches to anew helper
handleOutOfRangeOffsets(...):position > endOffset && beginningOffset == 0for an out-of-rangepartition, the source topic looks newly recreated. The task logs a
WARNwith the timestamp and topic-partition, calls
consumer.seek(tp, 0L), andresumes replication on the next poll.
position < beginningOffset, data we still needed to replicate has beenpurged. The task logs an
ERRORdescribing the gap (positions and thenumber of records lost) and throws a
ConnectException, surfacing thefailure to Connect's task lifecycle (fail-fast) instead of papering over it.
task, so the implementation degrades safely rather than silently.
initializeConsumer(...)was also tightened: partitions with no committedoffset now
seekToBeginning(...)explicitly. This makes the task behavecorrectly when operators choose
consumer.auto.offset.reset = noneto optinto the truncation detection above (otherwise a first-time partition would
throw
NoOffsetForPartitionExceptionon first poll).Net change: one file, ~70 added / ~7 removed.
Why the predicate is safe
The
beginningOffset == 0condition in the reset branch is load-bearing:ordinary retention-driven trimming always leaves
beginningOffset > 0, soretention can never be mistaken for a topic reset — it always falls into the
truncation branch. Conversely, a recreated topic has both
beginningOffset == 0and anendOffsetlower than what we previouslyread, so the recreation case has a clean discriminator.
Testing
KRaft clusters + this build of MM2 + a small producer). Three scenarios are
run end-to-end:
of
primary.commit-logon the target cluster.kafka-delete-records.shto lift the low watermark past MM2's lastposition, unpause. Asserts that
"Source log truncation detected"appears in the task logs and that thetask transitions to FAILED.
fresh records, restart MM2. Asserts that
"Source topic reset detected"appears and that the new records arriveon the target.
apache/kafka:4.0.0.changed; the new exception branch is the only execution path that
behaviour-shifts and it only triggers on a state vanilla MM2 currently
silently absorbs).
Compatibility
OffsetOutOfRangeException, which only happens when the operator setsconsumer.auto.offset.reset = none. With the default(
auto.offset.reset = earliest),MirrorSourceTaskbehaves exactly asbefore.
Committer Checklist (excluded from commit message)