Conversation
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
Address PR review comments: - Wrap _run_handle in try/except to match CPython Handle._run() behavior, preventing unhandled exceptions from crashing the event loop - Fix mypy no-any-return in signal.py params_from_payload - Fix mypy attr-defined errors for Handle private attributes - Update tests to verify exceptions are logged rather than propagated Signed-off-by: Tim Li <ltim@uber.com> Made-with: Cursor
CI failed: Code formatting violation in deterministic_event_loop.py: Black detected 1 file that would need reformatting, causing CI check to fail with exit code 1.OverviewA single CI failure was detected across 1 log. The code formatter (Black) identified a formatting violation in FailuresBlack Formatting Check Failed (confidence: high)
Summary
Code Review
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
|
Gitar fix CI |
Applied Black formatting to |
| def notify_state_changed(self) -> None: | ||
| """Re-evaluate all wait_condition predicates. Resolve those that are now True.""" | ||
| remaining: list[tuple[Callable[[], bool], Any]] = [] | ||
| for predicate, future in self._waiters: | ||
| if future.done(): | ||
| continue | ||
| if predicate(): | ||
| future.set_result(None) | ||
| else: | ||
| remaining.append((predicate, future)) | ||
| self._waiters = remaining |
There was a problem hiding this comment.
⚠️ Edge Case: Predicate exception in notify_state_changed skips remaining waiters
In notify_state_changed, if any predicate() call raises an exception, the loop aborts and all subsequent waiters are never evaluated. This means one misbehaving predicate can permanently block unrelated wait_condition callers.
Wrap each iteration in a try/except so that a failing predicate doesn't prevent other waiters from being resolved.
Suggested fix:
def notify_state_changed(self) -> None:
remaining: list[tuple[Callable[[], bool], Any]] = []
for predicate, future in self._waiters:
if future.done():
continue
try:
if predicate():
future.set_result(None)
else:
remaining.append((predicate, future))
except Exception:
# Keep the waiter so it can be re-evaluated later
remaining.append((predicate, future))
self._waiters = remaining
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| def _deliver_signal( | ||
| self, signal_name: str, payload: Payload, on_applied: Callable[[], None] | ||
| ) -> None: | ||
| self._invoke_signal(signal_name, payload) | ||
| on_applied() |
There was a problem hiding this comment.
💡 Bug: Async signal handlers silently fail instead of failing the task
When an async signal handler is encountered, _invoke_signal raises NotImplementedError. This exception propagates through _deliver_signal → _run_handle, where it's caught and logged but otherwise swallowed. The on_applied() callback (which calls notify_state_changed) is never invoked, so any wait_condition predicates that depended on this signal's side-effects will never be re-evaluated.
Consider either calling on_applied() in a finally block in _deliver_signal, or failing the decision task explicitly so the server can retry.
Suggested fix:
def _deliver_signal(
self, signal_name: str, payload: Payload, on_applied: Callable[[], None]
) -> None:
try:
self._invoke_signal(signal_name, payload)
finally:
on_applied()
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Fixed the Black formatting issue in |
Fixed the Black formatting issue in |
What changed?
Added inbound signal handling. Workflows can now receive signals via @workflow.signal_method and block until a condition is met with workflow.wait_condition(predicate).
Why?
Signaling is a core feature
How did you test it?
unit tests
Potential risks
Release notes
Documentation Changes