Skip to content

Commit f253b48

Browse files
committed
temp draft for signal handling
Signed-off-by: Tim Li <ltim@uber.com>
1 parent 09fd459 commit f253b48

File tree

3 files changed

+557
-2
lines changed

3 files changed

+557
-2
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,11 @@ def _process_decision_events(
184184
"replay_mode": decision_events.replay,
185185
},
186186
)
187+
event_attr_name = event.WhichOneof("attributes")
188+
187189
# start workflow on workflow started event
188190
if (
189-
event.WhichOneof("attributes")
191+
event_attr_name
190192
== "workflow_execution_started_event_attributes"
191193
):
192194
started_attrs: WorkflowExecutionStartedEventAttributes = (
@@ -195,6 +197,20 @@ def _process_decision_events(
195197
if started_attrs and hasattr(started_attrs, "input"):
196198
self._workflow_instance.start(started_attrs.input)
197199

200+
# invoke signal handler on signal event
201+
elif (
202+
event_attr_name
203+
== "workflow_execution_signaled_event_attributes"
204+
):
205+
signaled_attrs = (
206+
event.workflow_execution_signaled_event_attributes
207+
)
208+
self._workflow_instance.handle_signal(
209+
signaled_attrs.signal_name,
210+
signaled_attrs.input,
211+
event.event_id,
212+
)
213+
198214
# Process through state machines (DecisionsHelper now delegates to DecisionManager)
199215
self._decision_manager.handle_history_event(event)
200216

cadence/_internal/workflow/workflow_intance.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
import logging
12
from asyncio import CancelledError, InvalidStateError, Task
2-
from typing import Optional
3+
from typing import Optional, Type
4+
35
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
46
from cadence.api.v1.common_pb2 import Payload
57
from cadence.data_converter import DataConverter
68
from cadence.error import WorkflowFailure
79
from cadence.workflow import WorkflowDefinition
810

11+
logger = logging.getLogger(__name__)
12+
913

1014
class WorkflowInstance:
1115
def __init__(
@@ -29,6 +33,72 @@ def start(self, payload: Payload):
2933
)
3034
self._task = self._loop.create_task(run_method(*workflow_input))
3135

36+
def handle_signal(
37+
self, signal_name: str, payload: Payload, event_id: int
38+
) -> None:
39+
"""Handle an incoming signal by invoking the registered signal handler.
40+
41+
Looks up the signal definition by name, decodes the payload using the
42+
data converter and parameter type hints, and invokes the handler on the
43+
workflow instance. Async handlers are scheduled as tasks on the
44+
deterministic event loop so they execute during run_once().
45+
46+
Args:
47+
signal_name: The name of the signal to handle.
48+
payload: The encoded signal input payload.
49+
event_id: The history event ID (used for logging context).
50+
"""
51+
# Guard: reject signals after workflow completion (matches Java client)
52+
if self.is_done():
53+
logger.warning(
54+
"Signal received after workflow is completed, ignoring",
55+
extra={"signal_name": signal_name, "event_id": event_id},
56+
)
57+
return
58+
59+
signal_def = self._definition.signals.get(signal_name)
60+
if signal_def is None:
61+
logger.warning(
62+
"Received signal with no registered handler, ignoring",
63+
extra={"signal_name": signal_name, "event_id": event_id},
64+
)
65+
return
66+
67+
# Decode payload using parameter type hints from the signal definition.
68+
# Deserialization errors are caught and logged rather than crashing the
69+
# decision (matches Java client DataConverterException handling).
70+
type_hints: list[Type | None] = [p.type_hint for p in signal_def.params]
71+
try:
72+
if type_hints:
73+
decoded_args = self._data_converter.from_data(payload, type_hints)
74+
else:
75+
decoded_args = []
76+
except Exception:
77+
logger.error(
78+
"Failed to deserialize signal payload, dropping signal",
79+
extra={"signal_name": signal_name, "event_id": event_id},
80+
exc_info=True,
81+
)
82+
return
83+
84+
# Invoke the handler on the workflow instance.
85+
# signal_def._wrapped is the unbound class method, so we pass
86+
# self._instance as the first argument.
87+
# Handler invocation errors are caught and logged rather than crashing
88+
# the decision task (matches Java client InvocationTargetException handling).
89+
try:
90+
if signal_def.is_async:
91+
coro = signal_def(self._instance, *decoded_args)
92+
self._loop.create_task(coro)
93+
else:
94+
signal_def(self._instance, *decoded_args)
95+
except Exception:
96+
logger.error(
97+
"Signal handler raised an exception",
98+
extra={"signal_name": signal_name, "event_id": event_id},
99+
exc_info=True,
100+
)
101+
32102
def run_once(self):
33103
self._loop.run_until_yield()
34104

0 commit comments

Comments
 (0)