
Commit 51675ab

[server] Fix ReplicaFetcherThread repeatedly throwing OutOfOrderSequenceException due to writer id expiration
1 parent 5bd41ca commit 51675ab

File tree

8 files changed (+243, -25 lines)

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 7 additions & 0 deletions
@@ -607,6 +607,13 @@ public class ConfigOptions {
                     .withDescription("The amount of time to sleep when fetch bucket error occurs.")
                     .withFallbackKeys("log.replica.fetch-backoff-interval");
 
+    public static final ConfigOption<Integer> LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS =
+            key("log.replica.fetch.max-retry-attempts")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum number of retry attempts for the fetcher when meeting OutOfOrderSequenceException.");
+
     public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MAX_BYTES =
             key("log.replica.fetch.max-bytes")
                     .memoryType()
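The new option defaults to Integer.MAX_VALUE, so out-of-the-box behavior is unchanged: the follower keeps retrying indefinitely. A minimal sketch of reading the cap, assuming a Fluss Configuration instance named conf (the conf.get(...) pattern mirrors the server code touched by this commit):

// Sketch only: `conf` is an assumed Configuration instance, as used elsewhere in this commit.
int fetcherMaxRetryAttempts = conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS);
// Default is Integer.MAX_VALUE (retry forever); operators can lower it via
// "log.replica.fetch.max-retry-attempts" to bound retries on OutOfOrderSequenceException.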

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 12 additions & 8 deletions
@@ -296,7 +296,8 @@ public static LogTablet create(
                 new WriterStateManager(
                         tableBucket,
                         tabletDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
 
         LoadedLogOffsets offsets =
                 new LogLoader(
@@ -643,7 +644,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
         // now that we have valid records, offsets assigned, we need to validate the idempotent
         // state of the writers and collect some metadata.
         Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
-                analyzeAndValidateWriterState(validRecords);
+                analyzeAndValidateWriterState(validRecords, appendAsLeader);
 
         if (validateResult.isLeft()) {
             // have duplicated batch metadata, skip the append and update append info.
@@ -1006,7 +1007,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
 
     /** Returns either the duplicated batch metadata (left) or the updated writers (right). */
     private Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>>
-            analyzeAndValidateWriterState(MemoryLogRecords records) {
+            analyzeAndValidateWriterState(MemoryLogRecords records, boolean isAppendAsLeader) {
         Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
 
         for (LogRecordBatch batch : records.batches()) {
@@ -1023,14 +1024,15 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
                 }
 
                 // update write append info.
-                updateWriterAppendInfo(writerStateManager, batch, updatedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, updatedWriters, isAppendAsLeader);
             }
         }
 
         return Either.right(updatedWriters.values());
     }
 
-    void removeExpiredWriter(long currentTimeMs) {
+    @VisibleForTesting
+    public void removeExpiredWriter(long currentTimeMs) {
         synchronized (lock) {
             writerStateManager.removeExpiredWriters(currentTimeMs);
         }
@@ -1110,14 +1112,16 @@ private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionR
     private static void updateWriterAppendInfo(
             WriterStateManager writerStateManager,
             LogRecordBatch batch,
-            Map<Long, WriterAppendInfo> writers) {
+            Map<Long, WriterAppendInfo> writers,
+            boolean isAppendAsLeader) {
         long writerId = batch.writerId();
         // update writers.
         WriterAppendInfo appendInfo =
                 writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
         appendInfo.append(
                 batch,
-                writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
+                writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
+                isAppendAsLeader);
     }
 
     static void rebuildWriterState(
@@ -1230,7 +1234,7 @@ private static void loadWritersFromRecords(
         Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
         for (LogRecordBatch batch : records.batches()) {
             if (batch.hasWriterId()) {
-                updateWriterAppendInfo(writerStateManager, batch, loadedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
             }
         }
         loadedWriters.values().forEach(writerStateManager::update);
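The net effect of the plumbing above is that writer-state validation now knows whether a batch comes from a leader-side write or from the replica fetcher, while state rebuilt from the local log during recovery is always validated as leader-side (the hard-coded true in loadWritersFromRecords). A rough sketch of the two flavors of the private append shown above; clientRecords and fetchedRecords are hypothetical placeholders, and the public entry points that wrap this method are not part of this diff:

// Hypothetical call sites, for illustration only.
LogAppendInfo fromLeaderWrite = append(clientRecords, /* appendAsLeader */ true);  // strict sequence check
LogAppendInfo fromFetcher = append(fetchedRecords, /* appendAsLeader */ false);    // bounded retry guard applies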

fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java

Lines changed: 59 additions & 4 deletions
@@ -33,24 +33,33 @@ public class WriterAppendInfo {
     private final WriterStateEntry currentEntry;
     private final WriterStateEntry updatedEntry;
 
-    public WriterAppendInfo(long writerId, TableBucket tableBucket, WriterStateEntry currentEntry) {
+    private final WriterStateManager writerStateManager;
+
+    public WriterAppendInfo(
+            long writerId,
+            TableBucket tableBucket,
+            WriterStateEntry currentEntry,
+            WriterStateManager writerStateManager) {
         this.writerId = writerId;
         this.tableBucket = tableBucket;
         this.currentEntry = currentEntry;
         this.updatedEntry = currentEntry.withWriterIdAndBatchMetadata(writerId, null);
+        this.writerStateManager = writerStateManager;
     }
 
     public long writerId() {
         return writerId;
     }
 
-    public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
+    public void append(
+            LogRecordBatch batch, boolean isWriterInBatchExpired, boolean isAppendAsLeader) {
         LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(batch.baseLogOffset());
         appendDataBatch(
                 batch.batchSequence(),
                 firstOffsetMetadata,
                 batch.lastLogOffset(),
                 isWriterInBatchExpired,
+                isAppendAsLeader,
                 batch.commitTimestamp());
     }
 
@@ -59,8 +68,9 @@ public void appendDataBatch(
             LogOffsetMetadata firstOffsetMetadata,
             long lastOffset,
             boolean isWriterInBatchExpired,
+            boolean isAppendAsLeader,
             long batchTimestamp) {
-        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset);
+        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset, isAppendAsLeader);
         updatedEntry.addBath(
                 batchSequence,
                 lastOffset,
@@ -69,19 +79,64 @@ public void appendDataBatch(
     }
 
     private void maybeValidateDataBatch(
-            int appendFirstSeq, boolean isWriterInBatchExpired, long lastOffset) {
+            int appendFirstSeq,
+            boolean isWriterInBatchExpired,
+            long lastOffset,
+            boolean appendAsLeader) {
         int currentLastSeq =
                 !updatedEntry.isEmpty()
                         ? updatedEntry.lastBatchSequence()
                         : currentEntry.lastBatchSequence();
         // must be in sequence, even for the first batch should start from 0
         if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired)) {
+            /*
+             * The expiration of a writer is triggered asynchronously by the PeriodicWriterIdExpirationCheck
+             * thread at intervals defined by WRITER_ID_EXPIRATION_CHECK_INTERVAL, which can result in
+             * slight differences in the actual expiration times of the same writer on the leader replica
+             * and follower replicas.
+             *
+             * This slight difference leads to a tricky corner case. Imagine the following scenario
+             * (server.writer-id.expiration-check-interval: 10min, server.writer-id.expiration-time: 12h):
+             *
+             * Step   Time                   Action of Leader                Action of Follower
+             * 1      2025-08-28 00:03:38    receive batch 0 of writer 101
+             * 2      2025-08-28 00:03:38                                    fetch batch 0 of writer 101
+             * 3      2025-08-28 12:05:00                                    remove state of writer 101
+             * 4      2025-08-28 12:10:02    receive batch 1 of writer 101
+             * 5      2025-08-28 12:10:02                                    fetch batch 1 of writer 101
+             * 6      2025-08-28 12:11:00    remove state of writer 101
+             *
+             * In step 3, the follower removes the state of writer 101 first, since more than 12 hours have
+             * passed since writer 101's last batch write, making it safe to remove. However, because the
+             * expiration of writer 101 has not yet occurred on the leader, a new batch 1 received at this
+             * time is successfully written on the leader. The fetcher then pulls batch 1 from the leader,
+             * but since the follower has already cleaned up the state of writer 101, an
+             * OutOfOrderSequenceException is thrown during the write.
+             *
+             * To address this issue, a retry limit is imposed on the fetcher when an
+             * OutOfOrderSequenceException occurs.
+             */
+            if (!appendAsLeader
+                    && currentLastSeq == NO_BATCH_SEQUENCE
+                    && writerStateManager.isFetcherReachMaxRetryAttempts(writerId)) {
+                writerStateManager.resetFetcherAttempts(writerId);
+                return;
+            }
+
+            if (!appendAsLeader && currentLastSeq == NO_BATCH_SEQUENCE) {
+                writerStateManager.incFetcherAttempts(writerId);
+            }
+
             throw new OutOfOrderSequenceException(
                     String.format(
                             "Out of order batch sequence for writer %s at offset %s in "
                                     + "table-bucket %s : %s (incoming batch seq.), %s (current batch seq.)",
                             writerId, lastOffset, tableBucket, appendFirstSeq, currentLastSeq));
         }
+
+        if (!appendAsLeader && currentLastSeq == NO_BATCH_SEQUENCE) {
+            writerStateManager.resetFetcherAttempts(writerId);
+        }
     }
 
     public WriterStateEntry toEntry() {
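The guard above only relaxes the check for follower appends whose writer has no local state left (currentLastSeq == NO_BATCH_SEQUENCE); leader appends and writers that are still tracked keep the strict out-of-order check. Restated as a standalone predicate, a sketch of the accept-after-retries decision could look like this (NO_BATCH_SEQUENCE is assumed to be a -1 sentinel; none of the Fluss types are used):

// Standalone restatement of the follower-side guard, for illustration only.
static boolean acceptDespiteSequenceGap(
        boolean appendAsLeader, int currentLastSeq, int attemptsSoFar, int maxRetryAttempts) {
    final int NO_BATCH_SEQUENCE = -1; // assumed sentinel: no state is tracked for this writer
    // Follower appends for an untracked writer are accepted once the retry budget is spent;
    // everything else still fails the sequence check and throws.
    return !appendAsLeader && currentLastSeq == NO_BATCH_SEQUENCE && attemptsSoFar >= maxRetryAttempts;
}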

fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java

Lines changed: 28 additions & 2 deletions
@@ -81,6 +81,8 @@ public class WriterStateManager {
     private final TableBucket tableBucket;
     private final int writerExpirationMs;
     private final Map<Long, WriterStateEntry> writers = new HashMap<>();
+    // map from writer id -> fetcher retry attempts
+    private final Map<Long, Integer> writerFetcherRetryAttempts = new HashMap<>();
 
     private final File logTabletDir;
     /** The same as writers#size, but for lock-free access. */
@@ -90,12 +92,19 @@ public class WriterStateManager {
     private long lastMapOffset = 0L;
     private long lastSnapOffset = 0L;
 
-    public WriterStateManager(TableBucket tableBucket, File logTabletDir, int writerExpirationMs)
+    private final int fetcherMaxRetryAttempts;
+
+    public WriterStateManager(
+            TableBucket tableBucket,
+            File logTabletDir,
+            int writerExpirationMs,
+            int fetcherMaxRetryAttempts)
             throws IOException {
         this.tableBucket = tableBucket;
         this.writerExpirationMs = writerExpirationMs;
         this.logTabletDir = logTabletDir;
         this.snapshots = loadSnapshots();
+        this.fetcherMaxRetryAttempts = fetcherMaxRetryAttempts;
     }
 
     public int writerIdCount() {
@@ -226,7 +235,7 @@ public Optional<File> fetchSnapshot(long offset) {
     public WriterAppendInfo prepareUpdate(long writerId) {
         WriterStateEntry currentEntry =
                 lastEntry(writerId).orElse(WriterStateEntry.empty(writerId));
-        return new WriterAppendInfo(writerId, tableBucket, currentEntry);
+        return new WriterAppendInfo(writerId, tableBucket, currentEntry, this);
     }
 
     /** Update the mapping with the given append information. */
@@ -423,6 +432,23 @@ public boolean isWriterInBatchExpired(long currentTimeMs, LogRecordBatch recordB
         return currentTimeMs - recordBatch.commitTimestamp() > writerExpirationMs;
     }
 
+    public void incFetcherAttempts(long writerId) {
+        writerFetcherRetryAttempts.put(
+                writerId, writerFetcherRetryAttempts.getOrDefault(writerId, 0) + 1);
+    }
+
+    public void resetFetcherAttempts(long writerId) {
+        writerFetcherRetryAttempts.remove(writerId);
+    }
+
+    public int getFetcherAttempts(long writerId) {
+        return writerFetcherRetryAttempts.getOrDefault(writerId, 0);
+    }
+
+    public boolean isFetcherReachMaxRetryAttempts(long writerId) {
+        return getFetcherAttempts(writerId) >= fetcherMaxRetryAttempts;
+    }
+
     private static List<WriterStateEntry> readSnapshot(File file) {
         try {
             byte[] json = Files.readAllBytes(file.toPath());
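The bookkeeping is a plain per-writer counter kept alongside the writer state. A small usage sketch of the new methods (names from this commit; the writerStateManager instance and writerId are placeholders standing in for the fetch-append path):

// Each failed follower append for an untracked writer bumps the counter ...
writerStateManager.incFetcherAttempts(writerId);
// ... and once the configured cap is reached, the next append is let through
// and the counter is cleared.
if (writerStateManager.isFetcherReachMaxRetryAttempts(writerId)) {
    writerStateManager.resetFetcherAttempts(writerId);
}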

fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java

Lines changed: 13 additions & 7 deletions
@@ -71,7 +71,8 @@ public void setup() throws Exception {
                 new WriterStateManager(
                         tableBucket,
                         logDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
     }
 
     @Test
@@ -126,14 +127,14 @@ void testValidationOnFirstEntryWhenLoadingLog() {
     void testPrepareUpdateDoesNotMutate() {
         WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
         appendInfo.appendDataBatch(
-                0, new LogOffsetMetadata(15L), 20L, false, System.currentTimeMillis());
+                0, new LogOffsetMetadata(15L), 20L, false, false, System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isNotPresent();
         stateManager.update(appendInfo);
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
         nextAppendInfo.appendDataBatch(
-                1, new LogOffsetMetadata(26L), 30L, false, System.currentTimeMillis());
+                1, new LogOffsetMetadata(26L), 30L, false, false, System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -189,7 +190,8 @@ void testRemoveExpiredWritersOnReload() throws IOException {
                 new WriterStateManager(
                         tableBucket,
                         logDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
         recoveredMapping.truncateAndReload(0L, 1L, 70000);
 
         // Entry added after recovery. The writer id should be expired now, and would not exist in
@@ -221,7 +223,8 @@ void testAppendAnExpiredBatchWithEmptyWriterStatus() throws Exception {
                 new WriterStateManager(
                         tableBucket,
                         logDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
 
         // If we try to append an expired batch with a non-zero batch sequence, the
         // OutOfOrderSequenceException will not be thrown.
@@ -371,7 +374,8 @@ void testWriterExpirationTimeout() throws Exception {
                 new WriterStateManager(
                         tableBucket,
                         logDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
         append(stateManager1, writerId, 0, 1L);
         stateManager1.removeExpiredWriters(System.currentTimeMillis() + 4000L);
 
@@ -494,7 +498,8 @@ private void testLoadFromCorruptSnapshot(Consumer<FileChannel> makeFileCorrupt)
                 new WriterStateManager(
                         tableBucket,
                         logDir,
-                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+                        (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis(),
+                        conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_RETRY_ATTEMPTS));
         reloadedStateManager.truncateAndReload(0L, 20L, System.currentTimeMillis());
         assertThat(snapshotToTruncate.exists()).isFalse();
 
@@ -521,6 +526,7 @@ private void append(
                 new LogOffsetMetadata(offset),
                 offset,
                 isWriterInBatchExpired,
+                false,
                 lastTimestamp);
         stateManager.update(appendInfo);
         stateManager.updateMapEndOffset(offset + 1);
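A possible follow-up test (not part of this commit) could pin the new behavior down end to end: with the retry cap set to 1, a follower-side append whose first batch sequence is not 0 is rejected once and accepted on the next attempt. The fixture fields (tableBucket, logDir, writerId) and the AssertJ assertions are assumed from WriterStateManagerTest:

// Hypothetical test sketch, reusing the fixtures of WriterStateManagerTest.
@Test
void testFollowerAppendAcceptedAfterMaxRetryAttempts() throws Exception {
    WriterStateManager manager =
            new WriterStateManager(tableBucket, logDir, 60_000, /* fetcherMaxRetryAttempts */ 1);

    // First follower append (isAppendAsLeader = false) with batch sequence 1 and no prior
    // writer state: the attempt is counted and the exception is thrown.
    WriterAppendInfo firstAttempt = manager.prepareUpdate(writerId);
    assertThatThrownBy(
                    () ->
                            firstAttempt.appendDataBatch(
                                    1, new LogOffsetMetadata(15L), 20L, false, false,
                                    System.currentTimeMillis()))
            .isInstanceOf(OutOfOrderSequenceException.class);

    // Second attempt: the cap of 1 is reached, so the same batch is accepted and the counter reset.
    WriterAppendInfo secondAttempt = manager.prepareUpdate(writerId);
    secondAttempt.appendDataBatch(
            1, new LogOffsetMetadata(15L), 20L, false, false, System.currentTimeMillis());
    manager.update(secondAttempt);
    assertThat(manager.lastEntry(writerId)).isPresent();
}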
