Skip to content

Commit 08403d1

Browse files
committed
[server] Fix ReplicaFetcherThread keeps throwing OutOfOrderSequenceException because of writer id expire
1 parent 3a48f6d commit 08403d1

File tree

6 files changed

+185
-25
lines changed

6 files changed

+185
-25
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
643643
// now that we have valid records, offsets assigned, we need to validate the idempotent
644644
// state of the writers and collect some metadata.
645645
Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
646-
analyzeAndValidateWriterState(validRecords);
646+
analyzeAndValidateWriterState(validRecords, appendAsLeader);
647647

648648
if (validateResult.isLeft()) {
649649
// have duplicated batch metadata, skip the append and update append info.
@@ -1006,7 +1006,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10061006

10071007
/** Returns either the duplicated batch metadata (left) or the updated writers (right). */
10081008
private Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>>
1009-
analyzeAndValidateWriterState(MemoryLogRecords records) {
1009+
analyzeAndValidateWriterState(MemoryLogRecords records, boolean isAppendAsLeader) {
10101010
Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
10111011

10121012
for (LogRecordBatch batch : records.batches()) {
@@ -1023,14 +1023,15 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10231023
}
10241024

10251025
// update write append info.
1026-
updateWriterAppendInfo(writerStateManager, batch, updatedWriters);
1026+
updateWriterAppendInfo(writerStateManager, batch, updatedWriters, isAppendAsLeader);
10271027
}
10281028
}
10291029

10301030
return Either.right(updatedWriters.values());
10311031
}
10321032

1033-
void removeExpiredWriter(long currentTimeMs) {
1033+
@VisibleForTesting
1034+
public void removeExpiredWriter(long currentTimeMs) {
10341035
synchronized (lock) {
10351036
writerStateManager.removeExpiredWriters(currentTimeMs);
10361037
}
@@ -1110,14 +1111,16 @@ private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionR
11101111
private static void updateWriterAppendInfo(
11111112
WriterStateManager writerStateManager,
11121113
LogRecordBatch batch,
1113-
Map<Long, WriterAppendInfo> writers) {
1114+
Map<Long, WriterAppendInfo> writers,
1115+
boolean isAppendAsLeader) {
11141116
long writerId = batch.writerId();
11151117
// update writers.
11161118
WriterAppendInfo appendInfo =
11171119
writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
11181120
appendInfo.append(
11191121
batch,
1120-
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
1122+
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
1123+
isAppendAsLeader);
11211124
}
11221125

11231126
static void rebuildWriterState(
@@ -1230,7 +1233,7 @@ private static void loadWritersFromRecords(
12301233
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
12311234
for (LogRecordBatch batch : records.batches()) {
12321235
if (batch.hasWriterId()) {
1233-
updateWriterAppendInfo(writerStateManager, batch, loadedWriters);
1236+
updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
12341237
}
12351238
}
12361239
loadedWriters.values().forEach(writerStateManager::update);

fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ public long writerId() {
4444
return writerId;
4545
}
4646

47-
public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
47+
public void append(
48+
LogRecordBatch batch, boolean isWriterInBatchExpired, boolean isAppendAsLeader) {
4849
LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(batch.baseLogOffset());
4950
appendDataBatch(
5051
batch.batchSequence(),
5152
firstOffsetMetadata,
5253
batch.lastLogOffset(),
5354
isWriterInBatchExpired,
55+
isAppendAsLeader,
5456
batch.commitTimestamp());
5557
}
5658

@@ -59,8 +61,9 @@ public void appendDataBatch(
5961
LogOffsetMetadata firstOffsetMetadata,
6062
long lastOffset,
6163
boolean isWriterInBatchExpired,
64+
boolean isAppendAsLeader,
6265
long batchTimestamp) {
63-
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset);
66+
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset, isAppendAsLeader);
6467
updatedEntry.addBath(
6568
batchSequence,
6669
lastOffset,
@@ -69,13 +72,16 @@ public void appendDataBatch(
6972
}
7073

7174
private void maybeValidateDataBatch(
72-
int appendFirstSeq, boolean isWriterInBatchExpired, long lastOffset) {
75+
int appendFirstSeq,
76+
boolean isWriterInBatchExpired,
77+
long lastOffset,
78+
boolean isAppendAsLeader) {
7379
int currentLastSeq =
7480
!updatedEntry.isEmpty()
7581
? updatedEntry.lastBatchSequence()
7682
: currentEntry.lastBatchSequence();
7783
// must be in sequence, even for the first batch should start from 0
78-
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired)) {
84+
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired, isAppendAsLeader)) {
7985
throw new OutOfOrderSequenceException(
8086
String.format(
8187
"Out of order batch sequence for writer %s at offset %s in "
@@ -93,16 +99,53 @@ public WriterStateEntry toEntry() {
9399
* three scenarios will be judged as in sequence:
94100
*
95101
* <ul>
96-
* <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check whether the committed
97-
* timestamp of the next batch under the current writerId has expired. If it has expired,
98-
* we consider this a special case caused by writerId expiration, for this case, to ensure
99-
* the correctness of follower sync, we still treat it as in sequence.
100-
* <li>nextBatchSeq == lastBatchSeq + 1L
101-
* <li>lastBatchSeq reaches its maximum value
102+
* <li>1. If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two scenarios will be judged
103+
* as in sequence:
104+
* <ul>
105+
* <li>1.1 If the committed timestamp of the next batch under the current writerId has
106+
* expired, we consider this a special case caused by writerId expiration, for this
107+
* case, to ensure the correctness of follower sync, we still treat it as in
108+
* sequence.
109+
* <li>1.2 If the append request is from the follower, we consider this is a special
110+
* case caused by inconsistent expiration of writerId between the leader and
111+
* follower. To prevent continuous fetch failures on the follower side, we still
112+
* treat it as in sequence.
113+
* </ul>
114+
* <li>2. nextBatchSeq == lastBatchSeq + 1L
115+
* <li>3. lastBatchSeq reaches its maximum value
102116
* </ul>
117+
*
118+
* <p>For case 1.2, here is a detailed example: The expiration of a writer is triggered
119+
* asynchronously by the {@code PeriodicWriterIdExpirationCheck} thread at intervals defined by
120+
* {@code server.writer-id.expiration-check-interval}, which can result in slight differences in
121+
* the actual expiration times of the same writer on the leader replica and follower replicas.
122+
* This slight difference leads to a dreadful corner case. Imagine the following scenario(set
123+
* {@code server.writer-id.expiration-check-interval}: 10min, {@code
124+
* server.writer-id.expiration-time}: 12h):
125+
*
126+
* <pre>{@code
127+
* Step Time Action of Leader Action of Follower
128+
* 1 00:03:38 receive batch 0 of writer 101
129+
* 2 00:03:38 fetch batch 0 of writer 101
130+
* 3 12:05:00 remove state of writer 101
131+
* 4 12:10:02 receive batch 1 of writer 101
132+
* 5 12:10:02 fetch batch 0 of writer 101
133+
* 6 12:11:00 remove state of writer 101
134+
* }</pre>
135+
*
136+
* <p>In step 3, the follower removes the state of writer 101 first, since it has been more than
137+
* 12 hours since writer 101's last batch write, making it safe to remove. However, since the
138+
* expiration of writer 101 has not yet occurred on the leader, and a new batch 1 is received at
139+
* this time, it is successfully written on the leader. At this point, the fetcher pulls batch 1
140+
* from the leader, but since the state of writer 101 has already been cleaned up, an {@link
141+
* OutOfOrderSequenceException} will occur during to write if we don't treat it as in sequence.
103142
*/
104-
private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isWriterInBatchExpired) {
105-
return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
143+
private boolean inSequence(
144+
int lastBatchSeq,
145+
int nextBatchSeq,
146+
boolean isWriterInBatchExpired,
147+
boolean isAppendAsLeader) {
148+
return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader))
106149
|| nextBatchSeq == lastBatchSeq + 1L
107150
|| (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
108151
}

fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,14 @@ void testValidationOnFirstEntryWhenLoadingLog() {
126126
void testPrepareUpdateDoesNotMutate() {
127127
WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
128128
appendInfo.appendDataBatch(
129-
0, new LogOffsetMetadata(15L), 20L, false, System.currentTimeMillis());
129+
0, new LogOffsetMetadata(15L), 20L, false, true, System.currentTimeMillis());
130130
assertThat(stateManager.lastEntry(writerId)).isNotPresent();
131131
stateManager.update(appendInfo);
132132
assertThat(stateManager.lastEntry(writerId)).isPresent();
133133

134134
WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
135135
nextAppendInfo.appendDataBatch(
136-
1, new LogOffsetMetadata(26L), 30L, false, System.currentTimeMillis());
136+
1, new LogOffsetMetadata(26L), 30L, false, true, System.currentTimeMillis());
137137
assertThat(stateManager.lastEntry(writerId)).isPresent();
138138

139139
WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -521,6 +521,7 @@ private void append(
521521
new LogOffsetMetadata(offset),
522522
offset,
523523
isWriterInBatchExpired,
524+
true,
524525
lastTimestamp);
525526
stateManager.update(appendInfo);
526527
stateManager.updateMapEndOffset(offset + 1);

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.replica;
1919

2020
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.exception.OutOfOrderSequenceException;
2122
import org.apache.fluss.metadata.LogFormat;
2223
import org.apache.fluss.metadata.PhysicalTablePath;
2324
import org.apache.fluss.metadata.TableBucket;
@@ -48,6 +49,7 @@
4849
import java.io.File;
4950
import java.io.IOException;
5051
import java.nio.file.Path;
52+
import java.time.Duration;
5153
import java.util.ArrayList;
5254
import java.util.Arrays;
5355
import java.util.Collections;
@@ -75,10 +77,12 @@
7577
import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
7678
import static org.apache.fluss.testutils.DataTestUtils.genKvRecords;
7779
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
80+
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
7881
import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
7982
import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
8083
import static org.apache.fluss.utils.Preconditions.checkNotNull;
8184
import static org.assertj.core.api.Assertions.assertThat;
85+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
8286

8387
/** Test for {@link Replica}. */
8488
final class ReplicaTest extends ReplicaTestBase {
@@ -129,6 +133,40 @@ void testAppendRecordsToLeader() throws Exception {
129133
assertLogRecordsEquals(DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1);
130134
}
131135

136+
@Test
137+
void testAppendRecordsWithOutOfOrderBatchSequence() throws Exception {
138+
Replica logReplica =
139+
makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1));
140+
makeLogReplicaAsLeader(logReplica);
141+
142+
long writerId = 101L;
143+
144+
// 1. append a batch with batchSequence = 0
145+
logReplica.appendRecordsToLeader(genMemoryLogRecordsWithWriterId(DATA1, writerId, 0, 0), 0);
146+
147+
// manual advance time and remove expired writer, the state of writer 101 will be removed
148+
manualClock.advanceTime(Duration.ofHours(12));
149+
manualClock.advanceTime(Duration.ofSeconds(1));
150+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
151+
.isEqualTo(1);
152+
logReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
153+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
154+
.isEqualTo(0);
155+
156+
// 2. try to append an out of ordered batch as leader, will throw
157+
// OutOfOrderSequenceException
158+
assertThatThrownBy(
159+
() ->
160+
logReplica.appendRecordsToLeader(
161+
genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10), 0))
162+
.isInstanceOf(OutOfOrderSequenceException.class);
163+
assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(10);
164+
165+
// 3. try to append an out of ordered batch as follower
166+
logReplica.appendRecordsToFollower(genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10));
167+
assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(20);
168+
}
169+
132170
@Test
133171
void testPartialPutRecordsToLeader() throws Exception {
134172
Replica kvReplica =

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ public void setup() throws Exception {
174174
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("512b"));
175175
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
176176

177+
conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofHours(12));
178+
177179
scheduler = new FlussScheduler(2);
178180
scheduler.startup();
179181

0 commit comments

Comments
 (0)