Skip to content

Commit 640f40c

Browse files
authored
[server] Coordinator should reject the AdjustIsr request if the adjusted isr contains shutdown TabletServers (#1734)
1 parent 47e3248 commit 640f40c

File tree

7 files changed

+115
-3
lines changed

7 files changed

+115
-3
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/** Exception thrown when the new ISR in an AdjustIsr request contains an ineligible replica. */
21+
public class IneligibleReplicaException extends ApiException {
22+
23+
private static final long serialVersionUID = 1L;
24+
25+
public IneligibleReplicaException(String message) {
26+
super(message);
27+
}
28+
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.exception.DuplicateSequenceException;
2929
import org.apache.fluss.exception.FencedLeaderEpochException;
3030
import org.apache.fluss.exception.FencedTieringEpochException;
31+
import org.apache.fluss.exception.IneligibleReplicaException;
3132
import org.apache.fluss.exception.InvalidColumnProjectionException;
3233
import org.apache.fluss.exception.InvalidConfigException;
3334
import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -217,7 +218,11 @@ public enum Errors {
217218
LAKE_SNAPSHOT_NOT_EXIST(
218219
53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new),
219220
LAKE_TABLE_ALREADY_EXIST(
220-
54, "The lake table already exists.", LakeTableAlreadyExistException::new);
221+
54, "The lake table already exists.", LakeTableAlreadyExistException::new),
222+
INELIGIBLE_REPLICA_EXCEPTION(
223+
55,
224+
"The new ISR contains at least one ineligible replica.",
225+
IneligibleReplicaException::new);
221226

222227
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
223228

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.config.Configuration;
2626
import org.apache.fluss.exception.FencedLeaderEpochException;
2727
import org.apache.fluss.exception.FlussRuntimeException;
28+
import org.apache.fluss.exception.IneligibleReplicaException;
2829
import org.apache.fluss.exception.InvalidCoordinatorException;
2930
import org.apache.fluss.exception.InvalidUpdateVersionException;
3031
import org.apache.fluss.exception.TabletServerNotAvailableException;
@@ -1026,6 +1027,20 @@ private void validateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr newLeade
10261027
// that this node is not the leader.
10271028
throw new InvalidUpdateVersionException(
10281029
"The request bucket epoch in adjust isr request is lower than current bucket epoch in coordinator.");
1030+
} else {
1031+
// Check that all replicas in the new ISR are eligible (i.e. the new ISR doesn't
1032+
// contain any tabletServers that are not in the live tabletServer set).
1033+
Set<Integer> ineligibleReplicas = new HashSet<>(newLeaderAndIsr.isr());
1034+
ineligibleReplicas.removeAll(coordinatorContext.liveTabletServerSet());
1035+
if (!ineligibleReplicas.isEmpty()) {
1036+
String errorMsg =
1037+
String.format(
1038+
"Rejecting adjustIsr request for table bucket %s because it "
1039+
+ "specified ineligible replicas %s in the new ISR %s",
1040+
tableBucket, ineligibleReplicas, newLeaderAndIsr.isr());
1041+
LOG.info(errorMsg);
1042+
throw new IneligibleReplicaException(errorMsg);
1043+
}
10291044
}
10301045
}
10311046
}

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ private boolean handleAdjustIsrError(IsrState.PendingIsrState proposedIsrState,
16491649
failedIsrUpdates.inc();
16501650
switch (error) {
16511651
case OPERATION_NOT_ATTEMPTED_EXCEPTION:
1652+
case INELIGIBLE_REPLICA_EXCEPTION:
16521653
// Care must be taken when resetting to the last committed state since we may not
16531654
// know in general whether the request was applied or not taking into account
16541655
// retries and controller changes which might have occurred before we received the

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ public void batchRegisterLeaderAndIsrForTablePartition(
408408
.forPath(bucketsParentPath);
409409

410410
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
411+
LOG.info(
412+
"Batch Register {} for bucket {} in Zookeeper.",
413+
info.getLeaderAndIsr(),
414+
info.getTableBucket());
411415
byte[] data = LeaderAndIsrZNode.encode(info.getLeaderAndIsr());
412416
// create direct parent node
413417
CuratorOp parentNodeCreate =
@@ -487,6 +491,7 @@ public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrL
487491
TableBucket tableBucket = entry.getKey();
488492
LeaderAndIsr leaderAndIsr = entry.getValue();
489493

494+
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
490495
String path = LeaderAndIsrZNode.path(tableBucket);
491496
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
492497
CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.coordinator;
1919

2020
import org.apache.fluss.exception.FencedLeaderEpochException;
21+
import org.apache.fluss.exception.IneligibleReplicaException;
2122
import org.apache.fluss.metadata.TableBucket;
2223
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2324
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -89,8 +90,10 @@
8990

9091
import java.util.ArrayList;
9192
import java.util.HashMap;
93+
import java.util.HashSet;
9294
import java.util.List;
9395
import java.util.Map;
96+
import java.util.Set;
9497
import java.util.concurrent.CompletableFuture;
9598
import java.util.concurrent.atomic.AtomicBoolean;
9699

@@ -105,13 +108,15 @@ public class TestCoordinatorGateway implements CoordinatorGateway {
105108
private final @Nullable ZooKeeperClient zkClient;
106109
public final AtomicBoolean commitRemoteLogManifestFail = new AtomicBoolean(false);
107110
public final Map<TableBucket, Integer> currentLeaderEpoch = new HashMap<>();
111+
private Set<Integer> shutdownTabletServers;
108112

109113
public TestCoordinatorGateway() {
110114
this(null);
111115
}
112116

113117
public TestCoordinatorGateway(ZooKeeperClient zkClient) {
114118
this.zkClient = zkClient;
119+
this.shutdownTabletServers = new HashSet<>();
115120
}
116121

117122
@Override
@@ -230,7 +235,12 @@ public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request)
230235
(tb, leaderAndIsr) -> {
231236
Integer currentLeaderEpoch = this.currentLeaderEpoch.getOrDefault(tb, 0);
232237
int requestLeaderEpoch = leaderAndIsr.leaderEpoch();
233-
238+
Set<Integer> ineligibleReplicas = new HashSet<>();
239+
for (int replica : leaderAndIsr.isr()) {
240+
if (shutdownTabletServers.contains(replica)) {
241+
ineligibleReplicas.add(replica);
242+
}
243+
}
234244
AdjustIsrResultForBucket adjustIsrResultForBucket;
235245
if (requestLeaderEpoch < currentLeaderEpoch) {
236246
adjustIsrResultForBucket =
@@ -239,6 +249,19 @@ public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request)
239249
ApiError.fromThrowable(
240250
new FencedLeaderEpochException(
241251
"request leader epoch is fenced.")));
252+
} else if (!ineligibleReplicas.isEmpty()) {
253+
adjustIsrResultForBucket =
254+
new AdjustIsrResultForBucket(
255+
tb,
256+
ApiError.fromThrowable(
257+
new IneligibleReplicaException(
258+
String.format(
259+
"Rejecting adjustIsr request for table bucket %s because it "
260+
+ "specified ineligible replicas %s in the new ISR %s",
261+
tb,
262+
ineligibleReplicas,
263+
leaderAndIsr))));
264+
242265
} else {
243266
adjustIsrResultForBucket =
244267
new AdjustIsrResultForBucket(
@@ -322,4 +345,8 @@ public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request) {
322345
public void setCurrentLeaderEpoch(TableBucket tableBucket, int leaderEpoch) {
323346
currentLeaderEpoch.put(tableBucket, leaderEpoch);
324347
}
348+
349+
public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
350+
this.shutdownTabletServers = shutdownTabletServers;
351+
}
325352
}

fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.exception.FencedLeaderEpochException;
23+
import org.apache.fluss.exception.IneligibleReplicaException;
2324
import org.apache.fluss.metadata.TableBucket;
2425
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
2526
import org.apache.fluss.server.entity.FetchReqInfo;
@@ -118,7 +119,7 @@ void testShrinkIsr() {
118119
}
119120

120121
@Test
121-
void testSubmitShrinkIsrAsLeaderFenced() throws Exception {
122+
void testSubmitShrinkIsrAsLeaderFenced() {
122123
// replica set is 1,2,3 , isr set is 1,2,3.
123124
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
124125
makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3), false);
@@ -144,4 +145,34 @@ void testSubmitShrinkIsrAsLeaderFenced() throws Exception {
144145
.isInstanceOf(FencedLeaderEpochException.class)
145146
.hasMessageContaining("request leader epoch is fenced.");
146147
}
148+
149+
@Test
150+
void testSubmitShrinkIsrAsServerAlreadyShutdown() {
151+
// replica set is 1,2,3 , isr set is 1,2,3.
152+
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
153+
makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3), false);
154+
155+
Replica replica = replicaManager.getReplicaOrException(tb);
156+
assertThat(replica.getIsr()).containsExactlyInAnyOrder(1, 2, 3);
157+
158+
// Mock the ISR shrink that would be prepared in Replica#maybeShrinkIsr().
159+
IsrState.PendingShrinkIsrState pendingShrinkIsrState =
160+
replica.prepareIsrShrink(
161+
new IsrState.CommittedIsrState(Arrays.asList(1, 2, 3)),
162+
Arrays.asList(1, 2),
163+
Collections.singletonList(3));
164+
165+
// Mark tabletServer-2 as shut down to mock a tabletServer that has already shut down.
166+
testCoordinatorGateway.setShutdownTabletServers(Collections.singleton(2));
167+
assertThatThrownBy(
168+
() ->
169+
replica.submitAdjustIsr(pendingShrinkIsrState)
170+
.get(1, TimeUnit.MINUTES))
171+
.rootCause()
172+
.isInstanceOf(IneligibleReplicaException.class)
173+
.hasMessage(
174+
"Rejecting adjustIsr request for table bucket "
175+
+ "TableBucket{tableId=150001, bucket=1} because it specified ineligible replicas [2] "
176+
+ "in the new ISR LeaderAndIsr{leader=1, leaderEpoch=0, isr=[1, 2], coordinatorEpoch=0, bucketEpoch=0}");
177+
}
147178
}

0 commit comments

Comments
 (0)