Skip to content

Commit 4cbff53

Browse files
committed
fix test and lint
Signed-off-by: Tim Li <ltim@uber.com>
1 parent ab65670 commit 4cbff53

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

cadence/_internal/activity/_context.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
from concurrent.futures.thread import ThreadPoolExecutor
3-
from logging import getLogger
43
from typing import Any
54

65
from cadence import Client
@@ -9,8 +8,6 @@
98
from cadence.activity import ActivityInfo, ActivityContext
109
from cadence.api.v1.common_pb2 import Payload
1110

12-
_logger = getLogger(__name__)
13-
1411

1512
class _Context(ActivityContext):
1613
def __init__(
@@ -42,10 +39,7 @@ def info(self) -> ActivityInfo:
4239
return self._info
4340

4441
def heartbeat(self, *details: Any) -> None:
45-
task = asyncio.ensure_future(
46-
self._heartbeat_sender.send_heartbeat(*details)
47-
)
48-
task.add_done_callback(_on_heartbeat_done)
42+
asyncio.ensure_future(self._heartbeat_sender.send_heartbeat(*details))
4943

5044

5145
class _SyncContext(_Context):
@@ -73,13 +67,6 @@ def client(self) -> Client:
7367
raise RuntimeError("client is only supported in async activities")
7468

7569
def heartbeat(self, *details: Any) -> None:
76-
future = asyncio.run_coroutine_threadsafe(
70+
asyncio.run_coroutine_threadsafe(
7771
self._heartbeat_sender.send_heartbeat(*details), self._loop
7872
)
79-
future.add_done_callback(_on_heartbeat_done)
80-
81-
82-
def _on_heartbeat_done(task: asyncio.Task | asyncio.Future) -> None:
83-
exc = task.exception()
84-
if exc is not None:
85-
_logger.warning("Heartbeat failed: %s", exc)

cadence/_internal/activity/_heartbeat.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from logging import getLogger
12
from typing import Any
23

34
from cadence.api.v1.service_worker_pb2 import RecordActivityTaskHeartbeatRequest
45
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
56
from cadence.data_converter import DataConverter
67

8+
_logger = getLogger(__name__)
9+
710

811
class _HeartbeatSender:
912
def __init__(
@@ -19,11 +22,14 @@ def __init__(
1922
self._identity = identity
2023

2124
async def send_heartbeat(self, *details: Any) -> None:
22-
payload = self._data_converter.to_data(list(details))
23-
await self._worker_stub.RecordActivityTaskHeartbeat(
24-
RecordActivityTaskHeartbeatRequest(
25-
task_token=self._task_token,
26-
details=payload,
27-
identity=self._identity,
25+
try:
26+
payload = self._data_converter.to_data(list(details))
27+
await self._worker_stub.RecordActivityTaskHeartbeat(
28+
RecordActivityTaskHeartbeatRequest(
29+
task_token=self._task_token,
30+
details=payload,
31+
identity=self._identity,
32+
)
2833
)
29-
)
34+
except Exception:
35+
_logger.warning("Heartbeat failed", exc_info=True)

tests/integration_tests/workflow/test_heartbeat.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,14 @@ async def test_activity_without_heartbeat_times_out(helper: CadenceHelper):
8484
execution_start_to_close_timeout=timedelta(seconds=30),
8585
)
8686

87-
response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
88-
GetWorkflowExecutionHistoryRequest(
89-
domain=DOMAIN_NAME,
90-
workflow_execution=execution,
91-
wait_for_new_event=True,
92-
skip_archival=True,
87+
response: GetWorkflowExecutionHistoryResponse = (
88+
await worker.client.workflow_stub.GetWorkflowExecutionHistory(
89+
GetWorkflowExecutionHistoryRequest(
90+
domain=DOMAIN_NAME,
91+
workflow_execution=execution,
92+
wait_for_new_event=True,
93+
skip_archival=True,
94+
)
9395
)
9496
)
9597

0 commit comments

Comments
 (0)