Skip to content

Commit 997b146

Browse files
[client] Fix exception occurs when initializing server connection. (#1740)
1 parent a60fe93 commit 997b146

File tree

4 files changed

+37
-14
lines changed

4 files changed

+37
-14
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,13 @@ private ServerConnection getOrCreateConnection(ServerNode node) {
192192
serverId,
193193
ignored -> {
194194
LOG.debug("Creating connection to server {}.", node);
195-
ServerConnection connection =
196-
new ServerConnection(
197-
bootstrap,
198-
node,
199-
clientMetricGroup,
200-
authenticatorSupplier.get(),
201-
isInnerClient);
202-
connection.whenClose(ignore -> connections.remove(serverId, connection));
203-
return connection;
195+
return new ServerConnection(
196+
bootstrap,
197+
node,
198+
clientMetricGroup,
199+
authenticatorSupplier.get(),
200+
(con, ignore) -> connections.remove(serverId, con),
201+
isInnerClient);
204202
});
205203
}
206204

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import java.util.Map;
5959
import java.util.concurrent.CompletableFuture;
6060
import java.util.concurrent.TimeUnit;
61-
import java.util.function.Consumer;
61+
import java.util.function.BiConsumer;
6262

6363
import static org.apache.fluss.utils.IOUtils.closeQuietly;
6464

@@ -104,15 +104,20 @@ final class ServerConnection {
104104
ServerNode node,
105105
ClientMetricGroup clientMetricGroup,
106106
ClientAuthenticator authenticator,
107+
BiConsumer<ServerConnection, Throwable> closeCallback,
107108
boolean isInnerClient) {
108109
this.node = node;
109110
this.state = ConnectionState.CONNECTING;
110111
this.connectionMetricGroup = clientMetricGroup.createConnectionMetricGroup(node.uid());
112+
this.authenticator = authenticator;
113+
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
114+
whenClose(closeCallback);
115+
116+
// connect and handle should be last in case of other variables are nullable and close
117+
// callback is not registered when connection established.
111118
bootstrap
112119
.connect(node.host(), node.port())
113120
.addListener(future -> establishConnection((ChannelFuture) future, isInnerClient));
114-
this.authenticator = authenticator;
115-
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
116121
}
117122

118123
public ServerNode getServerNode() {
@@ -131,8 +136,8 @@ public CompletableFuture<ApiMessage> send(ApiKeys apikey, ApiMessage request) {
131136
}
132137

133138
/** Register a callback to be called when the connection is closed. */
134-
public void whenClose(Consumer<Throwable> closeCallback) {
135-
closeFuture.whenComplete((v, throwable) -> closeCallback.accept(throwable));
139+
private void whenClose(BiConsumer<ServerConnection, Throwable> closeCallback) {
140+
closeFuture.whenComplete((v, throwable) -> closeCallback.accept(this, throwable));
136141
}
137142

138143
/** Close the connection. */

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,25 @@ void testMultipleEndpoint() throws Exception {
234234
}
235235
}
236236

237+
@Test
238+
void testExceptionWhenInitializeServerConnection() throws Exception {
239+
ApiVersionsRequest request =
240+
new ApiVersionsRequest()
241+
.setClientSoftwareName("testing_client_100")
242+
.setClientSoftwareVersion("1.0");
243+
// close the netty server.
244+
nettyServer.close();
245+
246+
// send request and create server connection.
247+
assertThatThrownBy(
248+
() ->
249+
nettyClient
250+
.sendRequest(serverNode, ApiKeys.API_VERSIONS, request)
251+
.get())
252+
.hasMessageContaining("Disconnected from node");
253+
assertThat(nettyClient.connections()).isEmpty();
254+
}
255+
237256
private void buildNettyServer(int serverId) throws Exception {
238257
try (NetUtils.Port availablePort = getAvailablePort()) {
239258
serverNode =

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ void testConnectionClose() {
9696
serverNode,
9797
TestingClientMetricGroup.newInstance(),
9898
clientAuthenticator,
99+
(con, ignore) -> {},
99100
false);
100101
ConnectionState connectionState = connection.getConnectionState();
101102
assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);

0 commit comments

Comments
 (0)