File tree Expand file tree Collapse file tree
main/kotlin/io/libp2p/etc/util/netty/mux
test/kotlin/io/libp2p/mux Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -32,13 +32,20 @@ class MuxChannel<TData>(
3232 initializer(this )
3333 }
3434
35+ @Suppress(" SwallowedException" )
3536 override fun doWrite (buf : ChannelOutboundBuffer ) {
3637 while (true ) {
3738 val msg = buf.current() ? : break
39+ if (localDisconnected) {
40+ // Must not throw from doWrite — exceptions escape uncaught to the Netty event loop.
41+ // Wrap buf.remove() defensively: in some Netty versions promise listeners triggered
42+ // by buf.remove() can propagate back through it.
43+ try {
44+ buf.remove(ConnectionClosedException (" The stream was closed for writing locally: $id " ))
45+ } catch (e: Throwable ) { }
46+ continue
47+ }
3848 try {
39- if (localDisconnected) {
40- throw ConnectionClosedException (" The stream was closed for writing locally: $id " )
41- }
4249 // the msg is released by both onChildWrite and buf.remove() so we need to retain
4350 // however it is still to be confirmed that no buf leaks happen here TODO
4451 ReferenceCountUtil .retain(msg)
Original file line number Diff line number Diff line change @@ -6,6 +6,7 @@ import io.libp2p.core.StreamHandler
66import io.libp2p.etc.types.fromHex
77import io.libp2p.etc.types.getX
88import io.libp2p.etc.types.toHex
9+ import io.libp2p.etc.util.netty.mux.MuxChannel
910import io.libp2p.etc.util.netty.mux.RemoteWriteClosed
1011import io.libp2p.etc.util.netty.nettyInitializer
1112import io.libp2p.mux.MuxHandlerAbstractTest.AbstractTestMuxFrame.Flag.*
@@ -442,6 +443,26 @@ abstract class MuxHandlerAbstractTest {
442443 }
443444 }
444445
446+ @Test
447+ fun `write with localDisconnected should fail promise without throwing from doWrite` () {
448+ val handler = openStreamLocal()
449+ readFrameOrThrow()
450+
451+ // Simulate the state between localDisconnected=true and deactivate() in doDisconnect(),
452+ // which is when a queued WriteTask can reach doWrite with localDisconnected=true while
453+ // the channel is still active (flush0 would take the "not-yet-connected" path otherwise).
454+ @Suppress(" UNCHECKED_CAST" )
455+ (handler.ctx.channel() as MuxChannel <ByteBuf >).localDisconnected = true
456+
457+ val writeFuture = handler.ctx.writeAndFlush(allocateMessage(" 42" ))
458+ ech.runPendingTasks()
459+
460+ assertTrue(writeFuture.isDone)
461+ assertThrows(ConnectionClosedException ::class .java) {
462+ writeFuture.sync()
463+ }
464+ }
465+
445466 @Test
446467 fun `should throw when writing to reset stream` () {
447468 val handler = openStreamLocal()
You can’t perform that action at this time.
0 commit comments