Skip to content

Commit d5e32b7

Browse files
committed
[server] Fix ReplicaFetcherThread keeps throwing UnknownServerException because of corrupt index file
1 parent babf91b commit d5e32b7

File tree

3 files changed

+188
-37
lines changed

3 files changed

+188
-37
lines changed

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public class ReplicaManager {
176176
private final SnapshotContext kvSnapshotContext;
177177

178178
// remote log manager for remote log storage.
179-
private final RemoteLogManager remoteLogManager;
179+
protected final RemoteLogManager remoteLogManager;
180180

181181
// for metrics
182182
private final TabletServerMetricGroup serverMetricGroup;
@@ -217,7 +217,7 @@ public ReplicaManager(
217217
}
218218

219219
@VisibleForTesting
220-
ReplicaManager(
220+
protected ReplicaManager(
221221
Configuration conf,
222222
Scheduler scheduler,
223223
LogManager logManager,
@@ -1111,7 +1111,9 @@ public Map<TableBucket, LogReadResult> readFromLog(
11111111

11121112
FetchLogResultForBucket result;
11131113
if (replica != null && e instanceof LogOffsetOutOfRangeException) {
1114-
result = handleFetchOutOfRangeException(replica, fetchOffset, e);
1114+
result =
1115+
handleFetchOutOfRangeException(
1116+
replica, fetchOffset, fetchParams.isFromFollower(), e);
11151117
} else {
11161118
result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e));
11171119
}
@@ -1123,7 +1125,7 @@ public Map<TableBucket, LogReadResult> readFromLog(
11231125
}
11241126

11251127
private FetchLogResultForBucket handleFetchOutOfRangeException(
1126-
Replica replica, long fetchOffset, Exception e) {
1128+
Replica replica, long fetchOffset, boolean isFromFollower, Exception e) {
11271129
TableBucket tb = replica.getTableBucket();
11281130
if (fetchOffset == FetchParams.FETCH_FROM_EARLIEST_OFFSET) {
11291131
fetchOffset = replica.getLogStartOffset();
@@ -1141,7 +1143,8 @@ private FetchLogResultForBucket handleFetchOutOfRangeException(
11411143
// of RemoteLogSegment. For client fetcher, it will fetch the log from remote in client.
11421144
// For follower, it can update its local metadata to adjust the next fetch offset.
11431145
else if (canFetchFromRemoteLog(replica, fetchOffset)) {
1144-
RemoteLogFetchInfo remoteLogFetchInfo = fetchLogFromRemote(replica, fetchOffset);
1146+
RemoteLogFetchInfo remoteLogFetchInfo =
1147+
fetchLogFromRemote(replica, fetchOffset, isFromFollower);
11451148
if (remoteLogFetchInfo != null) {
11461149
return new FetchLogResultForBucket(
11471150
tb, remoteLogFetchInfo, replica.getLogHighWatermark());
@@ -1167,13 +1170,20 @@ private boolean canFetchFromRemoteLog(Replica replica, long fetchOffset) {
11671170
return replica.getLogTablet().canFetchFromRemoteLog(fetchOffset);
11681171
}
11691172

1170-
private @Nullable RemoteLogFetchInfo fetchLogFromRemote(Replica replica, long fetchOffset) {
1173+
private @Nullable RemoteLogFetchInfo fetchLogFromRemote(
1174+
Replica replica, long fetchOffset, boolean isFromFollower) {
11711175
List<RemoteLogSegment> remoteLogSegmentList =
11721176
remoteLogManager.relevantRemoteLogSegments(replica.getTableBucket(), fetchOffset);
11731177
if (!remoteLogSegmentList.isEmpty()) {
1174-
int firstStartPos =
1175-
remoteLogManager.lookupPositionForOffset(
1176-
remoteLogSegmentList.get(0), fetchOffset);
1178+
// follower does not require firstStartPos, so we do not look up this value.
1179+
// On one hand, this reduces query overhead, and on the other hand,
1180+
// it helps prevent unexpected issues caused by corrupted index files.
1181+
int firstStartPos = -1;
1182+
if (!isFromFollower) {
1183+
firstStartPos =
1184+
remoteLogManager.lookupPositionForOffset(
1185+
remoteLogSegmentList.get(0), fetchOffset);
1186+
}
11771187
PhysicalTablePath physicalTablePath = replica.getPhysicalTablePath();
11781188
FsPath remoteLogTabletDir =
11791189
FlussPaths.remoteLogTabletDir(

fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.rpc.protocol.ApiError;
2727
import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
2828
import org.apache.fluss.server.entity.FetchReqInfo;
29+
import org.apache.fluss.server.exception.CorruptIndexException;
2930
import org.apache.fluss.server.log.FetchParams;
3031
import org.apache.fluss.server.log.LogTablet;
3132
import org.apache.fluss.server.replica.Replica;
@@ -34,6 +35,10 @@
3435
import org.junit.jupiter.params.ParameterizedTest;
3536
import org.junit.jupiter.params.provider.ValueSource;
3637

38+
import java.io.File;
39+
import java.nio.ByteBuffer;
40+
import java.nio.channels.FileChannel;
41+
import java.nio.file.StandardOpenOption;
3742
import java.time.Duration;
3843
import java.util.Collections;
3944
import java.util.List;
@@ -351,6 +356,62 @@ void testFetchRecordsFromRemote(boolean partitionTable) throws Exception {
351356
assertThat(resultForBucket.fetchFromRemote()).isFalse();
352357
}
353358

359+
@ParameterizedTest
360+
@ValueSource(booleans = {true, false})
361+
void testFetchRecordsFromRemoteWithCorruptIndex(boolean partitionTable) throws Exception {
362+
TableBucket tb = makeTableBucket(partitionTable);
363+
// Need to make leader by ReplicaManager.
364+
makeLogTableAsLeader(tb, partitionTable);
365+
LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
366+
addMultiSegmentsToLogTablet(logTablet, 5);
367+
368+
File file = logTablet.getSegments().get(0).timeIndex().file();
369+
// mock corrupt index
370+
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND)) {
371+
for (int i = 0; i < 12; i++) {
372+
fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
373+
}
374+
}
375+
376+
// trigger RLMTask copy local log segment to remote and update metadata.
377+
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
378+
List<RemoteLogSegment> remoteLogSegmentList =
379+
remoteLogManager.relevantRemoteLogSegments(tb, 0L);
380+
assertThat(remoteLogSegmentList.size()).isEqualTo(4);
381+
382+
logTablet.updateRemoteLogEndOffset(40L);
383+
384+
// 1. fetch log records from client, should throw CorruptIndexException since we mock a
385+
// corrupt index
386+
CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
387+
new CompletableFuture<>();
388+
CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> finalFuture = future;
389+
assertThatThrownBy(
390+
() ->
391+
replicaManager.fetchLogRecords(
392+
new FetchParams(-1, Integer.MAX_VALUE),
393+
Collections.singletonMap(
394+
tb,
395+
new FetchReqInfo(tb.getTableId(), 0L, 1024 * 1024)),
396+
finalFuture::complete))
397+
.isInstanceOf(CorruptIndexException.class);
398+
399+
// 2. fetch log records from follower, should succeed since we don't need to lookup index
400+
future = new CompletableFuture<>();
401+
replicaManager.fetchLogRecords(
402+
new FetchParams(1, Integer.MAX_VALUE),
403+
Collections.singletonMap(tb, new FetchReqInfo(tb.getTableId(), 0L, 1024 * 1024)),
404+
future::complete);
405+
Map<TableBucket, FetchLogResultForBucket> result = future.get();
406+
assertThat(result.size()).isEqualTo(1);
407+
FetchLogResultForBucket resultForBucket = result.get(tb);
408+
assertThat(resultForBucket.getError()).isEqualTo(ApiError.NONE);
409+
assertThat(resultForBucket.getTableBucket()).isEqualTo(tb);
410+
assertThat(resultForBucket.fetchFromRemote()).isTrue();
411+
assertThat(resultForBucket.records()).isNull();
412+
assertThat(resultForBucket.getHighWatermark()).isEqualTo(50L);
413+
}
414+
354415
@ParameterizedTest
355416
@ValueSource(booleans = {true, false})
356417
void testCleanupLocalSegments(boolean partitionTable) throws Exception {

0 commit comments

Comments
 (0)