Skip to content

Commit e004e1f

Browse files
authored
[log] Use remote log index file to rebuild writer state for recovered lagged follower (#728)
This fixes the case where a follower fetches log data that has already been moved to remote storage. In that case the fetch needs to return RemoteLogFetchInfo so that the WriterState can be rebuilt, instead of throwing LogOffsetOutOfRangeException.
1 parent f3d924e commit e004e1f

37 files changed

+481
-294
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import java.util.Set;
7070
import java.util.stream.Collectors;
7171

72-
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.getFetchLogResultForBucket;
72+
import static com.alibaba.fluss.rpc.CommonRpcMessageUtils.getFetchLogResultForBucket;
7373
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
7474

7575
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

-79
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,6 @@
3333
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
3434
import com.alibaba.fluss.metadata.TableBucket;
3535
import com.alibaba.fluss.metadata.TablePath;
36-
import com.alibaba.fluss.record.LogRecords;
37-
import com.alibaba.fluss.record.MemoryLogRecords;
38-
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
39-
import com.alibaba.fluss.remote.RemoteLogSegment;
40-
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
4136
import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
4237
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
4338
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
@@ -48,7 +43,6 @@
4843
import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse;
4944
import com.alibaba.fluss.rpc.messages.LookupRequest;
5045
import com.alibaba.fluss.rpc.messages.MetadataRequest;
51-
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket;
5246
import com.alibaba.fluss.rpc.messages.PbKeyValue;
5347
import com.alibaba.fluss.rpc.messages.PbKvSnapshot;
5448
import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket;
@@ -58,26 +52,20 @@
5852
import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
5953
import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket;
6054
import com.alibaba.fluss.rpc.messages.PbPutKvReqForBucket;
61-
import com.alibaba.fluss.rpc.messages.PbRemoteLogFetchInfo;
62-
import com.alibaba.fluss.rpc.messages.PbRemoteLogSegment;
6355
import com.alibaba.fluss.rpc.messages.PbRemotePathAndLocalFile;
6456
import com.alibaba.fluss.rpc.messages.PrefixLookupRequest;
6557
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
6658
import com.alibaba.fluss.rpc.messages.PutKvRequest;
67-
import com.alibaba.fluss.rpc.protocol.ApiError;
68-
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
6959

7060
import javax.annotation.Nullable;
7161

72-
import java.nio.ByteBuffer;
7362
import java.util.ArrayList;
7463
import java.util.Arrays;
7564
import java.util.Collection;
7665
import java.util.HashMap;
7766
import java.util.List;
7867
import java.util.Map;
7968
import java.util.Set;
80-
import java.util.UUID;
8169
import java.util.stream.Collectors;
8270

8371
import static com.alibaba.fluss.utils.Preconditions.checkState;
@@ -88,21 +76,6 @@
8876
*/
8977
public class ClientRpcMessageUtils {
9078

91-
public static ByteBuffer toByteBuffer(ByteBuf buf) {
92-
if (buf.isDirect()) {
93-
return buf.nioBuffer();
94-
} else if (buf.hasArray()) {
95-
int offset = buf.arrayOffset() + buf.readerIndex();
96-
int length = buf.readableBytes();
97-
return ByteBuffer.wrap(buf.array(), offset, length);
98-
} else {
99-
// fallback to deep copy
100-
byte[] bytes = new byte[buf.readableBytes()];
101-
buf.getBytes(buf.readerIndex(), bytes);
102-
return ByteBuffer.wrap(bytes);
103-
}
104-
}
105-
10679
public static ProduceLogRequest makeProduceLogRequest(
10780
long tableId, int acks, int maxRequestTimeoutMs, List<WriteBatch> batches) {
10881
ProduceLogRequest request =
@@ -205,58 +178,6 @@ public static PrefixLookupRequest makePrefixLookupRequest(
205178
return request;
206179
}
207180

208-
public static FetchLogResultForBucket getFetchLogResultForBucket(
209-
TableBucket tb, TablePath tp, PbFetchLogRespForBucket respForBucket) {
210-
FetchLogResultForBucket fetchLogResultForBucket;
211-
if (respForBucket.hasErrorCode()) {
212-
fetchLogResultForBucket =
213-
new FetchLogResultForBucket(tb, ApiError.fromErrorMessage(respForBucket));
214-
} else {
215-
if (respForBucket.hasRemoteLogFetchInfo()) {
216-
PbRemoteLogFetchInfo pbRlfInfo = respForBucket.getRemoteLogFetchInfo();
217-
String partitionName =
218-
pbRlfInfo.hasPartitionName() ? pbRlfInfo.getPartitionName() : null;
219-
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tp, partitionName);
220-
List<RemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
221-
for (PbRemoteLogSegment pbRemoteLogSegment : pbRlfInfo.getRemoteLogSegmentsList()) {
222-
RemoteLogSegment remoteLogSegment =
223-
RemoteLogSegment.Builder.builder()
224-
.tableBucket(tb)
225-
.physicalTablePath(physicalTablePath)
226-
.remoteLogSegmentId(
227-
UUID.fromString(
228-
pbRemoteLogSegment.getRemoteLogSegmentId()))
229-
.remoteLogEndOffset(pbRemoteLogSegment.getRemoteLogEndOffset())
230-
.remoteLogStartOffset(
231-
pbRemoteLogSegment.getRemoteLogStartOffset())
232-
.segmentSizeInBytes(pbRemoteLogSegment.getSegmentSizeInBytes())
233-
.maxTimestamp(-1L) // not use.
234-
.build();
235-
remoteLogSegmentList.add(remoteLogSegment);
236-
}
237-
RemoteLogFetchInfo rlFetchInfo =
238-
new RemoteLogFetchInfo(
239-
pbRlfInfo.getRemoteLogTabletDir(),
240-
pbRlfInfo.hasPartitionName() ? pbRlfInfo.getPartitionName() : null,
241-
remoteLogSegmentList,
242-
pbRlfInfo.getFirstStartPos());
243-
fetchLogResultForBucket =
244-
new FetchLogResultForBucket(
245-
tb, rlFetchInfo, respForBucket.getHighWatermark());
246-
} else {
247-
ByteBuffer recordsBuffer = toByteBuffer(respForBucket.getRecordsSlice());
248-
LogRecords records =
249-
respForBucket.hasRecords()
250-
? MemoryLogRecords.pointToByteBuffer(recordsBuffer)
251-
: MemoryLogRecords.EMPTY;
252-
fetchLogResultForBucket =
253-
new FetchLogResultForBucket(tb, records, respForBucket.getHighWatermark());
254-
}
255-
}
256-
257-
return fetchLogResultForBucket;
258-
}
259-
260181
public static KvSnapshots toKvSnapshots(GetLatestKvSnapshotsResponse response) {
261182
long tableId = response.getTableId();
262183
Long partitionId = response.hasPartitionId() ? response.getPartitionId() : null;

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@
4747
import java.util.List;
4848
import java.util.Map;
4949

50-
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.toByteBuffer;
5150
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
5251
import static com.alibaba.fluss.record.TestData.DATA2;
5352
import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE;
5453
import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID;
5554
import static com.alibaba.fluss.record.TestData.DATA2_TABLE_INFO;
5655
import static com.alibaba.fluss.record.TestData.DATA2_TABLE_PATH;
5756
import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
57+
import static com.alibaba.fluss.rpc.CommonRpcMessageUtils.toByteBuffer;
5858
import static com.alibaba.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
5959
import static org.assertj.core.api.Assertions.assertThat;
6060

fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
5555
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
5656
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
57-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getProduceLogData;
58-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeProduceLogResponse;
57+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
58+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
5959
import static com.alibaba.fluss.testutils.DataTestUtils.row;
6060
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
6161
import static org.assertj.core.api.Assertions.assertThat;

fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java

+13
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,19 @@ public static FsPath remoteOffsetIndexFile(
514514
indexSuffix);
515515
}
516516

517+
public static FsPath remoteWriterSnapshotFile(
518+
FsPath remoteLogDir, RemoteLogSegment remoteLogSegment, String indexSuffix) {
519+
return remoteLogIndexFile(
520+
remoteLogSegmentDir(
521+
remoteLogTabletDir(
522+
remoteLogDir,
523+
remoteLogSegment.physicalTablePath(),
524+
remoteLogSegment.tableBucket()),
525+
remoteLogSegment.remoteLogSegmentId()),
526+
remoteLogSegment.remoteLogEndOffset(),
527+
indexSuffix);
528+
}
529+
517530
/**
518531
* Returns the remote file path for storing the offset index file.
519532
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.rpc;
18+
19+
import com.alibaba.fluss.metadata.PhysicalTablePath;
20+
import com.alibaba.fluss.metadata.TableBucket;
21+
import com.alibaba.fluss.metadata.TablePath;
22+
import com.alibaba.fluss.record.LogRecords;
23+
import com.alibaba.fluss.record.MemoryLogRecords;
24+
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
25+
import com.alibaba.fluss.remote.RemoteLogSegment;
26+
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
27+
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket;
28+
import com.alibaba.fluss.rpc.messages.PbRemoteLogFetchInfo;
29+
import com.alibaba.fluss.rpc.messages.PbRemoteLogSegment;
30+
import com.alibaba.fluss.rpc.protocol.ApiError;
31+
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
32+
33+
import java.nio.ByteBuffer;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.UUID;
37+
38+
/**
39+
* Utils for making rpc request/response from inner object or convert inner class to rpc
40+
* request/response for common using.
41+
*/
42+
public class CommonRpcMessageUtils {
43+
public static FetchLogResultForBucket getFetchLogResultForBucket(
44+
TableBucket tb, TablePath tp, PbFetchLogRespForBucket respForBucket) {
45+
FetchLogResultForBucket fetchLogResultForBucket;
46+
if (respForBucket.hasErrorCode()) {
47+
fetchLogResultForBucket =
48+
new FetchLogResultForBucket(tb, ApiError.fromErrorMessage(respForBucket));
49+
} else {
50+
if (respForBucket.hasRemoteLogFetchInfo()) {
51+
PbRemoteLogFetchInfo pbRlfInfo = respForBucket.getRemoteLogFetchInfo();
52+
String partitionName =
53+
pbRlfInfo.hasPartitionName() ? pbRlfInfo.getPartitionName() : null;
54+
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tp, partitionName);
55+
List<RemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
56+
for (PbRemoteLogSegment pbRemoteLogSegment : pbRlfInfo.getRemoteLogSegmentsList()) {
57+
RemoteLogSegment remoteLogSegment =
58+
RemoteLogSegment.Builder.builder()
59+
.tableBucket(tb)
60+
.physicalTablePath(physicalTablePath)
61+
.remoteLogSegmentId(
62+
UUID.fromString(
63+
pbRemoteLogSegment.getRemoteLogSegmentId()))
64+
.remoteLogEndOffset(pbRemoteLogSegment.getRemoteLogEndOffset())
65+
.remoteLogStartOffset(
66+
pbRemoteLogSegment.getRemoteLogStartOffset())
67+
.segmentSizeInBytes(pbRemoteLogSegment.getSegmentSizeInBytes())
68+
.maxTimestamp(-1L) // not use.
69+
.build();
70+
remoteLogSegmentList.add(remoteLogSegment);
71+
}
72+
RemoteLogFetchInfo rlFetchInfo =
73+
new RemoteLogFetchInfo(
74+
pbRlfInfo.getRemoteLogTabletDir(),
75+
pbRlfInfo.hasPartitionName() ? pbRlfInfo.getPartitionName() : null,
76+
remoteLogSegmentList,
77+
pbRlfInfo.getFirstStartPos());
78+
fetchLogResultForBucket =
79+
new FetchLogResultForBucket(
80+
tb, rlFetchInfo, respForBucket.getHighWatermark());
81+
} else {
82+
ByteBuffer recordsBuffer = toByteBuffer(respForBucket.getRecordsSlice());
83+
LogRecords records =
84+
respForBucket.hasRecords()
85+
? MemoryLogRecords.pointToByteBuffer(recordsBuffer)
86+
: MemoryLogRecords.EMPTY;
87+
fetchLogResultForBucket =
88+
new FetchLogResultForBucket(tb, records, respForBucket.getHighWatermark());
89+
}
90+
}
91+
92+
return fetchLogResultForBucket;
93+
}
94+
95+
public static ByteBuffer toByteBuffer(ByteBuf buf) {
96+
if (buf.isDirect()) {
97+
return buf.nioBuffer();
98+
} else if (buf.hasArray()) {
99+
int offset = buf.arrayOffset() + buf.readerIndex();
100+
int length = buf.readableBytes();
101+
return ByteBuffer.wrap(buf.array(), offset, length);
102+
} else {
103+
// fallback to deep copy
104+
byte[] bytes = new byte[buf.readableBytes()];
105+
buf.getBytes(buf.readerIndex(), bytes);
106+
return ByteBuffer.wrap(bytes);
107+
}
108+
}
109+
}

fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
8181
import com.alibaba.fluss.server.metadata.TableMetadataInfo;
8282
import com.alibaba.fluss.server.tablet.TabletService;
83-
import com.alibaba.fluss.server.utils.RpcMessageUtils;
8483
import com.alibaba.fluss.server.zk.ZooKeeperClient;
8584
import com.alibaba.fluss.server.zk.data.BucketAssignment;
8685
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
@@ -103,9 +102,13 @@
103102
import java.util.Set;
104103
import java.util.concurrent.CompletableFuture;
105104

106-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeGetLatestKvSnapshotsResponse;
107-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeKvSnapshotMetadataResponse;
108-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTablePath;
105+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestKvSnapshotsResponse;
106+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestLakeSnapshotResponse;
107+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeKvSnapshotMetadataResponse;
108+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toGetFileSystemSecurityTokenResponse;
109+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toListPartitionInfosResponse;
110+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toPhysicalTablePath;
111+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
109112
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
110113
import static com.alibaba.fluss.utils.Preconditions.checkState;
111114

@@ -256,8 +259,7 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
256259

257260
for (PbPhysicalTablePath partitionPath : partitions) {
258261
partitionMetadataInfos.add(
259-
getPartitionMetadata(
260-
RpcMessageUtils.toPhysicalTablePath(partitionPath), listenerName));
262+
getPartitionMetadata(toPhysicalTablePath(partitionPath), listenerName));
261263
}
262264

263265
// get partition info from partition ids
@@ -379,7 +381,7 @@ public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecuri
379381
}
380382

381383
return CompletableFuture.completedFuture(
382-
RpcMessageUtils.toGetFileSystemSecurityTokenResponse(
384+
toGetFileSystemSecurityTokenResponse(
383385
remoteFileSystem.getUri().getScheme(), securityToken));
384386
} catch (Exception e) {
385387
throw new SecurityTokenException(
@@ -395,7 +397,7 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
395397
TableInfo tableInfo = metadataManager.getTable(tablePath);
396398
List<String> partitionKeys = tableInfo.getPartitionKeys();
397399
return CompletableFuture.completedFuture(
398-
RpcMessageUtils.toListPartitionInfosResponse(partitionKeys, partitionNameAndIds));
400+
toListPartitionInfosResponse(partitionKeys, partitionNameAndIds));
399401
}
400402

401403
@Override
@@ -427,7 +429,7 @@ public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
427429

428430
LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
429431
return CompletableFuture.completedFuture(
430-
RpcMessageUtils.makeGetLatestLakeSnapshotResponse(tableId, lakeTableSnapshot));
432+
makeGetLatestLakeSnapshotResponse(tableId, lakeTableSnapshot));
431433
}
432434

433435
private Set<ServerNode> getAllTabletServerNodes(String listenerName) {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import com.alibaba.fluss.server.metadata.ServerInfo;
7272
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
7373
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
74-
import com.alibaba.fluss.server.utils.RpcMessageUtils;
7574
import com.alibaba.fluss.server.zk.ZooKeeperClient;
7675
import com.alibaba.fluss.server.zk.data.BucketAssignment;
7776
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
@@ -107,6 +106,7 @@
107106
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
108107
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
109108
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
109+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeAdjustIsrResponse;
110110
import static com.alibaba.fluss.utils.concurrent.FutureUtils.completeFromCallable;
111111

112112
/** An implementation for {@link EventProcessor}. */
@@ -468,7 +468,7 @@ public void process(CoordinatorEvent event) {
468468
completeFromCallable(
469469
callback,
470470
() ->
471-
RpcMessageUtils.makeAdjustIsrResponse(
471+
makeAdjustIsrResponse(
472472
tryProcessAdjustIsr(
473473
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
474474
} else if (event instanceof CommitKvSnapshotEvent) {

0 commit comments

Comments
 (0)