4545import java .util .List ;
4646import java .util .Map ;
4747import java .util .concurrent .CompletableFuture ;
48+ import java .util .concurrent .ExecutionException ;
4849import java .util .concurrent .TimeUnit ;
4950import java .util .function .Supplier ;
5051
@@ -71,6 +72,8 @@ public final class NettyClient implements RpcClient {
7172 */
7273 private final Map <String , ServerConnection > connections ;
7374
75+ private final Map <String , CompletableFuture <ServerConnection >> connectionFutures ;
76+
7477 /** Metric groups for client. */
7578 private final ClientMetricGroup clientMetricGroup ;
7679
@@ -87,6 +90,7 @@ public final class NettyClient implements RpcClient {
8790 public NettyClient (
8891 Configuration conf , ClientMetricGroup clientMetricGroup , boolean isInnerClient ) {
8992 this .connections = MapUtils .newConcurrentHashMap ();
93+ this .connectionFutures = MapUtils .newConcurrentHashMap ();
9094
9195 // build bootstrap
9296 this .eventGroup =
@@ -188,20 +192,46 @@ public void close() throws Exception {
188192
189193 private ServerConnection getOrCreateConnection (ServerNode node ) {
190194 String serverId = node .uid ();
191- return connections .computeIfAbsent (
192- serverId ,
193- ignored -> {
194- LOG .debug ("Creating connection to server {}." , node );
195- ServerConnection connection =
196- new ServerConnection (
197- bootstrap ,
198- node ,
199- clientMetricGroup ,
200- authenticatorSupplier .get (),
201- isInnerClient );
202- connection .whenClose (ignore -> connections .remove (serverId , connection ));
203- return connection ;
204- });
195+
196+ ServerConnection existing = connections .get (serverId );
197+ if (existing != null ) {
198+ return existing ;
199+ }
200+
201+ CompletableFuture <ServerConnection > newFuture = new CompletableFuture <>();
202+ CompletableFuture <ServerConnection > f = connectionFutures .putIfAbsent (serverId , newFuture );
203+ if (f == null ) {
204+ f = newFuture ;
205+ try {
206+ LOG .debug ("Creating connection to server {}." , node );
207+ ServerConnection conn =
208+ new ServerConnection (
209+ bootstrap ,
210+ node ,
211+ clientMetricGroup ,
212+ authenticatorSupplier .get (),
213+ isInnerClient );
214+ conn .whenClose (ignored -> connections .remove (serverId , conn ));
215+ connections .put (serverId , conn );
216+
217+ newFuture .complete (conn );
218+ } finally {
219+ connectionFutures .remove (serverId , newFuture );
220+ }
221+ }
222+
223+ try {
224+ return f .get ();
225+ } catch (InterruptedException e ) {
226+ Thread .currentThread ().interrupt ();
227+ throw new RuntimeException (e );
228+ } catch (ExecutionException e ) {
229+ Throwable cause = e .getCause ();
230+ if (cause instanceof RuntimeException ) {
231+ throw (RuntimeException ) cause ;
232+ }
233+ throw new RuntimeException (cause );
234+ }
205235 }
206236
207237 @ VisibleForTesting
0 commit comments