77from textwrap import dedent
88from threading import Thread
99from typing import Callable , Coroutine , Iterator , Literal , Self , assert_never , cast
10- from weakref import WeakKeyDictionary , WeakValueDictionary
1110
1211from .types import Shareable
1312
@@ -32,10 +31,7 @@ class Runner:
3231 def __init__ (self , * , workers : int ) -> None :
3332 self ._tasks = create_queue ()
3433 self ._results = create_queue ()
35- self ._aio_tasks : WeakValueDictionary [int , asyncio .Future ] = WeakValueDictionary ()
36- self ._loops : WeakKeyDictionary [asyncio .Future , asyncio .AbstractEventLoop ] = (
37- WeakKeyDictionary ()
38- )
34+ self ._futures : dict [int , tuple [asyncio .Future , asyncio .AbstractEventLoop ]] = {}
3935 self ._code = dedent ("""
4036 import importlib
4137 import importlib.util
@@ -110,13 +106,13 @@ def _coordinator(self) -> None:
110106 # Interpreter closed
111107 workers -= 1
112108 case int (i ), False , str (reason ):
113- future = self ._aio_tasks [i ]
114- self . _loops [ future ] .call_soon_threadsafe (
109+ future , loop = self ._futures [i ]
110+ loop .call_soon_threadsafe (
115111 future .set_exception , InterpreterError (reason )
116112 )
117113 case int (i ), True , result :
118- future = self ._aio_tasks [i ]
119- self . _loops [ future ]. call_soon_threadsafe (self . _aio_tasks [ i ] .set_result , result )
114+ future , loop = self ._futures [i ]
115+ loop . call_soon_threadsafe (future .set_result , result )
120116 case other :
121117 raise InterpreterError ("Unexpected queue value: " , other )
122118
@@ -164,8 +160,7 @@ async def run_module_function(
164160 assert not self .stopped , "Runner must be started"
165161 future = asyncio .Future ()
166162 id_ = id (future )
167- self ._aio_tasks [id_ ] = future
168- self ._loops [future ] = asyncio .get_running_loop ()
163+ self ._futures [id_ ] = future , asyncio .get_running_loop ()
169164 self ._tasks .put ((id_ , * module_info , args , tuple (kwargs .items ())))
170165 return await future
171166
0 commit comments