Skip to content

Commit 9445f0d

Browse files
Fix ConnectionClosedException propagating to Netty event loop from MuxChannel.doWrite (libp2p#455)
In Netty 4.2.x the original code's throw+catch pattern caused the exception to escape doWrite() uncaught, surfacing as spurious "PLEASE FIX OR REPORT" noise in Teku logs. The throw was caught locally, but buf.remove(cause) — called from the catch block without its own guard — can propagate exceptions back through promise-listener callbacks in certain Netty versions. Fix: move the localDisconnected check outside the try/catch and wrap the buf.remove(cause) call defensively so nothing can escape doWrite(). Pending write promises are still properly failed with ConnectionClosedException.
1 parent 77f7625 commit 9445f0d

2 files changed

Lines changed: 31 additions & 3 deletions

File tree

libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,20 @@ class MuxChannel<TData>(
3232
initializer(this)
3333
}
3434

35+
@Suppress("SwallowedException")
3536
override fun doWrite(buf: ChannelOutboundBuffer) {
3637
while (true) {
3738
val msg = buf.current() ?: break
39+
if (localDisconnected) {
40+
// Must not throw from doWrite — exceptions escape uncaught to the Netty event loop.
41+
// Wrap buf.remove() defensively: in some Netty versions promise listeners triggered
42+
// by buf.remove() can propagate back through it.
43+
try {
44+
buf.remove(ConnectionClosedException("The stream was closed for writing locally: $id"))
45+
} catch (e: Throwable) { }
46+
continue
47+
}
3848
try {
39-
if (localDisconnected) {
40-
throw ConnectionClosedException("The stream was closed for writing locally: $id")
41-
}
4249
// the msg is released by both onChildWrite and buf.remove() so we need to retain
4350
// however it is still to be confirmed that no buf leaks happen here TODO
4451
ReferenceCountUtil.retain(msg)

libp2p/src/test/kotlin/io/libp2p/mux/MuxHandlerAbstractTest.kt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.libp2p.core.StreamHandler
66
import io.libp2p.etc.types.fromHex
77
import io.libp2p.etc.types.getX
88
import io.libp2p.etc.types.toHex
9+
import io.libp2p.etc.util.netty.mux.MuxChannel
910
import io.libp2p.etc.util.netty.mux.RemoteWriteClosed
1011
import io.libp2p.etc.util.netty.nettyInitializer
1112
import io.libp2p.mux.MuxHandlerAbstractTest.AbstractTestMuxFrame.Flag.*
@@ -442,6 +443,26 @@ abstract class MuxHandlerAbstractTest {
442443
}
443444
}
444445

446+
@Test
447+
fun `write with localDisconnected should fail promise without throwing from doWrite`() {
448+
val handler = openStreamLocal()
449+
readFrameOrThrow()
450+
451+
// Simulate the state between localDisconnected=true and deactivate() in doDisconnect(),
452+
// which is when a queued WriteTask can reach doWrite with localDisconnected=true while
453+
// the channel is still active (flush0 would take the "not-yet-connected" path otherwise).
454+
@Suppress("UNCHECKED_CAST")
455+
(handler.ctx.channel() as MuxChannel<ByteBuf>).localDisconnected = true
456+
457+
val writeFuture = handler.ctx.writeAndFlush(allocateMessage("42"))
458+
ech.runPendingTasks()
459+
460+
assertTrue(writeFuture.isDone)
461+
assertThrows(ConnectionClosedException::class.java) {
462+
writeFuture.sync()
463+
}
464+
}
465+
445466
@Test
446467
fun `should throw when writing to reset stream`() {
447468
val handler = openStreamLocal()

0 commit comments

Comments
 (0)