Skip to content

Commit 70cf206

Browse files
authored
[server] Change the NotLeaderOrFollowerException in NotifyLeaderAndIsrRequest from request level to bucket level (#638)
1 parent 157d252 commit 70cf206

File tree

8 files changed

+250
-79
lines changed

8 files changed

+250
-79
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
2626
import com.alibaba.fluss.rpc.messages.PbNotifyLakeTableOffsetReqForBucket;
2727
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket;
28-
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrRespForBucket;
2928
import com.alibaba.fluss.rpc.messages.PbStopReplicaReqForBucket;
3029
import com.alibaba.fluss.rpc.messages.PbStopReplicaRespForBucket;
3130
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
@@ -36,7 +35,6 @@
3635
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
3736
import com.alibaba.fluss.server.entity.DeleteReplicaResultForBucket;
3837
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
39-
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
4038
import com.alibaba.fluss.server.metadata.ServerInfo;
4139
import com.alibaba.fluss.server.utils.RpcMessageUtils;
4240
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
@@ -53,6 +51,10 @@
5351
import java.util.Set;
5452
import java.util.stream.Collectors;
5553

54+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getNotifyLeaderAndIsrResponseData;
55+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeNotifyBucketLeaderAndIsr;
56+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeNotifyLeaderAndIsrRequest;
57+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeUpdateMetadataRequest;
5658
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTableBucket;
5759

5860
/** A request sender for coordinator server to request to tablet server by batch. */
@@ -185,7 +187,7 @@ public void addNotifyLeaderRequestForTabletServers(
185187
notifyLeaderAndIsrRequestMap.computeIfAbsent(
186188
id, k -> new HashMap<>());
187189
PbNotifyLeaderAndIsrReqForBucket notifyLeaderAndIsrForBucket =
188-
RpcMessageUtils.makeNotifyBucketLeaderAndIsr(
190+
makeNotifyBucketLeaderAndIsr(
189191
new NotifyLeaderAndIsrData(
190192
tablePath,
191193
tableBucket,
@@ -229,7 +231,7 @@ public void addUpdateMetadataRequestForTabletServers(
229231
id ->
230232
updateMetadataRequestTabletServerSet.put(
231233
id,
232-
RpcMessageUtils.makeUpdateMetadataRequest(
234+
makeUpdateMetadataRequest(
233235
coordinatorServer, aliveTabletServers)));
234236
}
235237

@@ -286,54 +288,31 @@ private void sendNotifyLeaderAndIsrRequest(int coordinatorEpoch) {
286288
notifyRequestEntry : notifyLeaderAndIsrRequestMap.entrySet()) {
287289
// send request for each tablet server
288290
Integer serverId = notifyRequestEntry.getKey();
289-
Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket> notifyLeaders =
290-
notifyRequestEntry.getValue();
291291
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest =
292-
new NotifyLeaderAndIsrRequest()
293-
.setCoordinatorEpoch(coordinatorEpoch)
294-
.addAllNotifyBucketsLeaderReqs(notifyLeaders.values());
292+
makeNotifyLeaderAndIsrRequest(
293+
coordinatorEpoch, notifyRequestEntry.getValue().values());
295294

296295
coordinatorChannelManager.sendBucketLeaderAndIsrRequest(
297296
serverId,
298297
notifyLeaderAndIsrRequest,
299298
(response, throwable) -> {
300-
List<NotifyLeaderAndIsrResultForBucket> notifyLeaderAndIsrResultForBuckets =
301-
new ArrayList<>();
302299
if (throwable != null) {
303300
LOG.warn(
304301
"Failed to send notify leader and isr request to tablet server {}.",
305302
serverId,
306303
throwable);
307304
// todo: in FLUSS-55886145, we will introduce a sender thread to send
308-
// the request,
309-
// and retry if encounter any error; It may happens that the tablet
310-
// server
311-
// is offline and will always got error. But, coordinator will remove
312-
// the sender for the tablet server and mark all replica in the tablet
313-
// server as offline.
314-
// so, in here, if encounter any error, we just ignore it.
305+
// the request, and retry if encounter any error; It may happens that
306+
// the tablet server is offline and will always got error. But,
307+
// coordinator will remove the sender for the tablet server and mark all
308+
// replica in the tablet server as offline. so, in here, if encounter
309+
// any error, we just ignore it.
315310
return;
316311
}
317-
// handle the response
318-
for (PbNotifyLeaderAndIsrRespForBucket protoNotifyLeaderRespForBucket :
319-
response.getNotifyBucketsLeaderRespsList()) {
320-
TableBucket tableBucket =
321-
toTableBucket(protoNotifyLeaderRespForBucket.getTableBucket());
322-
// construct the result for notify bucket leader and isr
323-
NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket =
324-
protoNotifyLeaderRespForBucket.hasErrorCode()
325-
? new NotifyLeaderAndIsrResultForBucket(
326-
tableBucket,
327-
ApiError.fromErrorMessage(
328-
protoNotifyLeaderRespForBucket))
329-
: new NotifyLeaderAndIsrResultForBucket(tableBucket);
330-
notifyLeaderAndIsrResultForBuckets.add(
331-
notifyLeaderAndIsrResultForBucket);
332-
}
333312
// put the response receive event into the event manager
334313
eventManager.put(
335314
new NotifyLeaderAndIsrResponseReceivedEvent(
336-
notifyLeaderAndIsrResultForBuckets, serverId));
315+
getNotifyLeaderAndIsrResponseData(response), serverId));
337316
});
338317
}
339318
notifyLeaderAndIsrRequestMap.clear();

fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ public void becomeLeaderOrFollower(
326326
int requestCoordinatorEpoch,
327327
List<NotifyLeaderAndIsrData> notifyLeaderAndIsrDataList,
328328
Consumer<List<NotifyLeaderAndIsrResultForBucket>> responseCallback) {
329-
List<NotifyLeaderAndIsrResultForBucket> result = new ArrayList<>();
329+
Map<TableBucket, NotifyLeaderAndIsrResultForBucket> result = new HashMap<>();
330330
inLock(
331331
replicaStateChangeLock,
332332
() -> {
@@ -345,7 +345,8 @@ public void becomeLeaderOrFollower(
345345
replicasToBeFollower.add(data);
346346
}
347347
} catch (Exception e) {
348-
result.add(
348+
result.put(
349+
tb,
349350
new NotifyLeaderAndIsrResultForBucket(
350351
tb, ApiError.fromThrowable(e)));
351352
}
@@ -361,7 +362,7 @@ public void becomeLeaderOrFollower(
361362
replicaFetcherManager.shutdownIdleFetcherThreads();
362363
});
363364

364-
responseCallback.accept(result);
365+
responseCallback.accept(new ArrayList<>(result.values()));
365366
}
366367

367368
/**
@@ -701,7 +702,7 @@ public void notifyLakeTableOffset(
701702
*/
702703
private void makeLeaders(
703704
List<NotifyLeaderAndIsrData> replicasToBeLeader,
704-
List<NotifyLeaderAndIsrResultForBucket> result) {
705+
Map<TableBucket, NotifyLeaderAndIsrResultForBucket> result) {
705706
if (replicasToBeLeader.isEmpty()) {
706707
return;
707708
}
@@ -720,10 +721,11 @@ private void makeLeaders(
720721
}
721722
// start the remote log tiering tasks for leaders
722723
remoteLogManager.startLogTiering(replica);
723-
result.add(new NotifyLeaderAndIsrResultForBucket(tb));
724+
result.put(tb, new NotifyLeaderAndIsrResultForBucket(tb));
724725
} catch (Exception e) {
725726
LOG.error("Error make replica {} to leader", tb, e);
726-
result.add(new NotifyLeaderAndIsrResultForBucket(tb, ApiError.fromThrowable(e)));
727+
result.put(
728+
tb, new NotifyLeaderAndIsrResultForBucket(tb, ApiError.fromThrowable(e)));
727729
}
728730
}
729731
}
@@ -760,7 +762,7 @@ private void updateWithLakeTableSnapshot(Replica replica) throws Exception {
760762
*/
761763
private void makeFollowers(
762764
List<NotifyLeaderAndIsrData> replicasToBeFollower,
763-
List<NotifyLeaderAndIsrResultForBucket> result) {
765+
Map<TableBucket, NotifyLeaderAndIsrResultForBucket> result) {
764766
if (replicasToBeFollower.isEmpty()) {
765767
return;
766768
}
@@ -774,11 +776,12 @@ private void makeFollowers(
774776
}
775777
// stop the remote log tiering tasks for followers
776778
remoteLogManager.stopLogTiering(replica);
777-
result.add(new NotifyLeaderAndIsrResultForBucket(tb));
779+
result.put(tb, new NotifyLeaderAndIsrResultForBucket(tb));
778780
replicasBecomeFollower.add(replica);
779781
} catch (Exception e) {
780782
LOG.error("Error make replica {} to follower", tb, e);
781-
result.add(new NotifyLeaderAndIsrResultForBucket(tb, ApiError.fromThrowable(e)));
783+
result.put(
784+
tb, new NotifyLeaderAndIsrResultForBucket(tb, ApiError.fromThrowable(e)));
782785
}
783786
}
784787

@@ -802,36 +805,55 @@ private void makeFollowers(
802805
truncateToHighWatermark(replicasBecomeFollower);
803806

804807
// add fetcher for those follower replicas.
805-
addFetcherForReplicas(replicasBecomeFollower);
808+
addFetcherForReplicas(replicasBecomeFollower, result);
806809
}
807810

808-
private void addFetcherForReplicas(List<Replica> replicas) {
811+
private void addFetcherForReplicas(
812+
List<Replica> replicas, Map<TableBucket, NotifyLeaderAndIsrResultForBucket> result) {
809813
Map<TableBucket, InitialFetchStatus> bucketAndStatus = new HashMap<>();
810814
for (Replica replica : replicas) {
811815
Integer leaderId = replica.getLeaderId();
816+
TableBucket tb = replica.getTableBucket();
817+
LogTablet logTablet = replica.getLogTablet();
812818
if (leaderId == null) {
813-
throw new NotLeaderOrFollowerException(
814-
String.format(
815-
"Could not find leader for follower replica %s while make leader for table bucket %s",
816-
serverId, replica.getTableBucket()));
817-
}
818-
819-
// fetch from leader server node with internal endpoint.
820-
Optional<ServerNode> leader =
821-
metadataCache.getTabletServer(leaderId, internalListenerName);
822-
if (!leader.isPresent()) {
823-
throw new NotLeaderOrFollowerException(
824-
String.format(
825-
"Could not find leader in server metadata by id for replica %s while make follower",
826-
replica));
819+
result.put(
820+
tb,
821+
new NotifyLeaderAndIsrResultForBucket(
822+
tb,
823+
ApiError.fromThrowable(
824+
new NotLeaderOrFollowerException(
825+
String.format(
826+
"Could not find leader for follower replica %s while make "
827+
+ "leader for table bucket %s",
828+
serverId, tb)))));
829+
} else {
830+
// fetch from leader server node with internal endpoint.
831+
Optional<ServerNode> leader =
832+
metadataCache.getTabletServer(leaderId, internalListenerName);
833+
if (!leader.isPresent()) {
834+
// If leader serverNode is not in the metadata, we need to return a bucket level
835+
// error to let CoordinatorServer retry sending makeLeaderOrFollower request.
836+
// This situation will be happened if the leader serverNode is offline and
837+
// didn't recovery now.
838+
result.put(
839+
tb,
840+
new NotifyLeaderAndIsrResultForBucket(
841+
tb,
842+
ApiError.fromThrowable(
843+
new NotLeaderOrFollowerException(
844+
String.format(
845+
"Could not find leader in server metadata by id "
846+
+ "for replica %s while make follower",
847+
replica)))));
848+
} else {
849+
// For these replicas whose leader id has been set and the server id is in the
850+
// metadata. We need to add fetcher for these replicas.
851+
bucketAndStatus.put(
852+
tb,
853+
new InitialFetchStatus(
854+
tb.getTableId(), leader.get(), logTablet.localLogEndOffset()));
855+
}
827856
}
828-
829-
LogTablet logTablet = replica.getLogTablet();
830-
TableBucket tableBucket = logTablet.getTableBucket();
831-
bucketAndStatus.put(
832-
tableBucket,
833-
new InitialFetchStatus(
834-
tableBucket.getTableId(), leader.get(), logTablet.localLogEndOffset()));
835857
}
836858
replicaFetcherManager.addFetcherForBuckets(bucketAndStatus);
837859
}

fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,23 @@ private static class ServerAndFetcherId {
233233
this.serverNode = serverNode;
234234
this.fetcherId = fetcherId;
235235
}
236+
237+
@Override
238+
public boolean equals(Object o) {
239+
if (this == o) {
240+
return true;
241+
}
242+
if (o == null || getClass() != o.getClass()) {
243+
return false;
244+
}
245+
ServerAndFetcherId that = (ServerAndFetcherId) o;
246+
return fetcherId == that.fetcherId && serverNode.equals(that.serverNode);
247+
}
248+
249+
@Override
250+
public int hashCode() {
251+
return Objects.hash(serverNode, fetcherId);
252+
}
236253
}
237254

238255
/** Class to represent server id and fetcher id. */

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@
6565

6666
import static com.alibaba.fluss.server.log.FetchParams.DEFAULT_MAX_WAIT_MS_WHEN_MIN_BYTES_ENABLE;
6767
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getFetchLogData;
68+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getNotifyLeaderAndIsrRequestData;
6869
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeLookupResponse;
70+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeNotifyLeaderAndIsrResponse;
6971
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makePrefixLookupResponse;
7072
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toLookupData;
7173
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toPrefixLookupData;
@@ -186,9 +188,8 @@ public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
186188
CompletableFuture<NotifyLeaderAndIsrResponse> response = new CompletableFuture<>();
187189
replicaManager.becomeLeaderOrFollower(
188190
notifyLeaderAndIsrRequest.getCoordinatorEpoch(),
189-
RpcMessageUtils.getNotifyLeaderAndIsrData(notifyLeaderAndIsrRequest),
190-
result ->
191-
response.complete(RpcMessageUtils.makeNotifyLeaderAndIsrResponse(result)));
191+
getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest),
192+
result -> response.complete(makeNotifyLeaderAndIsrResponse(result)));
192193
return response;
193194
}
194195

fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137

138138
import java.nio.ByteBuffer;
139139
import java.util.ArrayList;
140+
import java.util.Collection;
140141
import java.util.HashMap;
141142
import java.util.HashSet;
142143
import java.util.List;
@@ -234,6 +235,13 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest(
234235
return updateMetadataRequest;
235236
}
236237

238+
public static NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest(
239+
int coordinatorEpoch, Collection<PbNotifyLeaderAndIsrReqForBucket> notifyLeaders) {
240+
return new NotifyLeaderAndIsrRequest()
241+
.setCoordinatorEpoch(coordinatorEpoch)
242+
.addAllNotifyBucketsLeaderReqs(notifyLeaders);
243+
}
244+
237245
public static PbNotifyLeaderAndIsrReqForBucket makeNotifyBucketLeaderAndIsr(
238246
NotifyLeaderAndIsrData notifyLeaderAndIsrData) {
239247
PbNotifyLeaderAndIsrReqForBucket reqForBucket =
@@ -261,7 +269,7 @@ public static PbNotifyLeaderAndIsrReqForBucket makeNotifyBucketLeaderAndIsr(
261269
return reqForBucket;
262270
}
263271

264-
public static List<NotifyLeaderAndIsrData> getNotifyLeaderAndIsrData(
272+
public static List<NotifyLeaderAndIsrData> getNotifyLeaderAndIsrRequestData(
265273
NotifyLeaderAndIsrRequest request) {
266274
List<NotifyLeaderAndIsrData> notifyLeaderAndIsrDataList = new ArrayList<>();
267275
for (PbNotifyLeaderAndIsrReqForBucket reqForBucket :
@@ -317,6 +325,26 @@ public static NotifyLeaderAndIsrResponse makeNotifyLeaderAndIsrResponse(
317325
return notifyLeaderAndIsrResponse;
318326
}
319327

328+
public static List<NotifyLeaderAndIsrResultForBucket> getNotifyLeaderAndIsrResponseData(
329+
NotifyLeaderAndIsrResponse response) {
330+
List<NotifyLeaderAndIsrResultForBucket> notifyLeaderAndIsrResultForBuckets =
331+
new ArrayList<>();
332+
for (PbNotifyLeaderAndIsrRespForBucket protoNotifyLeaderRespForBucket :
333+
response.getNotifyBucketsLeaderRespsList()) {
334+
TableBucket tableBucket =
335+
toTableBucket(protoNotifyLeaderRespForBucket.getTableBucket());
336+
// construct the result for notify bucket leader and isr
337+
NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket =
338+
protoNotifyLeaderRespForBucket.hasErrorCode()
339+
? new NotifyLeaderAndIsrResultForBucket(
340+
tableBucket,
341+
ApiError.fromErrorMessage(protoNotifyLeaderRespForBucket))
342+
: new NotifyLeaderAndIsrResultForBucket(tableBucket);
343+
notifyLeaderAndIsrResultForBuckets.add(notifyLeaderAndIsrResultForBucket);
344+
}
345+
return notifyLeaderAndIsrResultForBuckets;
346+
}
347+
320348
public static PbStopReplicaReqForBucket makeStopBucketReplica(
321349
TableBucket tableBucket, boolean isDelete, int leaderEpoch) {
322350
PbStopReplicaReqForBucket stopBucketReplicaRequest = new PbStopReplicaReqForBucket();

0 commit comments

Comments
 (0)