Skip to content

Commit 20a333d

Browse files
timl3136claude
andcommitted
fix: await pending heartbeats instead of cancelling them
- Rename _cancel_pending_heartbeats to _wait_pending_heartbeats and remove task.cancel() so heartbeat RPCs complete before activity exits - Change _heartbeat_tasks type to set[Future] to accommodate both asyncio.Task (async path) and wrapped concurrent.futures.Future (sync) - Track heartbeat futures in _SyncContext via asyncio.wrap_future() and add try/finally to _SyncContext.execute() to await them - Remove asyncio.sleep(0.1) workarounds in tests since heartbeats are now guaranteed to complete before execute() returns Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent baba59d commit 20a333d

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

cadence/_internal/activity/_context.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,20 @@ def __init__(
2121
self._info = info
2222
self._activity_def = activity_def
2323
self._heartbeat_sender = heartbeat_sender
24-
self._heartbeat_tasks: set[asyncio.Task[None]] = set()
24+
self._heartbeat_tasks: set[asyncio.Future[None]] = set()
2525

2626
async def execute(self, payload: Payload) -> Any:
2727
params = self._to_params(payload)
2828
try:
2929
with self._activate():
3030
return await self._activity_def.impl_fn(*params)
3131
finally:
32-
await self._cancel_pending_heartbeats()
32+
await self._wait_pending_heartbeats()
3333

34-
async def _cancel_pending_heartbeats(self) -> None:
34+
async def _wait_pending_heartbeats(self) -> None:
3535
if not self._heartbeat_tasks:
3636
return
3737
tasks = list(self._heartbeat_tasks)
38-
for task in tasks:
39-
task.cancel()
4038
await asyncio.gather(*tasks, return_exceptions=True)
4139

4240
def _to_params(self, payload: Payload) -> list[Any]:
@@ -73,7 +71,10 @@ def __init__(
7371
async def execute(self, payload: Payload) -> Any:
7472
params = self._to_params(payload)
7573
self._loop = asyncio.get_running_loop()
76-
return await self._loop.run_in_executor(self._executor, self._run, params)
74+
try:
75+
return await self._loop.run_in_executor(self._executor, self._run, params)
76+
finally:
77+
await self._wait_pending_heartbeats()
7778

7879
def _run(self, args: list[Any]) -> Any:
7980
with self._activate():
@@ -83,6 +84,9 @@ def client(self) -> Client:
8384
raise RuntimeError("client is only supported in async activities")
8485

8586
def heartbeat(self, *details: Any) -> None:
86-
asyncio.run_coroutine_threadsafe(
87+
future = asyncio.run_coroutine_threadsafe(
8788
self._heartbeat_sender.send_heartbeat(*details), self._loop
8889
)
90+
wrapped = asyncio.wrap_future(future, loop=self._loop)
91+
self._heartbeat_tasks.add(wrapped)
92+
wrapped.add_done_callback(self._heartbeat_tasks.discard)

tests/cadence/_internal/activity/test_activity_executor.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,6 @@ async def activity_fn():
326326
executor = ActivityExecutor(client, "task_list", "identity", 1, reg.get_activity)
327327

328328
await executor.execute(fake_task("activity_type", ""))
329-
# ensure_future schedules the heartbeat as a fire-and-forget task;
330-
# it needs multiple event-loop iterations to complete.
331-
await asyncio.sleep(0.1)
332329

333330
worker_stub.RespondActivityTaskCompleted.assert_called_once()
334331
worker_stub.RecordActivityTaskHeartbeat.assert_called_once_with(
@@ -359,9 +356,6 @@ def activity_fn():
359356
executor = ActivityExecutor(client, "task_list", "identity", 1, reg.get_activity)
360357

361358
await executor.execute(fake_task("activity_type", ""))
362-
# run_coroutine_threadsafe schedules the heartbeat from a thread;
363-
# the coroutine needs multiple event-loop iterations to complete.
364-
await asyncio.sleep(0.1)
365359

366360
worker_stub.RespondActivityTaskCompleted.assert_called_once()
367361
worker_stub.RecordActivityTaskHeartbeat.assert_called_once_with(

0 commit comments

Comments
 (0)