Skip to content

Commit 700adcd

Browse files
committed
Associated DecisionFutures with the correct event loop
Workflow state machines aren't processed while the event loop is active, so they have no mechanism to automatically associate with them. This is a correctness issue but it's unclear what the impacts are.
1 parent 187767b commit 700adcd

File tree

5 files changed

+39
-23
lines changed

5 files changed

+39
-23
lines changed

cadence/_internal/workflow/statemachine/decision_manager.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
from collections import OrderedDict
3-
from dataclasses import dataclass, field
3+
from dataclasses import dataclass
44
from typing import Dict, Type, Tuple, ClassVar, List
55

66
from cadence._internal.workflow.statemachine.activity_state_machine import (
@@ -12,6 +12,7 @@
1212
DecisionStateMachine,
1313
DecisionType,
1414
DecisionFuture,
15+
T,
1516
)
1617
from cadence._internal.workflow.statemachine.event_dispatcher import (
1718
EventDispatcher,
@@ -48,7 +49,6 @@ def _create_dispatch_map(
4849
return result
4950

5051

51-
@dataclass
5252
class DecisionManager:
5353
"""Aggregates multiple decision state machines and coordinates decisions.
5454
@@ -64,18 +64,21 @@ class DecisionManager:
6464
DecisionType.TIMER: timer_events,
6565
}
6666
)
67-
state_machines: OrderedDict[DecisionId, DecisionStateMachine] = field(
68-
default_factory=OrderedDict
69-
)
70-
aliases: Dict[DecisionAlias, DecisionStateMachine] = field(default_factory=dict)
67+
68+
def __init__(self, event_loop: asyncio.AbstractEventLoop):
69+
self._event_loop = event_loop
70+
self.state_machines: OrderedDict[DecisionId, DecisionStateMachine] = (
71+
OrderedDict()
72+
)
73+
self.aliases: Dict[DecisionAlias, DecisionStateMachine] = dict()
7174

7275
# ----- Activity API -----
7376

7477
def schedule_activity(
7578
self, attrs: decision.ScheduleActivityTaskDecisionAttributes
7679
) -> asyncio.Future[Payload]:
7780
decision_id = DecisionId(DecisionType.ACTIVITY, attrs.activity_id)
78-
future = DecisionFuture[Payload](lambda: self._request_cancel(decision_id))
81+
future: DecisionFuture[Payload] = self._create_future(decision_id)
7982
machine = ActivityStateMachine(attrs, future)
8083
self._add_state_machine(machine)
8184

@@ -87,7 +90,7 @@ def start_timer(
8790
self, attrs: decision.StartTimerDecisionAttributes
8891
) -> asyncio.Future[None]:
8992
decision_id = DecisionId(DecisionType.TIMER, attrs.timer_id)
90-
future = DecisionFuture[None](lambda: self._request_cancel(decision_id))
93+
future: DecisionFuture[None] = self._create_future(decision_id)
9194
machine = TimerStateMachine(attrs, future)
9295
self._add_state_machine(machine)
9396

@@ -149,6 +152,11 @@ def collect_pending_decisions(self) -> List[decision.Decision]:
149152

150153
return decisions
151154

155+
def _create_future(self, decision_id: DecisionId) -> DecisionFuture[T]:
156+
return DecisionFuture[T](
157+
self._event_loop, lambda: self._request_cancel(decision_id)
158+
)
159+
152160
def _request_cancel(self, decision_id: DecisionId) -> bool:
153161
machine = self._get_machine(decision_id)
154162
# Interactions with the state machines should move them to the end so that the decisions are ordered as they

cadence/_internal/workflow/statemachine/decision_state_machine.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,12 @@ def state(self) -> DecisionState:
7474

7575

7676
class DecisionFuture(asyncio.Future[T]):
77-
def __init__(self, request_cancel: CancelFn | None = None) -> None:
78-
super().__init__()
77+
def __init__(
78+
self,
79+
loop: asyncio.AbstractEventLoop | None = None,
80+
request_cancel: CancelFn | None = None,
81+
) -> None:
82+
super().__init__(loop=loop)
7983
if request_cancel is None:
8084
request_cancel = self.force_cancel
8185
self._request_cancel = request_cancel

cadence/_internal/workflow/workflow_engine.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from cadence._internal.workflow.context import Context
77
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
8+
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
89
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
910
from cadence._internal.workflow.workflow_intance import WorkflowInstance
1011
from cadence.api.v1.common_pb2 import Failure, Payload
@@ -31,12 +32,11 @@ class DecisionResult:
3132

3233
class WorkflowEngine:
3334
def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition):
35+
self._event_loop = DeterministicEventLoop()
3436
self._workflow_instance = WorkflowInstance(
35-
workflow_definition, info.data_converter
37+
self._event_loop, workflow_definition, info.data_converter
3638
)
37-
self._decision_manager = (
38-
DecisionManager()
39-
) # TODO: remove this stateful object and use the context instead
39+
self._decision_manager = DecisionManager(self._event_loop)
4040
self._context = Context(info, self._decision_manager)
4141

4242
def process_decision(

cadence/_internal/workflow/workflow_intance.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99

1010
class WorkflowInstance:
1111
def __init__(
12-
self, workflow_definition: WorkflowDefinition, data_converter: DataConverter
12+
self,
13+
loop: DeterministicEventLoop,
14+
workflow_definition: WorkflowDefinition,
15+
data_converter: DataConverter,
1316
):
17+
self._loop = loop
1418
self._definition = workflow_definition
1519
self._data_converter = data_converter
1620
self._instance = workflow_definition.cls() # construct a new workflow object
17-
self._loop = DeterministicEventLoop()
1821
self._task: Optional[Task] = None
1922

2023
def start(self, input: Payload):

tests/cadence/_internal/workflow/statemachine/test_decision_manager.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from asyncio import CancelledError
23

34
import pytest
@@ -8,7 +9,7 @@
89

910

1011
async def test_activity_dispatch():
11-
decisions = DecisionManager()
12+
decisions = DecisionManager(asyncio.get_event_loop())
1213

1314
activity_result = decisions.schedule_activity(
1415
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -22,7 +23,7 @@ async def test_activity_dispatch():
2223

2324

2425
async def test_simple_cancellation():
25-
decisions = DecisionManager()
26+
decisions = DecisionManager(asyncio.get_event_loop())
2627

2728
activity_result = decisions.schedule_activity(
2829
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -34,7 +35,7 @@ async def test_simple_cancellation():
3435

3536

3637
async def test_cancellation_not_immediate():
37-
decisions = DecisionManager()
38+
decisions = DecisionManager(asyncio.get_event_loop())
3839

3940
activity_result = decisions.schedule_activity(
4041
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -47,7 +48,7 @@ async def test_cancellation_not_immediate():
4748

4849

4950
async def test_cancellation_completed():
50-
decisions = DecisionManager()
51+
decisions = DecisionManager(asyncio.get_event_loop())
5152

5253
activity_result = decisions.schedule_activity(
5354
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -78,7 +79,7 @@ async def test_cancellation_completed():
7879

7980

8081
async def test_collect_decisions():
81-
decisions = DecisionManager()
82+
decisions = DecisionManager(asyncio.get_event_loop())
8283

8384
activity1 = decisions.schedule_activity(
8485
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -105,7 +106,7 @@ async def test_collect_decisions():
105106

106107

107108
async def test_collect_decisions_ignore_empty():
108-
decisions = DecisionManager()
109+
decisions = DecisionManager(asyncio.get_event_loop())
109110

110111
_ = decisions.schedule_activity(
111112
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
@@ -117,7 +118,7 @@ async def test_collect_decisions_ignore_empty():
117118

118119
async def test_collection_decisions_reordering():
119120
# Decisions should be emitted in the order that they happened within the workflow
120-
decisions = DecisionManager()
121+
decisions = DecisionManager(asyncio.get_event_loop())
121122

122123
activity1 = decisions.schedule_activity(
123124
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")

0 commit comments

Comments
 (0)