Skip to content

Commit dc680eb

Browse files
junjzhang, fantix, claude
authored
Detach socket on create_connection cancellation to prevent fd double-close (#740)
Fixes #645, #738 Closes #646 --------- Co-authored-by: Fantix King <fantix.king@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6cd24cb commit dc680eb

3 files changed

Lines changed: 249 additions & 0 deletions

File tree

tests/test_tcp.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,130 @@ async def test():
737737
with s1, s2:
738738
loop.run_until_complete(test())
739739

740+
def test_create_connection_sock_cancel_detaches(self):
    # Cancelling create_connection(sock=...) must leave the caller's
    # socket object detached, i.e. sock.fileno() == -1 afterwards.

    def _serve_one(server_sock):
        # Server-side handler passed to self.tcp_server(); it waits for
        # one byte and tolerates the client going away.
        # NOTE(review): recv_all() is a test-harness helper -- presumably
        # it blocks until the requested byte count arrives; confirm.
        try:
            server_sock.recv_all(1)
        except ConnectionAbortedError:
            pass

    async def _cancel_pending_connection(server_addr):
        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_sock.setblocking(False)
        try:
            client_sock.connect(server_addr)
        except BlockingIOError:
            # Non-blocking connect() has not completed yet; the short
            # sleep below gives it time to finish.
            pass
        await asyncio.sleep(0.01)

        pending = asyncio.ensure_future(
            self.loop.create_connection(
                asyncio.Protocol, sock=client_sock))
        # Yield once so the create_connection() coroutine starts running
        # before it gets cancelled.
        await asyncio.sleep(0)
        pending.cancel()
        with self.assertRaises(asyncio.CancelledError):
            await pending

        # After cancellation the socket must be detached (fd == -1)
        # so that its __del__ won't close a recycled fd.
        self.assertEqual(client_sock.fileno(), -1)

    with self.tcp_server(_serve_one, max_clients=1, backlog=1) as srv:
        self.loop.run_until_complete(
            _cancel_pending_connection(srv.addr))
771+
772+
def test_create_connection_sock_cancel_fd_leak(self):
    # Regression test for https://github.com/MagicStack/uvloop/issues/645
    # and https://github.com/aio-libs/aiohttp/issues/10506
    #
    # When create_connection(sock=sock) is cancelled, the socket must
    # be detached so its close()/`__del__` won't double-close the fd.
    # Without the fix, libuv closes the fd but the socket object still
    # references it, enabling a chain of fd corruption and data leak:
    #
    #   1. cancel → libuv closes fd N
    #   2. New connection (victim) reuses fd N
    #   3. Stale sock.close() closes fd N → breaks the victim
    #   4. Another fd N is opened (new connection)
    #   5. Victim writev(N) → data goes to the wrong connection

    async def test():
        srv = await asyncio.start_server(
            lambda r, w: w.close(),
            '127.0.0.1', 0,
            family=socket.AF_INET)
        addr = srv.sockets[0].getsockname()

        # --- Step 1: create_connection with sock= and cancel it ---
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        await self.loop.sock_connect(sock, addr)
        stale_fd = sock.fileno()

        task = self.loop.create_task(
            self.loop.create_connection(asyncio.Protocol, sock=sock)
        )
        await asyncio.sleep(0)
        task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            await task

        # --- Step 2: a victim connection reuses the fd ---
        victim_tr, _ = await self.loop.create_connection(
            asyncio.Protocol, *addr)
        victim_fd = victim_tr.get_extra_info('socket').fileno()
        if victim_fd != stale_fd:
            # The scenario cannot be reproduced unless the kernel hands
            # the same fd number back; clean up and skip.
            victim_tr.close()
            sock.close()
            srv.close()
            await srv.wait_closed()
            raise unittest.SkipTest(
                f'fd not reused (got {victim_fd}, need {stale_fd})')

        # --- Step 3: stale sock.close() must NOT kill the victim ---
        # Allocate the socketpair BEFORE sock.close() so the pair
        # fds don't collide with stale_fd.
        spy_a, spy_b = socket.socketpair()
        victim_broken = False
        leaked = b''
        try:
            spy_b.setblocking(False)

            sock.close()

            # Check whether sock.close() broke the victim's fd.
            try:
                os.fstat(victim_fd)
            except OSError:
                victim_broken = True

            if victim_broken:
                # The victim's fd was killed — place a spy socket on
                # the freed fd (in production this would be a new
                # incoming connection).
                os.dup2(spy_a.fileno(), stale_fd)
                spy_a.close()

            # Victim writes.  If victim_broken, writev(stale_fd) goes
            # to the spy; otherwise it goes to the real connection.
            victim_tr.write(b'LEAKED')

            try:
                leaked = spy_b.recv(4096)
            except BlockingIOError:
                leaked = b''
        finally:
            # Release everything even if a step above raised, so a
            # failure here cannot leak fds into later tests.
            if victim_broken:
                os.close(stale_fd)
            # socket.close() is idempotent: this is a no-op when spy_a
            # was already closed after dup2(), and it closes the
            # otherwise-leaked end when the victim was not broken.
            spy_a.close()
            spy_b.close()
            victim_tr.close()
            srv.close()
            await srv.wait_closed()

        self.assertEqual(leaked, b'',
                         f"Data leaked to an unrelated socket: "
                         f"got {leaked!r}")

    self.loop.run_until_complete(test())
863+
740864

741865
class Test_UV_TCP(_TestTCP, tb.UVTestCase):
742866

tests/test_unix.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,117 @@ def test_create_unix_connection_6(self):
404404
lambda: None, path='/tmp/a',
405405
ssl_handshake_timeout=SSL_HANDSHAKE_TIMEOUT))
406406

407+
def test_create_unix_connection_sock_cancel_detaches(self):
    # Cancelling create_unix_connection(sock=...) must leave the
    # caller's socket object detached (fileno() == -1).

    async def test():
        # TemporaryDirectory removes the directory (and the socket file
        # inside it) on exit, so nothing is left behind in the system
        # temp dir, even when the test fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            srv_path = os.path.join(tmp_dir, 'test.sock')
            srv = await asyncio.start_unix_server(
                lambda r, w: w.close(), path=srv_path)
            try:
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                sock.setblocking(False)
                try:
                    sock.connect(srv_path)
                except BlockingIOError:
                    # Non-blocking connect() still in progress; the
                    # short sleep below gives it time to finish.
                    pass
                await asyncio.sleep(0.01)

                task = asyncio.ensure_future(
                    self.loop.create_unix_connection(
                        asyncio.Protocol, sock=sock))
                # Yield once so the coroutine starts before the cancel.
                await asyncio.sleep(0)
                task.cancel()
                with self.assertRaises(asyncio.CancelledError):
                    await task

                self.assertEqual(sock.fileno(), -1)
            finally:
                # Always shut the server down, including when the
                # assertion above fails.
                srv.close()
                await srv.wait_closed()

    self.loop.run_until_complete(test())
437+
438+
def test_create_unix_connection_sock_cancel_fd_leak(self):
    # Same as test_create_connection_sock_cancel_fd_leak but for
    # the create_unix_connection(sock=) path.

    async def test():
        # TemporaryDirectory removes the directory (and the socket file
        # inside it) on exit, including on the skip and failure paths.
        with tempfile.TemporaryDirectory() as tmp_dir:
            srv_path = os.path.join(tmp_dir, 'test.sock')
            srv = await asyncio.start_unix_server(
                lambda r, w: w.close(), path=srv_path)

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.setblocking(False)
            await self.loop.sock_connect(sock, srv_path)
            stale_fd = sock.fileno()

            task = self.loop.create_task(
                self.loop.create_unix_connection(
                    asyncio.Protocol, sock=sock))
            await asyncio.sleep(0)
            task.cancel()
            with self.assertRaises(asyncio.CancelledError):
                await task

            # Create victim that reuses the fd.
            victim_sock = socket.socket(
                socket.AF_UNIX, socket.SOCK_STREAM)
            victim_sock.setblocking(False)
            await self.loop.sock_connect(victim_sock, srv_path)
            victim_tr, _ = await self.loop.create_unix_connection(
                asyncio.Protocol, sock=victim_sock)
            victim_fd = victim_tr.get_extra_info('socket').fileno()
            if victim_fd != stale_fd:
                # The scenario needs the same fd number to be handed
                # back; clean up and skip otherwise.
                victim_tr.close()
                sock.close()
                srv.close()
                await srv.wait_closed()
                raise unittest.SkipTest(
                    f'fd not reused (got {victim_fd}, need {stale_fd})')

            # Allocate the socketpair BEFORE sock.close() so the pair
            # fds don't collide with stale_fd.
            spy_a, spy_b = socket.socketpair()
            victim_broken = False
            leaked = b''
            try:
                spy_b.setblocking(False)

                sock.close()

                # Check whether sock.close() broke the victim's fd.
                try:
                    os.fstat(victim_fd)
                except OSError:
                    victim_broken = True

                if victim_broken:
                    # Put a spy socket on the freed fd number so a
                    # misdirected write becomes observable on spy_b.
                    os.dup2(spy_a.fileno(), stale_fd)
                    spy_a.close()

                victim_tr.write(b'LEAKED')

                try:
                    leaked = spy_b.recv(4096)
                except BlockingIOError:
                    leaked = b''
            finally:
                if victim_broken:
                    os.close(stale_fd)
                # socket.close() is idempotent: a no-op if spy_a was
                # already closed after dup2(); otherwise it releases
                # the end that would have been leaked.
                spy_a.close()
                spy_b.close()
                victim_tr.close()
                # Let pending callbacks (e.g. server-side connection_lost
                # from the cancelled connection) run before closing the
                # server, to avoid triggering call_exception_handler().
                await asyncio.sleep(0)
                srv.close()
                await srv.wait_closed()

            self.assertEqual(leaked, b'',
                             f"Data leaked to an unrelated socket: "
                             f"got {leaked!r}")

    self.loop.run_until_complete(test())
517+
407518

408519
class Test_UV_Unix(_TestUnix, tb.UVTestCase):
409520

uvloop/loop.pyx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,9 @@ cdef class Loop:
20532053
tr = TCPTransport.new(self, protocol, None, waiter, context)
20542054
try:
20552055
# libuv will make socket non-blocking
2056+
# We are not detaching the PSO from the now-libuv-managed
2057+
# FD here because of:
2058+
# https://github.com/python/asyncio/pull/449
20562059
tr._open(sock.fileno())
20572060
tr._init_protocol()
20582061
await waiter
@@ -2065,6 +2068,15 @@ cdef class Loop:
20652068
# up in `Transport._call_connection_made()`, and calling
20662069
# `_close()` before it is fine.
20672070
tr._close()
2071+
# Fix for:
2072+
# * https://github.com/MagicStack/uvloop/issues/645
2073+
# * https://github.com/MagicStack/uvloop/issues/738
2074+
# The underlying FD is closed in tr._close(), so the owner of
2075+
# `sock` must not get a chance to double-close the same FD
2076+
# sometime later, because that FD may be reused by a new
2077+
# connection under load. So we detach the PSO from the
2078+
# already-closed FD here.
2079+
sock.detach()
20682080
raise
20692081

20702082
tr._attach_fileobj(sock)
@@ -2306,7 +2318,9 @@ cdef class Loop:
23062318
except (KeyboardInterrupt, SystemExit):
23072319
raise
23082320
except BaseException:
2321+
# See comments in create_connection() for more information
23092322
tr._close()
2323+
sock.detach()
23102324
raise
23112325

23122326
tr._attach_fileobj(sock)

0 commit comments

Comments
 (0)