forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_heartbeat.py
More file actions
42 lines (36 loc) · 1.42 KB
/
_heartbeat.py
File metadata and controls
42 lines (36 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from logging import getLogger
from typing import Any, Type
from cadence.api.v1.common_pb2 import Payload
from cadence.api.v1.service_worker_pb2 import RecordActivityTaskHeartbeatRequest
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
from cadence.data_converter import DataConverter
# Module-level logger named after this module; used below to report heartbeat failures.
_logger = getLogger(__name__)
class _HeartbeatSender:
    """Record activity heartbeats and remember the latest details payload.

    An instance is bound to one task token and one identity. It starts out
    holding the details payload handed to the constructor and swaps it for
    each payload whose heartbeat call completes without raising.
    """

    def __init__(
        self,
        worker_stub: WorkerAPIStub,
        data_converter: DataConverter,
        task_token: bytes,
        identity: str,
        previous_details: Payload,
    ):
        """Keep the collaborators and identifiers needed by later calls.

        Args:
            worker_stub: Stub whose ``RecordActivityTaskHeartbeat`` is awaited.
            data_converter: Converter used to encode and decode details.
            task_token: Token placed in every heartbeat request.
            identity: Identity placed in every heartbeat request.
            previous_details: Payload that ``get_details`` decodes until a
                heartbeat is recorded successfully.
        """
        # Request identifiers.
        self._task_token = task_token
        self._identity = identity
        # Collaborators.
        self._worker_stub = worker_stub
        self._data_converter = data_converter
        # Most recent details payload known to this sender.
        self._previous_details = previous_details

    def get_details(self, *types: Type) -> list[Any]:
        """Decode the stored details payload.

        Args:
            *types: Type hints forwarded, as a list, to the data converter.

        Returns:
            Whatever the data converter's ``from_data`` returns for the
            stored payload and the requested types.
        """
        requested_types = list(types)
        return self._data_converter.from_data(
            self._previous_details, requested_types
        )

    async def send_heartbeat(self, *details: Any) -> None:
        """Encode ``details`` and record a heartbeat for the bound task.

        On success the encoded payload becomes the stored details. Any
        ``Exception`` raised while encoding or sending is logged as a
        warning (with traceback) and is not propagated to the caller.

        Args:
            *details: Values passed, as a list, to the data converter.
        """
        try:
            encoded = self._data_converter.to_data(list(details))
            request = RecordActivityTaskHeartbeatRequest(
                task_token=self._task_token,
                details=encoded,
                identity=self._identity,
            )
            await self._worker_stub.RecordActivityTaskHeartbeat(request)
            # Only reached when the call above did not raise.
            self._previous_details = encoded
        except Exception:
            _logger.warning("Heartbeat failed", exc_info=True)