Skip to content

Commit baba59d

Browse files
committed
add cancellation of heartbeat when activity completes
Signed-off-by: Tim Li <ltim@uber.com>
1 parent d8ffba1 commit baba59d

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

cadence/_internal/activity/_activity_executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from concurrent.futures import ThreadPoolExecutor
22
from logging import getLogger
33
from traceback import format_exception
4-
from typing import Any, Callable, cast
4+
from typing import Any, Callable, Union, cast
55
from google.protobuf.duration import to_timedelta
66
from google.protobuf.timestamp import to_datetime
77

@@ -47,7 +47,9 @@ async def execute(self, task: PollForActivityTaskResponse):
4747
_logger.exception("Activity failed")
4848
await self._report_failure(task, e)
4949

50-
def _create_context(self, task: PollForActivityTaskResponse) -> _Context:
50+
def _create_context(
51+
self, task: PollForActivityTaskResponse
52+
) -> Union[_Context, _SyncContext]:
5153
activity_type = task.activity_type.name
5254
try:
5355
activity_def = cast(BaseDefinition, self._registry(activity_type))

cadence/_internal/activity/_context.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,19 @@ def __init__(
2525

2626
async def execute(self, payload: Payload) -> Any:
2727
params = self._to_params(payload)
28-
with self._activate():
29-
return await self._activity_def.impl_fn(*params)
28+
try:
29+
with self._activate():
30+
return await self._activity_def.impl_fn(*params)
31+
finally:
32+
await self._cancel_pending_heartbeats()
33+
34+
async def _cancel_pending_heartbeats(self) -> None:
35+
if not self._heartbeat_tasks:
36+
return
37+
tasks = list(self._heartbeat_tasks)
38+
for task in tasks:
39+
task.cancel()
40+
await asyncio.gather(*tasks, return_exceptions=True)
3041

3142
def _to_params(self, payload: Payload) -> list[Any]:
3243
return self._activity_def.signature.params_from_payload(
@@ -40,9 +51,11 @@ def info(self) -> ActivityInfo:
4051
return self._info
4152

4253
def heartbeat(self, *details: Any) -> None:
43-
task = asyncio.create_task(self._heartbeat_sender.send_heartbeat(*details))
44-
self._heartbeat_tasks.add(task)
45-
task.add_done_callback(self._heartbeat_tasks.discard)
54+
heartbeat_task = asyncio.create_task(
55+
self._heartbeat_sender.send_heartbeat(*details)
56+
)
57+
self._heartbeat_tasks.add(heartbeat_task)
58+
heartbeat_task.add_done_callback(self._heartbeat_tasks.discard)
4659

4760

4861
class _SyncContext(_Context):

tests/cadence/_internal/activity/test_activity_executor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,9 @@ async def activity_fn():
326326
executor = ActivityExecutor(client, "task_list", "identity", 1, reg.get_activity)
327327

328328
await executor.execute(fake_task("activity_type", ""))
329-
# Allow the event loop to process the fire-and-forget heartbeat task
330-
await asyncio.sleep(0)
329+
# ensure_future schedules the heartbeat as a fire-and-forget task;
330+
# it needs multiple event-loop iterations to complete.
331+
await asyncio.sleep(0.1)
331332

332333
worker_stub.RespondActivityTaskCompleted.assert_called_once()
333334
worker_stub.RecordActivityTaskHeartbeat.assert_called_once_with(

0 commit comments

Comments
 (0)