Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
main (development)
Please describe the bug 🐞
When I performed an unclean shutdown, I found that some follower replicas were not added back to the ISR set. Analysis showed that the ReplicaFetcherThread encountered an error while writing the fetched replicaData to local storage:
`ERROR com.alibaba.fluss.server.replica.fetcher.ReplicaFetcherThread [] - Unexpected error occurred while processing data for bucket TableBucket{tableId=55, partitionId=150, bucket=79} at offset 1638983
com.alibaba.fluss.exception.OutOfOrderSequenceException: Out of order batch sequence for writer 9427 at offset 1638983 in table-bucket TableBucket{tableId=55, partitionId=150, bucket=79} : 69 (incoming batch seq.), -1 (current batch seq.)`
When this happens, ReplicaFetcherThread removes the bucket, so the follower stops fetching data from the leader and is then kicked out of the ISR set.
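The message reports an incoming batch sequence of 69 against a current batch sequence of -1, which reads as the follower having no state at all for writer 9427. The sketch below is a simplified model of that kind of per-writer sequence check, not Fluss's actual implementation: the class `WriterStateSketch`, its methods, and the rule that an unknown writer must start at sequence 0 are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of per-writer batch sequence validation (not Fluss code). */
public class WriterStateSketch {
    /** Assumed sentinel for "no batch known for this writer", mirroring the -1 in the log. */
    public static final int NO_BATCH_SEQUENCE = -1;

    // writerId -> last appended batch sequence
    private final Map<Long, Integer> lastBatchSeq = new HashMap<>();

    /** Seeds the state as if it had been rebuilt from a snapshot or by replaying segments. */
    public void restore(long writerId, int lastSeq) {
        lastBatchSeq.put(writerId, lastSeq);
    }

    public int currentSeq(long writerId) {
        return lastBatchSeq.getOrDefault(writerId, NO_BATCH_SEQUENCE);
    }

    /** Accepts a batch only if it directly follows the last known sequence of the writer. */
    public void append(long writerId, int incomingSeq) {
        int current = currentSeq(writerId);
        // Assumption: a writer without state is only allowed to start at sequence 0.
        boolean inOrder =
                current == NO_BATCH_SEQUENCE ? incomingSeq == 0 : incomingSeq == current + 1;
        if (!inOrder) {
            throw new IllegalStateException(
                    "Out of order batch sequence for writer " + writerId + ": "
                            + incomingSeq + " (incoming batch seq.), "
                            + current + " (current batch seq.)");
        }
        lastBatchSeq.put(writerId, incomingSeq);
    }

    public static void main(String[] args) {
        long writerId = 9427L;

        // Follower whose writer state was not recovered: batch 69 is rejected.
        WriterStateSketch missingState = new WriterStateSketch();
        try {
            missingState.append(writerId, 69);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Follower whose writer state was recovered up to batch 68: batch 69 is accepted.
        WriterStateSketch restoredState = new WriterStateSketch();
        restoredState.restore(writerId, 68);
        restoredState.append(writerId, 69);
        System.out.println("accepted, current batch seq = " + restoredState.currentSeq(writerId));
    }
}
```

Under this model, the same batch that fails against empty state is accepted once the state up to sequence 68 is present, which is why the follower only keeps fetching if its writer state is recovered correctly.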
The root cause of this error is that when a tabletServer takes a long time to recover from a shutdown, a follower replica on it may have a very large offset gap compared to the leader by that point, and multiple segments may already have been moved to remote storage. Currently, Fluss does not handle this situation properly, so WriterState fails to recover correctly. This error can be reproduced locally as follows:
Add a test to RemoteLogITCase, like:
@Test
void testFollowerFetchMoveToRemoteLogWithWriterStates() throws Exception {
    long tableId =
            createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR);
    TableBucket tb = new TableBucket(tableId, 0);
    FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb);
    int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
    int follower;
    for (int i = 0; true; i++) {
        if (i != leader) {
            follower = i;
            break;
        }
    }
    // kill follower, and restart after some segments in leader has been copied to remote.
    FLUSS_CLUSTER_EXTENSION.stopTabletServer(follower);
    TabletServerGateway leaderGateWay =
            FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
    // produce many records to trigger remote log copy.
    for (int i = 0; i < 10; i++) {
        assertProduceLogResponse(
                leaderGateWay
                        .produceLog(
                                newProduceLogRequest(
                                        tableId,
                                        0,
                                        1,
                                        genMemoryLogRecordsWithWriterId(DATA1, 100, i, 0L)))
                        .get(),
                0,
                i * 10L);
    }
    FLUSS_CLUSTER_EXTENSION.waitUtilReplicaShrinkFromIsr(tb, follower);
    FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(tb);
    // restart follower
    FLUSS_CLUSTER_EXTENSION.startTabletServer(follower);
    FLUSS_CLUSTER_EXTENSION.waitUtilReplicaExpandToIsr(tb, follower);
}
After changing the log level to info, the error above can be found in the log.
Solution
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!