Skip to content

Commit 419e1ae

Browse files
committed
fix(quic): close the QuicChannel when a dial is cancelled
NetworkImpl.connect() races dials across addresses and cancels the losing dial futures. QUIC dial() returns the dependent thenApply future; cancelling it does not propagate upstream to the QUIC connect, so when the handshake later completes the thenApply body is skipped. The established QuicChannel was therefore never registered, handed to the caller, nor closed, leaking it (and its underlying datagram channel) until the idle timeout. Mirror the TCP transport: register the channel as soon as the connection is established (before thenApply) so it is tracked and closeable, and close it explicitly when the returned future is cancelled, cascading to the ephemeral datagram channel via the existing close listener.
1 parent e0ed7ff commit 419e1ae

2 files changed

Lines changed: 66 additions & 2 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,14 +252,18 @@ class QuicTransport(
252252
if (ex != null || quicCh == null) {
253253
udpChannel.close()
254254
} else {
255+
// Register as soon as the connection is established — before the dependent
256+
// thenApply runs — so that a dial cancelled mid-race (whose thenApply is
257+
// skipped) still has its channel tracked, closeable on transport shutdown,
258+
// and counted in activeConnections.
259+
registerChannel(quicCh)
255260
quicCh.closeFuture().addListener { udpChannel.close() }
256261
}
257262
}
258263
}
259264

260-
return quicConnFuture
265+
val connectionFuture: CompletableFuture<Connection> = quicConnFuture
261266
.thenApply {
262-
registerChannel(it)
263267
try {
264268
val connection = it.attr(CONNECTION).get() as ConnectionOverNetty
265269

@@ -306,6 +310,18 @@ class QuicTransport(
306310
throw e
307311
}
308312
}
313+
314+
// NetworkImpl.connect() cancels losing dial futures in the multi-address race. Cancelling
315+
// this dependent future does NOT propagate upstream to quicConnFuture, so if the handshake
316+
// later completes, the thenApply body above is skipped and the established QuicChannel is
317+
// never registered, handed to the caller, nor closed. Close it explicitly on cancellation
318+
// so neither it nor its underlying datagram channel is leaked (mirrors the TCP transport).
319+
connectionFuture.whenComplete { _, _ ->
320+
if (connectionFuture.isCancelled) {
321+
quicConnFuture.thenAccept { it.close() }
322+
}
323+
}
324+
return connectionFuture
309325
}
310326

311327
private fun registerChannel(ch: Channel) {

libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,4 +789,52 @@ void dialThatFailsAfterHandshakeDoesNotLeakConnection() throws Exception {
789789
clientTransport.close().get(5, TimeUnit.SECONDS);
790790
serverTransport.close().get(5, TimeUnit.SECONDS);
791791
}
792+
793+
/**
794+
* NetworkImpl.connect() races dials across multiple addresses and cancels the losing dial futures
795+
* once the first one completes. For QUIC, dial() returns the dependent {@code thenApply} future;
796+
* cancelling it does not propagate upstream to the QUIC connect. If the handshake then completes,
797+
* the {@code thenApply} body is skipped, so the established QuicChannel is never registered,
798+
* handed to the caller, nor closed — it (and its underlying datagram channel) leaks until the
799+
* idle timeout. A cancelled dial must promptly close the established QUIC connection.
800+
*/
801+
@Test
802+
void cancelledDialClosesEstablishedQuicConnection() throws Exception {
803+
String serverListenAddress = "/ip4/127.0.0.1/udp/" + getPort() + "/quic-v1";
804+
805+
Pair<PrivKey, PubKey> serverKeyPair = KeyKt.generateKeyPair(KeyType.ED25519);
806+
Pair<PrivKey, PubKey> clientKeyPair = KeyKt.generateKeyPair(KeyType.ED25519);
807+
List<io.libp2p.core.multistream.ProtocolBinding<?>> emptyProtocols = new ArrayList<>();
808+
809+
QuicTransport serverTransport = QuicTransport.ECDSA(serverKeyPair.component1(), emptyProtocols);
810+
QuicTransport clientTransport = QuicTransport.ECDSA(clientKeyPair.component1(), emptyProtocols);
811+
serverTransport.initialize();
812+
clientTransport.initialize();
813+
814+
serverTransport
815+
.listen(new Multiaddr(serverListenAddress), conn -> {}, null)
816+
.get(5, TimeUnit.SECONDS);
817+
818+
CompletableFuture<Connection> dial =
819+
clientTransport.dial(new Multiaddr(serverListenAddress), conn -> {}, null);
820+
// Cancel synchronously, before the async handshake completes — the same situation a losing dial
821+
// in NetworkImpl.connect() hits.
822+
dial.cancel(true);
823+
824+
// The handshake still completes at the network level, so the QuicChannel is established and
825+
// tracked. A leaked channel stays in activeConnections until the 30s idle timeout; a properly
826+
// handled cancellation closes it, dropping activeConnections back to zero well inside this
827+
// window.
828+
long deadline = System.currentTimeMillis() + 15_000;
829+
while (clientTransport.getActiveConnections() > 0 && System.currentTimeMillis() < deadline) {
830+
Thread.sleep(50);
831+
}
832+
Assertions.assertEquals(
833+
0,
834+
clientTransport.getActiveConnections(),
835+
"a cancelled dial must close (not leak) the established QUIC connection");
836+
837+
clientTransport.close().get(5, TimeUnit.SECONDS);
838+
serverTransport.close().get(5, TimeUnit.SECONDS);
839+
}
792840
}

0 commit comments

Comments
 (0)