Skip to content

Commit 67eefc4

Browse files
authored
[common] Fix serverConnection that has already been set to disconnected is still processing requests error (#1722)
1 parent e1ff522 commit 67eefc4

File tree

3 files changed

+164
-36
lines changed

3 files changed

+164
-36
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.rpc.netty.client;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.cluster.ServerNode;
2122
import org.apache.fluss.exception.DisconnectException;
2223
import org.apache.fluss.exception.FlussRuntimeException;
@@ -170,39 +171,18 @@ private CompletableFuture<Void> close(Throwable cause) {
170171
}
171172

172173
if (channel != null) {
173-
channel.close()
174-
.addListener(
175-
(ChannelFutureListener)
176-
future -> {
177-
178-
// when finishing, if netty successfully closes the
179-
// channel, then the provided exception is used as
180-
// the reason for the closing. If there was something
181-
// wrong at the netty side, then that exception is
182-
// prioritized over the provided one.
183-
if (future.isSuccess()) {
184-
if (cause instanceof ClosedChannelException) {
185-
// the ClosedChannelException is expected
186-
closeFuture.complete(null);
187-
} else {
188-
closeFuture.completeExceptionally(cause);
189-
}
190-
} else {
191-
LOG.warn(
192-
"Something went wrong when trying to close connection due to : ",
193-
cause);
194-
closeFuture.completeExceptionally(future.cause());
195-
}
196-
});
174+
// Close the channel directly, without waiting for the channel to close properly.
175+
channel.close();
176+
}
177+
178+
// TODO all return completeExceptionally will let some test cases blocked, so we
179+
// need to find why the test cases are blocked and remove the if statement.
180+
if (cause instanceof ClosedChannelException
181+
|| cause.getCause() instanceof ConnectException) {
182+
// the ClosedChannelException and ConnectException is expected.
183+
closeFuture.complete(null);
197184
} else {
198-
// TODO all return completeExceptionally will let some test cases blocked, so we
199-
// need to find why the test cases are blocked and remove the if statement.
200-
if (cause.getCause() instanceof ConnectException) {
201-
// the ConnectException is expected
202-
closeFuture.complete(null);
203-
} else {
204-
closeFuture.completeExceptionally(cause);
205-
}
185+
closeFuture.completeExceptionally(cause);
206186
}
207187

208188
connectionMetricGroup.close();
@@ -491,7 +471,8 @@ private void switchState(ConnectionState targetState) {
491471
* <li>READY: connection is ready to send requests.
492472
* <li>DISCONNECTED: connection is failed to establish.
493473
*/
494-
private enum ConnectionState {
474+
@VisibleForTesting
475+
enum ConnectionState {
495476
CONNECTING,
496477
CHECKING_API_VERSIONS,
497478
AUTHENTICATING,
@@ -565,4 +546,9 @@ public String ipAddress() {
565546
return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
566547
}
567548
}
549+
550+
@VisibleForTesting
551+
ConnectionState getConnectionState() {
552+
return state;
553+
}
568554
}

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.jupiter.api.BeforeEach;
4343
import org.junit.jupiter.api.Test;
4444

45+
import java.net.ConnectException;
4546
import java.util.ArrayList;
4647
import java.util.Arrays;
4748
import java.util.Collections;
@@ -156,9 +157,9 @@ void testServerDisconnection() throws Exception {
156157
nettyClient
157158
.sendRequest(serverNode, ApiKeys.API_VERSIONS, request)
158159
.get())
159-
.isInstanceOf(ExecutionException.class)
160-
.hasMessageContaining("Disconnected from node")
161-
.hasRootCauseMessage("finishConnect(..) failed: Connection refused");
160+
.rootCause()
161+
.isInstanceOf(ConnectException.class)
162+
.hasMessageContaining("Connection refused");
162163
assertThat(nettyClient.connections().size()).isEqualTo(0);
163164

164165
// restart the netty server.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.rpc.netty.client;
19+
20+
import org.apache.fluss.cluster.Endpoint;
21+
import org.apache.fluss.cluster.ServerNode;
22+
import org.apache.fluss.cluster.ServerType;
23+
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.exception.DisconnectException;
25+
import org.apache.fluss.metrics.groups.MetricGroup;
26+
import org.apache.fluss.metrics.util.NOPMetricsGroup;
27+
import org.apache.fluss.rpc.TestingGatewayService;
28+
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
29+
import org.apache.fluss.rpc.messages.PbTablePath;
30+
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
31+
import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
32+
import org.apache.fluss.rpc.netty.server.NettyServer;
33+
import org.apache.fluss.rpc.netty.server.RequestsMetrics;
34+
import org.apache.fluss.rpc.protocol.ApiKeys;
35+
import org.apache.fluss.security.auth.AuthenticationFactory;
36+
import org.apache.fluss.security.auth.ClientAuthenticator;
37+
import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
38+
import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
39+
import org.apache.fluss.utils.NetUtils;
40+
41+
import org.junit.jupiter.api.AfterEach;
42+
import org.junit.jupiter.api.BeforeEach;
43+
import org.junit.jupiter.api.Test;
44+
45+
import java.util.Collections;
46+
import java.util.concurrent.CompletableFuture;
47+
48+
import static org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
49+
import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
50+
import static org.apache.fluss.utils.NetUtils.getAvailablePort;
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
53+
54+
/** Test for {@link ServerConnection}. */
55+
public class ServerConnectionTest {
56+
57+
private EventLoopGroup eventLoopGroup;
58+
private Bootstrap bootstrap;
59+
private ClientAuthenticator clientAuthenticator;
60+
private Configuration conf;
61+
private NettyServer nettyServer;
62+
private ServerNode serverNode;
63+
private TestingGatewayService service;
64+
65+
@BeforeEach
66+
void setUp() throws Exception {
67+
conf = new Configuration();
68+
buildNettyServer(0);
69+
70+
eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test");
71+
bootstrap =
72+
new Bootstrap()
73+
.group(eventLoopGroup)
74+
.channel(getClientSocketChannelClass(eventLoopGroup))
75+
.handler(new ClientChannelInitializer(5000));
76+
clientAuthenticator =
77+
AuthenticationFactory.loadClientAuthenticatorSupplier(new Configuration()).get();
78+
}
79+
80+
@AfterEach
81+
void tearDown() throws Exception {
82+
if (nettyServer != null) {
83+
nettyServer.close();
84+
}
85+
86+
if (eventLoopGroup != null) {
87+
eventLoopGroup.shutdownGracefully();
88+
}
89+
}
90+
91+
@Test
92+
void testConnectionClose() {
93+
ServerConnection connection =
94+
new ServerConnection(
95+
bootstrap,
96+
serverNode,
97+
TestingClientMetricGroup.newInstance(),
98+
clientAuthenticator,
99+
false);
100+
ConnectionState connectionState = connection.getConnectionState();
101+
assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);
102+
103+
GetTableSchemaRequest request =
104+
new GetTableSchemaRequest()
105+
.setTablePath(
106+
new PbTablePath().setDatabaseName("test").setTableName("test"))
107+
.setSchemaId(0);
108+
connection.send(ApiKeys.GET_TABLE_SCHEMA, request);
109+
110+
CompletableFuture<Void> future = connection.close();
111+
connectionState = connection.getConnectionState();
112+
assertThat(connectionState).isEqualTo(ConnectionState.DISCONNECTED);
113+
assertThat(future.isDone()).isTrue();
114+
115+
assertThatThrownBy(() -> connection.send(ApiKeys.GET_TABLE_SCHEMA, request).get())
116+
.rootCause()
117+
.isInstanceOf(DisconnectException.class)
118+
.hasMessageContaining("Cannot send request to server");
119+
future = connection.close();
120+
assertThat(future.isDone()).isTrue();
121+
}
122+
123+
private void buildNettyServer(int serverId) throws Exception {
124+
try (NetUtils.Port availablePort = getAvailablePort()) {
125+
serverNode =
126+
new ServerNode(
127+
serverId, "localhost", availablePort.getPort(), ServerType.COORDINATOR);
128+
service = new TestingGatewayService();
129+
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
130+
nettyServer =
131+
new NettyServer(
132+
conf,
133+
Collections.singleton(
134+
new Endpoint(serverNode.host(), serverNode.port(), "INTERNAL")),
135+
service,
136+
metricGroup,
137+
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
138+
nettyServer.start();
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)