@@ -17,19 +17,15 @@ class ComposeGreetingInput:
1717
1818
1919class GreetingComposer :
20- def __init__ (self , client : Client ) -> None :
20+ def __init__ (self , client : Client , loop : asyncio . AbstractEventLoop ) -> None :
2121 self .client = client
22+ self .loop = loop
2223
2324 @activity .defn
2425 def compose_greeting (self , input : ComposeGreetingInput ) -> str :
25- # Schedule a task to complete this asynchronously . This could be done in
26+ # Make a thread to complete this externally . This could be done in
2627 # a completely different process or system.
2728 print ("Completing activity asynchronously" )
28- # Tasks stored by asyncio are weak references and therefore can get GC'd
29- # which can cause warnings like "Task was destroyed but it is pending!".
30- # So we store the tasks ourselves.
31- # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks,
32- # https://bugs.python.org/issue21163 and others.
3329 Thread (
3430 target = self .complete_greeting ,
3531 args = (activity .info ().task_token , input ),
@@ -47,11 +43,13 @@ def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> N
4743 handle = self .client .get_async_activity_handle (task_token = task_token )
4844 for _ in range (0 , 3 ):
4945 print ("Waiting one second..." )
50- asyncio .run (handle .heartbeat ())
46+ asyncio .run_coroutine_threadsafe (handle .heartbeat (), self . loop )
5147 time .sleep (1 )
5248
5349 # Complete using the handle
54- asyncio .run (handle .complete (f"{ input .greeting } , { input .name } !" ))
50+ asyncio .run_coroutine_threadsafe (
51+ handle .complete (f"{ input .greeting } , { input .name } !" ), self .loop
52+ )
5553
5654
5755@workflow .defn
@@ -72,8 +70,10 @@ async def main():
7270 # Start client
7371 client = await Client .connect ("localhost:7233" )
7472
73+ loop = asyncio .get_event_loop ()
74+
7575 # Run a worker for the workflow
76- composer = GreetingComposer (client )
76+ composer = GreetingComposer (client , loop )
7777 async with Worker (
7878 client ,
7979 task_queue = "hello-async-activity-completion-task-queue" ,
0 commit comments