Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b527013

Browse files
authoredApr 12, 2025··
[log] Follower fetch offset need truncate to leaderEndOffsetSnapshot if we found duplicated batch or out of sequence batch (#731)
1 parent 4e7b552 commit b527013

File tree

10 files changed

+192
-11
lines changed

10 files changed

+192
-11
lines changed
 

‎fluss-server/src/main/java/com/alibaba/fluss/server/log/ListOffsetsParam.java

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class ListOffsetsParam {
4141
*/
4242
public static final int TIMESTAMP_OFFSET_TYPE = 2;
4343

44+
/** The offset type indicating the log end offset snapshot taken when the replica becomes leader. */
45+
public static final int LEADER_END_OFFSET_SNAPSHOT_TYPE = 3;
46+
4447
private final int followerServerId;
4548
private final Integer offsetType;
4649
private @Nullable final Long startTimestamp;

‎fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java

+40-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.config.ConfigOptions;
2121
import com.alibaba.fluss.config.Configuration;
2222
import com.alibaba.fluss.exception.CorruptRecordException;
23+
import com.alibaba.fluss.exception.DuplicateSequenceException;
2324
import com.alibaba.fluss.exception.FlussRuntimeException;
2425
import com.alibaba.fluss.exception.InvalidTimestampException;
2526
import com.alibaba.fluss.exception.LogOffsetOutOfRangeException;
@@ -105,6 +106,9 @@ public final class LogTablet {
105106
@GuardedBy("lock")
106107
private volatile LogOffsetMetadata highWatermarkMetadata;
107108

109+
/** The local log end offset captured when this replica became leader; -1 until first captured. */
110+
private volatile long leaderEndOffsetSnapshot = -1L;
111+
108112
// The minimum offset that should be retained in the local log. This is used to ensure that,
109113
// the offset of kv snapshot should be retained, otherwise, kv recovery will fail.
110114
private volatile long minRetainOffset;
@@ -259,6 +263,10 @@ public LogFormat getLogFormat() {
259263
return logFormat;
260264
}
261265

266+
public long getLeaderEndOffsetSnapshot() {
267+
return leaderEndOffsetSnapshot;
268+
}
269+
262270
@VisibleForTesting
263271
public WriterStateManager writerStateManager() {
264272
return writerStateManager;
@@ -336,6 +344,16 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
336344
MetricNames.LOG_FLUSH_LATENCY_MS, localLog.getFlushLatencyHistogram());
337345
}
338346

347+
public void updateLeaderEndOffsetSnapshot() {
348+
synchronized (lock) {
349+
LOG.info(
350+
"Update leaderEndOffsetSnapshot to {} for tb {} while become leader",
351+
localLogEndOffset(),
352+
localLog.getTableBucket());
353+
leaderEndOffsetSnapshot = localLog.getLocalLogEndOffset();
354+
}
355+
}
356+
339357
/**
340358
* Append this message set to the active segment of the local log, assigning offsets and Bucket
341359
* Leader Epochs.
@@ -582,10 +600,10 @@ private LogOffsetMetadata convertToOffsetMetadataOrThrow(long offset) throws IOE
582600
* segment if necessary.
583601
*
584602
* <p>This method will generally be responsible for assigning offsets to the messages, however
585-
* if the needAssignOffsetAndTimestamp=false flag is passed we will only check that the existing
586-
* offsets are valid.
603+
* if the appendAsLeader=false flag is passed we will only check that the existing offsets are
604+
* valid.
587605
*/
588-
private LogAppendInfo append(MemoryLogRecords records, boolean needAssignOffsetAndTimestamp)
606+
private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
589607
throws Exception {
590608
LogAppendInfo appendInfo = analyzeAndValidateRecords(records);
591609

@@ -599,7 +617,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean needAssignOffsetA
599617

600618
synchronized (lock) {
601619
localLog.checkIfMemoryMappedBufferClosed();
602-
if (needAssignOffsetAndTimestamp) {
620+
if (appendAsLeader) {
603621
long offset = localLog.getLocalLogEndOffset();
604622
// assign offsets to the message set.
605623
appendInfo.setFirstOffset(offset);
@@ -630,11 +648,24 @@ private LogAppendInfo append(MemoryLogRecords records, boolean needAssignOffsetA
630648
// found duplicated batch metadata: the leader skips the append and updates the append info, while a follower throws.
631649
WriterStateEntry.BatchMetadata duplicatedBatch = validateResult.left();
632650
long startOffset = duplicatedBatch.firstOffset();
633-
appendInfo.setFirstOffset(startOffset);
634-
appendInfo.setLastOffset(duplicatedBatch.lastOffset);
635-
appendInfo.setMaxTimestamp(duplicatedBatch.timestamp);
636-
appendInfo.setStartOffsetOfMaxTimestamp(startOffset);
637-
appendInfo.setDuplicated(true);
651+
if (appendAsLeader) {
652+
appendInfo.setFirstOffset(startOffset);
653+
appendInfo.setLastOffset(duplicatedBatch.lastOffset);
654+
appendInfo.setMaxTimestamp(duplicatedBatch.timestamp);
655+
appendInfo.setStartOffsetOfMaxTimestamp(startOffset);
656+
appendInfo.setDuplicated(true);
657+
} else {
658+
String errorMsg =
659+
String.format(
660+
"Found duplicated batch for table bucket %s, duplicated offset is %s, "
661+
+ "writer id is %s and batch sequence is: %s",
662+
getTableBucket(),
663+
duplicatedBatch.lastOffset,
664+
duplicatedBatch.writerId,
665+
duplicatedBatch.batchSequence);
666+
LOG.error(errorMsg);
667+
throw new DuplicateSequenceException(errorMsg);
668+
}
638669
} else {
639670
// Append the records, and increment the local log end offset immediately after
640671
// append because write to the transaction index below may fail, and we want to

‎fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateEntry.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public long lastBatchTimestamp() {
8686
}
8787

8888
public void addBath(int batchSequence, long lastOffset, int offsetDelta, long timestamp) {
89-
addBatchMetadata(new BatchMetadata(batchSequence, lastOffset, offsetDelta, timestamp));
89+
addBatchMetadata(
90+
new BatchMetadata(writerId, batchSequence, lastOffset, offsetDelta, timestamp));
9091
this.lastTimestamp = timestamp;
9192
}
9293

@@ -129,12 +130,19 @@ public WriterStateEntry withWriterIdAndBatchMetadata(
129130

130131
/** Metadata of a batch. */
131132
public static final class BatchMetadata {
133+
public final long writerId;
132134
public final int batchSequence;
133135
public final long lastOffset;
134136
public final int offsetDelta;
135137
public final long timestamp;
136138

137-
public BatchMetadata(int batchSequence, long lastOffset, int offsetDelta, long timestamp) {
139+
public BatchMetadata(
140+
long writerId,
141+
int batchSequence,
142+
long lastOffset,
143+
int offsetDelta,
144+
long timestamp) {
145+
this.writerId = writerId;
138146
this.batchSequence = batchSequence;
139147
this.lastOffset = lastOffset;
140148
this.offsetDelta = offsetDelta;

‎fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java

+1
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ private static List<WriterStateEntry> readSnapshot(File file) {
437437
snapshotEntry.writerId,
438438
snapshotEntry.lastBatchTimestamp,
439439
new WriterStateEntry.BatchMetadata(
440+
snapshotEntry.writerId,
440441
snapshotEntry.lastBatchSequence,
441442
snapshotEntry.lastBatchBaseOffset,
442443
snapshotEntry.lastBatchOffsetDelta,

‎fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java

+13
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,8 @@ public LogOffsetSnapshot fetchOffsetSnapshot(boolean fetchOnlyFromLeader) throws
506506
// -------------------------------------------------------------------------------------------
507507

508508
private void onBecomeNewLeader() {
509+
updateLeaderEndOffsetSnapshot();
510+
509511
if (isKvTable()) {
510512
// if it's become new leader, we must
511513
// first destroy the old kv tablet
@@ -523,6 +525,11 @@ private void onBecomeNewFollower() {
523525
}
524526
}
525527

528+
@VisibleForTesting
529+
public void updateLeaderEndOffsetSnapshot() {
530+
logTablet.updateLeaderEndOffsetSnapshot();
531+
}
532+
526533
private void createKv() {
527534
try {
528535
// create a closeable registry for the closable related to kv
@@ -792,6 +799,10 @@ private void startPeriodicKvSnapshot(@Nullable CompletedSnapshot completedSnapsh
792799
}
793800
}
794801

802+
public long getLeaderEndOffsetSnapshot() {
803+
return logTablet.getLeaderEndOffsetSnapshot();
804+
}
805+
795806
public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, int requiredAcks)
796807
throws Exception {
797808
return inReadLock(
@@ -1235,6 +1246,8 @@ public long getOffset(RemoteLogManager remoteLogManager, ListOffsetsParam listOf
12351246
}
12361247
} else if (offsetType == ListOffsetsParam.LATEST_OFFSET_TYPE) {
12371248
return getLatestOffset(listOffsetsParam.getFollowerServerId());
1249+
} else if (offsetType == ListOffsetsParam.LEADER_END_OFFSET_SNAPSHOT_TYPE) {
1250+
return logTablet.getLeaderEndOffsetSnapshot();
12381251
} else {
12391252
throw new IllegalArgumentException(
12401253
"Invalid list offset type: " + offsetType);

‎fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/LeaderEndpoint.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ interface LeaderEndpoint {
3636
/** Fetches the local log start offset of the given table bucket. */
3737
CompletableFuture<Long> fetchLocalLogStartOffset(TableBucket tableBucket);
3838

39+
CompletableFuture<Long> fetchLeaderEndOffsetSnapshot(TableBucket tableBucket);
40+
3941
/**
4042
* Given a fetchLogRequest, carries out the expected request and returns the results from
4143
* fetching from the leader.

‎fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java

+5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public CompletableFuture<Long> fetchLocalLogStartOffset(TableBucket tableBucket)
8181
return fetchLogOffset(tableBucket, ListOffsetsParam.EARLIEST_OFFSET_TYPE);
8282
}
8383

84+
@Override
85+
public CompletableFuture<Long> fetchLeaderEndOffsetSnapshot(TableBucket tableBucket) {
86+
return fetchLogOffset(tableBucket, ListOffsetsParam.LEADER_END_OFFSET_SNAPSHOT_TYPE);
87+
}
88+
8489
@Override
8590
public CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> fetchLog(
8691
FetchLogRequest fetchLogRequest) {

‎fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThread.java

+42
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package com.alibaba.fluss.server.replica.fetcher;
1818

1919
import com.alibaba.fluss.exception.CorruptRecordException;
20+
import com.alibaba.fluss.exception.DuplicateSequenceException;
21+
import com.alibaba.fluss.exception.InvalidOffsetException;
2022
import com.alibaba.fluss.exception.InvalidRecordException;
23+
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
2124
import com.alibaba.fluss.exception.RemoteStorageException;
2225
import com.alibaba.fluss.exception.StorageException;
2326
import com.alibaba.fluss.metadata.TableBucket;
@@ -317,6 +320,27 @@ private void handleFetchLogResponseOfSuccessBucket(
317320
currentFetchStatus.fetchOffset(),
318321
e);
319322
removeBucket(tableBucket);
323+
} else if (e instanceof DuplicateSequenceException
324+
|| e instanceof OutOfOrderSequenceException
325+
|| e instanceof InvalidOffsetException) {
326+
// TODO: this logic needs to be removed once we introduce the leader epoch cache.
327+
// Tracked by https://github.com/alibaba/fluss/issues/673
328+
LOG.error(
329+
"Founding recoverable error while processing data for bucket {} at offset {}, try to "
330+
+ "truncate to LeaderEndOffsetSnapshot",
331+
tableBucket,
332+
currentFetchStatus.fetchOffset(),
333+
e);
334+
try {
335+
truncateToLeaderEndOffsetSnapshot(tableBucket);
336+
} catch (Exception ex) {
337+
LOG.error(
338+
"Error while truncating bucket {} at offset {}",
339+
tableBucket,
340+
currentFetchStatus.fetchOffset(),
341+
ex);
342+
removeBucket(tableBucket);
343+
}
320344
} else {
321345
LOG.error(
322346
"Unexpected error occurred while processing data for bucket {} at offset {}",
@@ -328,6 +352,24 @@ private void handleFetchLogResponseOfSuccessBucket(
328352
}
329353
}
330354

355+
private void truncateToLeaderEndOffsetSnapshot(TableBucket tableBucket) throws Exception {
356+
long leaderLocalEndOffsetWhileBecomeLeader =
357+
leader.fetchLeaderEndOffsetSnapshot(tableBucket).get();
358+
long localLogEndOffset =
359+
replicaManager.getReplicaOrException(tableBucket).getLocalLogEndOffset();
360+
if (leaderLocalEndOffsetWhileBecomeLeader != 0L
361+
&& leaderLocalEndOffsetWhileBecomeLeader < localLogEndOffset) {
362+
// truncate to leaderEndOffsetSnapshot to reset follower's WriterState and fetch offset.
363+
truncate(tableBucket, leaderLocalEndOffsetWhileBecomeLeader);
364+
365+
// update fetch status.
366+
BucketFetchStatus bucketFetchStatus =
367+
new BucketFetchStatus(
368+
tableBucket.getTableId(), leaderLocalEndOffsetWhileBecomeLeader, null);
369+
fairBucketStatusMap.updateAndMoveToEnd(tableBucket, bucketFetchStatus);
370+
}
371+
}
372+
331373
private boolean handleOutOfRangeError(TableBucket tableBucket, BucketFetchStatus fetchStatus) {
332374
try {
333375
BucketFetchStatus newFetchStatus = fetchOffsetAndTruncate(tableBucket);

‎fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java

+70
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
7575
import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH;
7676
import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
77+
import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
7778
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
7879
import static org.assertj.core.api.Assertions.assertThat;
7980

@@ -199,6 +200,75 @@ void testFollowerHighWatermarkHigherThanOrEqualToLeader() throws Exception {
199200
}
200201
}
201202

203+
// TODO: this test needs to be removed once we introduce the leader epoch cache. Tracked by
204+
// https://github.com/alibaba/fluss/issues/673
205+
@Test
206+
void testAppendAsFollowerThrowDuplicatedBatchException() throws Exception {
207+
Replica leaderReplica = leaderRM.getReplicaOrException(tb);
208+
Replica followerReplica = followerRM.getReplicaOrException(tb);
209+
210+
// 1. append the same batches to both leader and follower, using two different writer ids.
211+
CompletableFuture<List<ProduceLogResultForBucket>> future;
212+
List<Long> writerIds = Arrays.asList(100L, 101L);
213+
long baseOffset = 0L;
214+
for (long writerId : writerIds) {
215+
for (int i = 0; i < 5; i++) {
216+
future = new CompletableFuture<>();
217+
leaderRM.appendRecordsToLog(
218+
1000,
219+
1,
220+
Collections.singletonMap(
221+
tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, i, 0)),
222+
future::complete);
223+
assertThat(future.get())
224+
.containsOnly(
225+
new ProduceLogResultForBucket(tb, baseOffset, baseOffset + 10L));
226+
227+
followerReplica.appendRecordsToFollower(
228+
genMemoryLogRecordsWithWriterId(DATA1, writerId, i, baseOffset));
229+
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(baseOffset + 10L);
230+
baseOffset = baseOffset + 10L;
231+
}
232+
}
233+
234+
// 2. append one batch to follower with (writerId=100L, batchSequence=5 offset=100L) to mock
235+
// the follower being one batch ahead of the leader.
236+
followerReplica.appendRecordsToFollower(
237+
genMemoryLogRecordsWithWriterId(DATA1, 100L, 5, 100L));
238+
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(110L);
239+
240+
// 3. mock becomeLeaderAndFollower as follower end.
241+
leaderReplica.updateLeaderEndOffsetSnapshot();
242+
followerFetcher.addBuckets(
243+
Collections.singletonMap(
244+
tb, new InitialFetchStatus(DATA1_TABLE_ID, leader.id(), 110L)));
245+
followerFetcher.start();
246+
247+
// 4. mock append to leader with different writer id (writerId=101L, batchSequence=5
248+
// offset=100L) to mock the leader receiving a different batch than the one the recovering follower holds.
249+
future = new CompletableFuture<>();
250+
leaderRM.appendRecordsToLog(
251+
1000,
252+
1,
253+
Collections.singletonMap(tb, genMemoryLogRecordsWithWriterId(DATA1, 101L, 5, 100L)),
254+
future::complete);
255+
assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 100L, 110L));
256+
257+
// 5. mock append to leader with (writerId=100L, batchSequence=5 offset=110L) to mock
258+
// the follower fetching a duplicated batch from the leader. In this case the follower will truncate to
259+
// LeaderEndOffsetSnapshot and fetch again.
260+
future = new CompletableFuture<>();
261+
leaderRM.appendRecordsToLog(
262+
1000,
263+
1,
264+
Collections.singletonMap(tb, genMemoryLogRecordsWithWriterId(DATA1, 100L, 5, 110L)),
265+
future::complete);
266+
assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 110L, 120L));
267+
retry(
268+
Duration.ofSeconds(20),
269+
() -> assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(120L));
270+
}
271+
202272
private void registerTableInZkClient() throws Exception {
203273
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
204274
zkClient.registerTable(

‎fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java

+6
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public CompletableFuture<Long> fetchLocalLogStartOffset(TableBucket tableBucket)
7575
return CompletableFuture.completedFuture(replica.getLocalLogStartOffset());
7676
}
7777

78+
@Override
79+
public CompletableFuture<Long> fetchLeaderEndOffsetSnapshot(TableBucket tableBucket) {
80+
Replica replica = replicaManager.getReplicaOrException(tableBucket);
81+
return CompletableFuture.completedFuture(replica.getLeaderEndOffsetSnapshot());
82+
}
83+
7884
@Override
7985
public CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> fetchLog(
8086
FetchLogRequest fetchLogRequest) {

0 commit comments

Comments
 (0)
Please sign in to comment.