@@ -44,13 +44,15 @@ public long writerId() {
4444 return writerId ;
4545 }
4646
47- public void append (LogRecordBatch batch , boolean isWriterInBatchExpired ) {
47+ public void append (
48+ LogRecordBatch batch , boolean isWriterInBatchExpired , boolean isAppendAsLeader ) {
4849 LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata (batch .baseLogOffset ());
4950 appendDataBatch (
5051 batch .batchSequence (),
5152 firstOffsetMetadata ,
5253 batch .lastLogOffset (),
5354 isWriterInBatchExpired ,
55+ isAppendAsLeader ,
5456 batch .commitTimestamp ());
5557 }
5658
@@ -59,8 +61,9 @@ public void appendDataBatch(
5961 LogOffsetMetadata firstOffsetMetadata ,
6062 long lastOffset ,
6163 boolean isWriterInBatchExpired ,
64+ boolean isAppendAsLeader ,
6265 long batchTimestamp ) {
63- maybeValidateDataBatch (batchSequence , isWriterInBatchExpired , lastOffset );
66+ maybeValidateDataBatch (batchSequence , isWriterInBatchExpired , lastOffset , isAppendAsLeader );
6467 updatedEntry .addBath (
6568 batchSequence ,
6669 lastOffset ,
@@ -69,13 +72,16 @@ public void appendDataBatch(
6972 }
7073
7174 private void maybeValidateDataBatch (
72- int appendFirstSeq , boolean isWriterInBatchExpired , long lastOffset ) {
75+ int appendFirstSeq ,
76+ boolean isWriterInBatchExpired ,
77+ long lastOffset ,
78+ boolean isAppendAsLeader ) {
7379 int currentLastSeq =
7480 !updatedEntry .isEmpty ()
7581 ? updatedEntry .lastBatchSequence ()
7682 : currentEntry .lastBatchSequence ();
7783 // must be in sequence, even for the first batch should start from 0
78- if (!inSequence (currentLastSeq , appendFirstSeq , isWriterInBatchExpired )) {
84+ if (!inSequence (currentLastSeq , appendFirstSeq , isWriterInBatchExpired , isAppendAsLeader )) {
7985 throw new OutOfOrderSequenceException (
8086 String .format (
8187 "Out of order batch sequence for writer %s at offset %s in "
@@ -93,16 +99,55 @@ public WriterStateEntry toEntry() {
9399 * three scenarios will be judged as in sequence:
94100 *
95101 * <ul>
96- * <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check whether the committed
97- * timestamp of the next batch under the current writerId has expired. If it has expired,
98- * we consider this a special case caused by writerId expiration, for this case, to ensure
99- * the correctness of follower sync, we still treat it as in sequence.
102+ * <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two scenarios will be judged as
103+ * in sequence:
104+ * <ul>
105+ * <li>If the committed timestamp of the next batch under the current writerId has
106+ * expired, we consider this a special case caused by writerId expiration, for this
107+ * case, to ensure the correctness of follower sync, we still treat it as in
108+ * sequence.
109+ * <li>If the append request is from the follower, we consider this is a special case
110+ * caused by inconsistent expiration of writerId between the leader and follower. To
111+ * prevent continuous fetch failures on the follower side, we still treat it as in
112+ * sequence.
113+ * <p>Here is a detailed example illustrating the scenario described above: The
114+ * expiration of a writer is triggered asynchronously by the {@code
115+ * PeriodicWriterIdExpirationCheck} thread at intervals defined by {@link
116+ * org.apache.fluss.config.ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL}, which
117+ * can result in slight differences in the actual expiration times of the same
118+ * writer on the leader replica and follower replicas.
119+ * <p>This slight difference leads to a dreadful corner case. Imagine the following
120+ * scenario(set {@link
121+ * org.apache.fluss.config.ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL}:
122+ * 10min, {@link org.apache.fluss.config.ConfigOptions.WRITER_ID_EXPIRATION_TIME}:
123+ * 12h):
124+ * <pre>
125+ * Step Time Action of Leader Action of Follower
126+ * 1 2025-08-28 00:03:38 receive batch 0 of writer 101
127+ * 2 2025-08-28 00:03:38 fetch batch 0 of writer 101
128+ * 3 2025-08-28 12:05:00 remove state of writer 101
129+ * 4 2025-08-28 12:10:02 receive batch 1 of writer 101
130+ * 5 2025-08-28 12:10:02 fetch batch 0 of writer 101
131+ * 6 2025-08-28 12:11:00 remove state of writer 101
132+ * </pre>
133+ * In step 3, the follower removes the state of writer 101 first, since it has been
134+ * more than 12 hours since writer 101's last batch write, making it safe to remove.
135+ * However, since the expiration of writer 101 has not yet occurred on the leader,
136+ * and a new batch 1 is received at this time, it is successfully written on the
137+ * leader. At this point, the fetcher pulls batch 1 from the leader, but since the
138+ * state of writer 101 has already been cleaned up, an {@link
139+ * OutOfOrderSequenceException} will occur during to write.
140+ * </ul>
100141 * <li>nextBatchSeq == lastBatchSeq + 1L
101142 * <li>lastBatchSeq reaches its maximum value
102143 * </ul>
103144 */
104- private boolean inSequence (int lastBatchSeq , int nextBatchSeq , boolean isWriterInBatchExpired ) {
105- return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired )
145+ private boolean inSequence (
146+ int lastBatchSeq ,
147+ int nextBatchSeq ,
148+ boolean isWriterInBatchExpired ,
149+ boolean isAppendAsLeader ) {
150+ return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader ))
106151 || nextBatchSeq == lastBatchSeq + 1L
107152 || (nextBatchSeq == 0 && lastBatchSeq == Integer .MAX_VALUE );
108153 }
0 commit comments