Skip to content

Commit 6cff798

Browse files
authored
feat: Add NonDeterminism Tracking (cadence-workflow#65)
<!-- Describe what has changed in this PR --> **What changed?** - Remove DecisionsHelper. The mechanism it used for generating IDs is inconsistent with the other clients, where they all share an incrementing integer. The other functionality it provided was unused. - Add NonDeterminism Tracking to provide specific errors when nondeterminism occurs. - We look at the outcome decisions for the batch and create a set of expectations that must be met so that we can immediately flag non-determinism, rather than waiting until after the user code finishing execution. - We add markers any time there's immediate cancellation. - Add a state machine for Workflow completion. We need to route the Decision attributes through the DecisionManager so that it can check for nondeterminism. - Restructure workflow exception handling a bit to handle ExceptionGroup. This still needs more work. <!-- Tell your future self why have you made these changes --> **Why?** - Feature parity with other clients - Trying out new non-determinism detection algorithm to see if it would work for other clients. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** - Unit and integration tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** - This is a breaking change <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 09fd459 commit 6cff798

21 files changed

+1972
-504
lines changed

cadence/_internal/workflow/context.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from typing import Iterator, Optional, Any, Unpack, Type, cast
55

66
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
7-
from cadence._internal.workflow.decisions_helper import DecisionsHelper
87
from cadence.api.v1.common_pb2 import ActivityType
98
from cadence.api.v1.decision_pb2 import ScheduleActivityTaskDecisionAttributes
109
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
@@ -21,7 +20,6 @@ def __init__(
2120
self._info = info
2221
self._replay_mode = True
2322
self._replay_current_time_milliseconds: Optional[int] = None
24-
self._decision_helper = DecisionsHelper()
2523
self._decision_manager = decision_manager
2624

2725
def info(self) -> WorkflowInfo:
@@ -70,9 +68,7 @@ async def execute_activity(
7068
)
7169

7270
activity_input = self.data_converter().to_data(list(args))
73-
activity_id = self._decision_helper.generate_activity_id(activity)
7471
schedule_attributes = ScheduleActivityTaskDecisionAttributes(
75-
activity_id=activity_id,
7672
activity_type=ActivityType(name=activity),
7773
domain=self.info().workflow_domain,
7874
task_list=TaskList(kind=TaskListKind.TASK_LIST_KIND_NORMAL, name=task_list),

cadence/_internal/workflow/decisions_helper.py

Lines changed: 0 additions & 312 deletions
This file was deleted.

cadence/_internal/workflow/deterministic_event_loop.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import traceback
12
from asyncio import AbstractEventLoop, Handle, TimerHandle, futures, tasks, Future, Task
23
from contextvars import Context
34
import logging
@@ -9,6 +10,12 @@
910

1011
logger = logging.getLogger(__name__)
1112

13+
14+
class FatalDecisionError(Exception):
15+
def __init__(self, *args) -> None:
16+
super().__init__(*args)
17+
18+
1219
_Ts = TypeVarTuple("_Ts")
1320
_T = TypeVar("_T")
1421

@@ -455,9 +462,36 @@ def default_exception_handler(self, context: dict[str, Any]) -> None:
455462
)
456463

457464
def call_exception_handler(self, context: dict[str, Any]) -> None:
458-
raise NotImplementedError(
459-
"Custom exception handlers not supported in deterministic event loop"
460-
)
465+
# This is called if a task has an unhandled exception. Short term, it's helpful to log these for debugging.
466+
# Long term, we need some combination of failing decision tasks or workflows based on these errors.
467+
message = context.get("message")
468+
if not message:
469+
message = "Unhandled exception in event loop"
470+
471+
exception = context.get("exception")
472+
if isinstance(exception, BaseException):
473+
exc_info = exception
474+
else:
475+
exc_info = None
476+
477+
log_lines = [message]
478+
for key in sorted(context):
479+
if key in {"message", "exception"}:
480+
continue
481+
value = context[key]
482+
if key == "source_traceback":
483+
tb = "".join(traceback.format_list(value))
484+
value = "Object created at (most recent call last):\n"
485+
value += tb.rstrip()
486+
elif key == "handle_traceback":
487+
tb = "".join(traceback.format_list(value))
488+
value = "Handle created at (most recent call last):\n"
489+
value += tb.rstrip()
490+
else:
491+
value = repr(value)
492+
log_lines.append(f"{key}: {value}")
493+
494+
logger.error("\n".join(log_lines), exc_info=exc_info)
461495

462496
# Task factory
463497
def set_task_factory( # type: ignore[override]

0 commit comments

Comments
 (0)