4545import java .util .List ;
4646import java .util .Map ;
4747import java .util .concurrent .CompletableFuture ;
48+ import java .util .concurrent .ExecutionException ;
4849import java .util .concurrent .TimeUnit ;
4950import java .util .function .Supplier ;
5051
@@ -71,6 +72,8 @@ public final class NettyClient implements RpcClient {
7172 */
7273 private final Map <String , ServerConnection > connections ;
7374
75+ private final Map <String , CompletableFuture <ServerConnection >> connectionFutures ;
76+
7477 /** Metric groups for client. */
7578 private final ClientMetricGroup clientMetricGroup ;
7679
@@ -87,6 +90,7 @@ public final class NettyClient implements RpcClient {
8790 public NettyClient (
8891 Configuration conf , ClientMetricGroup clientMetricGroup , boolean isInnerClient ) {
8992 this .connections = MapUtils .newConcurrentHashMap ();
93+ this .connectionFutures = MapUtils .newConcurrentHashMap ();
9094
9195 // build bootstrap
9296 this .eventGroup =
@@ -188,20 +192,51 @@ public void close() throws Exception {
188192
189193 private ServerConnection getOrCreateConnection (ServerNode node ) {
190194 String serverId = node .uid ();
191- return connections .computeIfAbsent (
192- serverId ,
193- ignored -> {
194- LOG .debug ("Creating connection to server {}." , node );
195- ServerConnection connection =
196- new ServerConnection (
197- bootstrap ,
198- node ,
199- clientMetricGroup ,
200- authenticatorSupplier .get (),
201- isInnerClient );
202- connection .whenClose (ignore -> connections .remove (serverId , connection ));
203- return connection ;
204- });
195+
196+ ServerConnection existing = connections .get (serverId );
197+ if (existing != null ) {
198+ return existing ;
199+ }
200+
201+ CompletableFuture <ServerConnection > newFuture = new CompletableFuture <>();
202+ CompletableFuture <ServerConnection > f = connectionFutures .putIfAbsent (serverId , newFuture );
203+ if (f == null ) {
204+ f = newFuture ;
205+ try {
206+ LOG .debug ("Creating connection to server {}." , node );
207+ ServerConnection conn =
208+ new ServerConnection (
209+ bootstrap ,
210+ node ,
211+ clientMetricGroup ,
212+ authenticatorSupplier .get (),
213+ isInnerClient );
214+
215+ // We must add the connection to the connections map before registering close
216+ // callback.
217+ // Otherwise, the connection may never be removed from the connections map if the
218+ // connection close immediately.
219+ connections .put (serverId , conn );
220+ conn .whenClose (ignored -> connections .remove (serverId , conn ));
221+
222+ newFuture .complete (conn );
223+ } finally {
224+ connectionFutures .remove (serverId , newFuture );
225+ }
226+ }
227+
228+ try {
229+ return f .get ();
230+ } catch (InterruptedException e ) {
231+ Thread .currentThread ().interrupt ();
232+ throw new RuntimeException (e );
233+ } catch (ExecutionException e ) {
234+ Throwable cause = e .getCause ();
235+ if (cause instanceof RuntimeException ) {
236+ throw (RuntimeException ) cause ;
237+ }
238+ throw new RuntimeException (cause );
239+ }
205240 }
206241
207242 @ VisibleForTesting
0 commit comments