Skip to content

Commit 20e87cc

Browse files
[server] Fix ISR state leak causing permanent deadlock during rolling upgrades (#2147)
1 parent 99ec308 commit 20e87cc

File tree

4 files changed

+70 -16 lines changed

fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2424
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
2525
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
26-
import org.apache.fluss.rpc.protocol.Errors;
2726
import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
2827
import org.apache.fluss.server.zk.data.LeaderAndIsr;
2928
import org.apache.fluss.utils.MapUtils;
@@ -102,7 +101,15 @@ private void maybePropagateIsrAdjust() {
102101
// Copy current unsent ISRs but don't remove from the map, they get cleared in the
103102
// response callback.
104103
List<AdjustIsrItem> adjustIsrItemList = new ArrayList<>(unsentAdjustIsrMap.values());
105-
sendAdjustIsrRequest(adjustIsrItemList);
104+
try {
105+
sendAdjustIsrRequest(adjustIsrItemList);
106+
} catch (Throwable e) {
107+
// Catch any top-level exceptions
108+
handleAdjustIsrRequestError(adjustIsrItemList, e);
109+
// If we received a top-level error from the coordinator, retry
110+
// the request in the near future.
111+
scheduler.scheduleOnce("send-adjust-isr", this::maybePropagateIsrAdjust, 50);
112+
}
106113
}
107114
}
108115

@@ -123,30 +130,37 @@ protected void sendAdjustIsrRequest(List<AdjustIsrItem> adjustIsrItemList) {
123130
.adjustIsr(adjustIsrRequest)
124131
.whenComplete(
125132
(response, exception) -> {
126-
Errors errors;
127133
try {
128-
if (exception != null) {
129-
errors = Errors.forException(exception);
134+
if (null != exception) {
135+
handleAdjustIsrRequestError(adjustIsrItemList, exception);
136+
// If we received a top-level error from the coordinator, retry
137+
// the request in the near future.
138+
scheduler.scheduleOnce(
139+
"send-adjust-isr", this::maybePropagateIsrAdjust, 50);
140+
return;
130141
} else {
131142
handleAdjustIsrResponse(response, adjustIsrItemList);
132-
errors = Errors.NONE;
133143
}
134144
} finally {
135145
// clear the flag so future requests can proceed.
136146
clearInFlightRequest();
137147
}
138-
139-
if (errors == Errors.NONE) {
140-
maybePropagateIsrAdjust();
141-
} else {
142-
// If we received a top-level error from the coordinator, retry
143-
// the request in near future.
144-
scheduler.scheduleOnce(
145-
"send-adjust-isr", this::maybePropagateIsrAdjust, 50);
146-
}
148+
maybePropagateIsrAdjust();
147149
});
148150
}
149151

152+
private void handleAdjustIsrRequestError(
153+
List<AdjustIsrItem> adjustIsrItemList, Throwable error) {
154+
LOG.error("Request adjust isr failed.", error);
155+
adjustIsrItemList.forEach(
156+
item -> {
157+
AdjustIsrItem adjustIsrItem = unsentAdjustIsrMap.remove(item.tableBucket);
158+
if (null != adjustIsrItem) {
159+
adjustIsrItem.future.completeExceptionally(error);
160+
}
161+
});
162+
}
163+
150164
private void handleAdjustIsrResponse(
151165
AdjustIsrResponse response, List<AdjustIsrItem> adjustIsrItemList) {
152166
Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1577,7 +1577,7 @@ private CompletableFuture<LeaderAndIsr> submitAdjustIsr(
15771577
// We don't know what happened on the coordinator server
15781578
// exactly, but we do know this response is out of date,
15791579
// so we ignore it.
1580-
LOG.debug(
1580+
LOG.warn(
15811581
"Ignoring failed ISR update to {} for bucket {} since we have already updated state to {}",
15821582
proposedIsrState,
15831583
tableBucket,

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.exception.FencedLeaderEpochException;
2121
import org.apache.fluss.exception.IneligibleReplicaException;
22+
import org.apache.fluss.exception.NetworkException;
2223
import org.apache.fluss.metadata.TableBucket;
2324
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2425
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -91,6 +92,7 @@
9192
import org.apache.fluss.server.zk.ZooKeeperClient;
9293
import org.apache.fluss.server.zk.data.LeaderAndIsr;
9394
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
95+
import org.apache.fluss.utils.concurrent.FutureUtils;
9496

9597
import javax.annotation.Nullable;
9698

@@ -115,6 +117,7 @@ public class TestCoordinatorGateway implements CoordinatorGateway {
115117
public final AtomicBoolean commitRemoteLogManifestFail = new AtomicBoolean(false);
116118
public final Map<TableBucket, Integer> currentLeaderEpoch = new HashMap<>();
117119
private Set<Integer> shutdownTabletServers;
120+
private boolean networkIssueEnable = false;
118121

119122
public TestCoordinatorGateway() {
120123
this(null);
@@ -239,6 +242,10 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
239242

240243
@Override
241244
public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
245+
if (networkIssueEnable) {
246+
return FutureUtils.completedExceptionally(new NetworkException("Mock network issue."));
247+
}
248+
242249
Map<TableBucket, LeaderAndIsr> adjustIsrData = getAdjustIsrData(request);
243250
List<AdjustIsrResultForBucket> resultForBuckets = new ArrayList<>();
244251

@@ -372,4 +379,8 @@ public void setCurrentLeaderEpoch(TableBucket tableBucket, int leaderEpoch) {
372379
public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
373380
this.shutdownTabletServers = shutdownTabletServers;
374381
}
382+
383+
public void setNetworkIssueEnable(boolean networkIssueEnable) {
384+
this.networkIssueEnable = networkIssueEnable;
385+
}
375386
}

fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.replica;
1919

20+
import org.apache.fluss.exception.NetworkException;
2021
import org.apache.fluss.metadata.TableBucket;
2122
import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
2223
import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -28,6 +29,7 @@
2829
import java.util.List;
2930

3031
import static org.assertj.core.api.Assertions.assertThat;
32+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3133

3234
/** Test for {@link AdjustIsrManager}. */
3335
class AdjustIsrManagerTest {
@@ -56,4 +58,31 @@ void testSubmitShrinkIsr() throws Exception {
5658
assertThat(result)
5759
.isEqualTo(new LeaderAndIsr(tabletServerId, 0, Arrays.asList(1, 2, 3), 0, 2));
5860
}
61+
62+
@Test
63+
void testSubmitPropagatesRpcLevelErrorAndAllowsRetry() throws Exception {
64+
int tabletServerId = 0;
65+
TestCoordinatorGateway coordinatorGateway = new TestCoordinatorGateway();
66+
// Make all AdjustIsr requests fail with NetworkException.
67+
coordinatorGateway.setNetworkIssueEnable(true);
68+
AdjustIsrManager adjustIsrManager =
69+
new AdjustIsrManager(new FlussScheduler(1), coordinatorGateway, tabletServerId);
70+
71+
// The RPC-level exception is propagated to the submit future.
72+
TableBucket tb = new TableBucket(150001L, 0);
73+
List<Integer> currentIsr = Arrays.asList(1, 2);
74+
LeaderAndIsr adjustIsr = new LeaderAndIsr(tabletServerId, 0, currentIsr, 0, 0);
75+
assertThatThrownBy(() -> adjustIsrManager.submit(tb, adjustIsr).get())
76+
.rootCause()
77+
.isInstanceOf(NetworkException.class)
78+
.hasMessage("Mock network issue.");
79+
80+
coordinatorGateway.setNetworkIssueEnable(false);
81+
82+
// After the network issue is cleared, a retry should not be blocked by any previous
83+
// submit.
84+
LeaderAndIsr result = adjustIsrManager.submit(tb, adjustIsr).get();
85+
assertThat(result)
86+
.isEqualTo(new LeaderAndIsr(tabletServerId, 0, Arrays.asList(1, 2), 0, 1));
87+
}
5988
}

0 commit comments

Comments
 (0)