Skip to content

Commit c2db9e2

Browse files
authored
Merge branch 'main' into heartbeat
2 parents c6306bb + 423011f commit c2db9e2

File tree

17 files changed

+1422
-8
lines changed

17 files changed

+1422
-8
lines changed

.github/workflows/ci_checks.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ jobs:
5151
uses: astral-sh/setup-uv@v1
5252
- name: Install dependencies
5353
run: |
54-
uv sync --extra dev
54+
uv sync --extra dev --extra openai
5555
- name: Run mypy type checker
5656
run: |
57-
uv tool run mypy cadence/
57+
uv run mypy cadence/
5858
5959
test:
6060
name: Unit Tests

cadence/_internal/activity/_definition.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
ParamSpec,
1515
TypeVar,
1616
Awaitable,
17+
Type,
1718
cast,
19+
overload,
1820
Concatenate,
1921
)
2022

@@ -119,10 +121,13 @@ def __init__(
119121
super().__init__(name, wrapped, ExecutionStrategy.THREAD_POOL, signature)
120122
update_wrapper(self, wrapped)
121123

122-
def __get__(self, instance, owner):
124+
@overload
125+
def __get__(self, instance: None, owner: Type[T]) -> "SyncMethodImpl[T, P, R]": ...
126+
@overload
127+
def __get__(self, instance: T, owner: Type[T]) -> SyncImpl[P, R]: ...
128+
def __get__(self, instance: T | None, owner: Type[T]) -> "SyncImpl[P, R] | Self":
123129
if instance is None:
124130
return self
125-
# If we bound the method to an instance, then drop the self parameter. It's a normal function again
126131
return SyncImpl[P, R](
127132
partial(self._wrapped, instance), self.name, self._signature
128133
)
@@ -181,10 +186,13 @@ def __init__(
181186
else:
182187
self._is_coroutine = _COROUTINE_MARKER
183188

184-
def __get__(self, instance, owner):
189+
@overload
190+
def __get__(self, instance: None, owner: Type[T]) -> "AsyncMethodImpl[T, P, R]": ...
191+
@overload
192+
def __get__(self, instance: T, owner: Type[T]) -> AsyncImpl[P, R]: ...
193+
def __get__(self, instance: T | None, owner: Type[T]) -> "AsyncImpl[P, R] | Self":
185194
if instance is None:
186195
return self
187-
# If we bound the method to an instance, then drop the self parameter. It's a normal function again
188196
return AsyncImpl[P, R](
189197
partial(self._wrapped, instance), self.name, self._signature
190198
)

cadence/_internal/workflow/workflow_engine.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
1414
from cadence._internal.workflow.workflow_instance import WorkflowInstance
1515
from cadence.api.v1 import history
16-
from cadence.api.v1.common_pb2 import Failure
16+
from cadence.api.v1.common_pb2 import Failure, WorkflowType
1717
from cadence.api.v1.decision_pb2 import (
1818
Decision,
1919
FailWorkflowExecutionDecisionAttributes,
2020
CompleteWorkflowExecutionDecisionAttributes,
21+
ContinueAsNewWorkflowExecutionDecisionAttributes,
2122
)
2223
from cadence.api.v1.history_pb2 import (
2324
HistoryEvent,
2425
WorkflowExecutionStartedEventAttributes,
2526
)
27+
from cadence.api.v1.tasklist_pb2 import TaskList
28+
from cadence.error import ContinueAsNewError
2629
from cadence.workflow import WorkflowDefinition, WorkflowInfo
2730

2831
logger = logging.getLogger(__name__)
@@ -187,6 +190,25 @@ def _maybe_complete_workflow(self) -> Optional[Decision]:
187190
return None
188191
except (CancelledError, InvalidStateError, FatalDecisionError):
189192
raise
193+
except ContinueAsNewError as e:
194+
# Use execution's workflow type and task list when not overridden
195+
info = self._context.info()
196+
attrs = ContinueAsNewWorkflowExecutionDecisionAttributes(
197+
workflow_type=WorkflowType(name=e.workflow_type or info.workflow_type),
198+
task_list=TaskList(name=e.task_list or info.workflow_task_list),
199+
input=info.data_converter.to_data(list(e.workflow_args)),
200+
)
201+
if e.execution_start_to_close_timeout is not None:
202+
attrs.execution_start_to_close_timeout.FromTimedelta(
203+
e.execution_start_to_close_timeout
204+
)
205+
if e.task_start_to_close_timeout is not None:
206+
attrs.task_start_to_close_timeout.FromTimedelta(
207+
e.task_start_to_close_timeout
208+
)
209+
return Decision(
210+
continue_as_new_workflow_execution_decision_attributes=attrs,
211+
)
190212
except ExceptionGroup as e:
191213
if e.subgroup((InvalidStateError, FatalDecisionError)):
192214
raise

cadence/contrib/__init__.py

Whitespace-only changes.

cadence/contrib/openai/README.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# OpenAI Agents SDK Integration with Cadence
2+
3+
We welcome contributions. Join [CNCF Slack Workspace](https://communityinviter.com/apps/cloud-native/cncf) and contact us in [#cadence-users](https://cloud-native.slack.com/archives/C09J2FQ7XU3).
4+
5+
## Durable Agent Execution
6+
7+
The module integrates with [OpenAI Agents SDK](https://github.com/openai/openai-agents-python) to allow users to run OpenAI agents in a durable way (retries, reset to a checkpoint).
8+
9+
Currently, it only supports agents with function tools.
10+
11+
**Requirement:** Python version 3.12+ is required.
12+
13+
## Example: Book Flight Agent
14+
15+
The example below demonstrates how to run a single agent with one tool. See the full code example [here](https://gist.github.com/shijiesheng/a06b6b58e44a9a09555bd3140370700e).
16+
17+
### Step 1: Write Agent as Cadence Workflow, tools as Cadence Activity (book_flight_agent.py)
18+
19+
```python
20+
import cadence
21+
from agents import Agent, function_tool, Runner, RunConfig
22+
from cadence.contrib.openai import OpenAIActivities
23+
from datetime import datetime
24+
from dataclasses import dataclass
25+
26+
cadence_registry = cadence.Registry()
27+
cadence_registry.register_activities(OpenAIActivities())
28+
29+
@cadence_registry.workflow(name="BookFlightAgentWorkflow")
30+
class BookFlightAgentWorkflow:
31+
32+
@cadence.workflow.run
33+
async def run(self, input: str) -> str:
34+
35+
# define agent using OpenAI SDK as usual
36+
agent =Agent(
37+
name = "Book Flight Agent",
38+
model = "gpt-4o-mini",
39+
tools = [
40+
function_tool(book_flight),
41+
],
42+
)
43+
result = await Runner.run(agent, input, run_config=RunConfig(
44+
tracing_disabled=True,
45+
))
46+
return result.final_output
47+
48+
@dataclass
49+
class Flight:
50+
from_city: str
51+
to_city: str
52+
departure_date: datetime
53+
return_date: datetime
54+
price: float
55+
airline: str
56+
flight_number: str
57+
seat_number: str
58+
59+
@cadence_registry.activity(name="book_flight")
60+
async def book_flight(from_city: str, to_city: str, departure_date: datetime, return_date: datetime) -> Flight:
61+
"""
62+
Book a flight tool: a pure mock for demo purposes
63+
"""
64+
return Flight(from_city=from_city, to_city=to_city, departure_date=departure_date, return_date=return_date, price=100, airline="United", flight_number="123456", seat_number="12A")
65+
```
66+
67+
### Step 2: Start Cadence Server
68+
69+
```sh
70+
curl https://raw.githubusercontent.com/cadence-workflow/cadence/master/docker/docker-compose.yml | docker compose -f - up -d
71+
```
72+
73+
### Step 3: Trigger the Agent Run
74+
```python
75+
# in main.py
76+
77+
import asyncio
78+
import cadence
79+
from datetime import timedelta
80+
from cadence.api.v1.history_pb2 import EventFilterType
81+
from cadence.contrib.openai import PydanticDataConverter
82+
from book_flight_agent import cadence_registry
83+
from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryRequest
84+
85+
async def main():
86+
# start Cadence worker
87+
worker = cadence.worker.Worker(
88+
cadence.Client(
89+
domain="default",
90+
target="localhost:7833",
91+
data_converter=PydanticDataConverter(),
92+
),
93+
"agent-task-list",
94+
cadence_registry,
95+
)
96+
97+
# start BookFlightAgentWorkflow
98+
async with worker:
99+
execution =await worker.client.start_workflow(
100+
"BookFlightAgentWorkflow",
101+
"Book a flight from New York to London on March 20th 2026 at 10:00 AM and return on March 25th, 2026 at 10:00 AM",
102+
task_list=worker.task_list,
103+
execution_start_to_close_timeout=timedelta(minutes=10),
104+
)
105+
106+
print(f"cadence workflow started: http://localhost:8088/domains/default/cluster0/workflows/{execution.workflow_id}/{execution.run_id}/summary")
107+
108+
await worker.client.workflow_stub.GetWorkflowExecutionHistory(
109+
GetWorkflowExecutionHistoryRequest(
110+
domain=worker.client.domain,
111+
workflow_execution=execution,
112+
wait_for_new_event=True,
113+
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
114+
skip_archival=True,
115+
)
116+
)
117+
118+
if __name__ == "__main__":
119+
asyncio.run(main())
120+
```
121+
122+
### Step 4: See Agent Run in Cadence Web
123+
124+
![Screenshot of Agent Run in Cadence Web](images/cadence-web-agent-run.jpg)

cadence/contrib/openai/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import sys
2+
3+
if sys.version_info < (3, 12):
4+
raise ImportError(
5+
"The 'openai' module requires Python 3.12 or higher due to its dependency on inspect.markcoroutinefunction"
6+
)
7+
8+
from .openai_activities import OpenAIActivities # type: ignore[unreachable]
9+
from .cadence_agent_runner import CadenceAgentRunner as _CadenceAgentRunner # noqa: F401 — imported for side effect
10+
from .pydantic_data_converter import PydanticDataConverter
11+
12+
__all__ = [
13+
"PydanticDataConverter",
14+
"OpenAIActivities",
15+
]
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import dataclasses
2+
import sys
3+
from typing import Any, Unpack
4+
from typing_extensions import override
5+
from agents import (
6+
Agent,
7+
Handoff,
8+
RunConfig,
9+
RunResult,
10+
RunResultStreaming,
11+
RunState,
12+
TContext,
13+
TResponseInputItem,
14+
)
15+
from agents.run import (
16+
DEFAULT_MAX_TURNS,
17+
AgentRunner,
18+
RunOptions,
19+
set_default_agent_runner,
20+
)
21+
from cadence.contrib.openai.cadence_model import CadenceModel
22+
23+
# TraceCtxManager assumes that the model invocation is running in the same context.
24+
# This is no longer true with Cadence workflows, where each invocation is running in a separate coroutine.
25+
# Thus, suppress the error
26+
_original_unraisablehook = sys.unraisablehook
27+
28+
29+
def _suppress_ctx_error_hook(args) -> None:
30+
if isinstance(args.exc_value, ValueError) and (
31+
isinstance(args.exc_value, ValueError)
32+
and "was created in a different Context" in str(args.exc_value)
33+
):
34+
return
35+
_original_unraisablehook(args)
36+
37+
38+
sys.unraisablehook = _suppress_ctx_error_hook
39+
40+
41+
def _replace_model_in_agent(
42+
agent: Agent[Any],
43+
) -> Agent[Any]:
44+
if isinstance(agent.model, CadenceModel):
45+
return agent
46+
47+
name = agent.model
48+
if not isinstance(name, str):
49+
raise ValueError("Model name must be a string")
50+
51+
agent.model = CadenceModel(model_name=name)
52+
53+
for i, handoff in enumerate(agent.handoffs):
54+
if isinstance(handoff, Agent):
55+
agent.handoffs[i] = _replace_model_in_agent(handoff)
56+
elif isinstance(handoff, Handoff):
57+
raise ValueError("Handoff is not yet supported")
58+
else:
59+
raise TypeError(f"Unknown handoff type: {type(handoff)}")
60+
61+
return agent
62+
63+
64+
class CadenceAgentRunner(AgentRunner):
65+
@override
66+
async def run(
67+
self,
68+
starting_agent: Agent[TContext],
69+
input: str | list[TResponseInputItem] | RunState[TContext],
70+
**kwargs: Unpack[RunOptions[TContext]],
71+
) -> RunResult:
72+
73+
context = kwargs.get("context")
74+
max_turns = kwargs.get("max_turns", DEFAULT_MAX_TURNS)
75+
hooks = kwargs.get("hooks")
76+
run_config = kwargs.get("run_config")
77+
error_handlers = kwargs.get("error_handlers")
78+
previous_response_id = kwargs.get("previous_response_id")
79+
auto_previous_response_id = kwargs.get("auto_previous_response_id", False)
80+
conversation_id = kwargs.get("conversation_id")
81+
session = kwargs.get("session")
82+
83+
# if starting_agent.tools:
84+
# raise ValueError(f"Tools are not yet supported")
85+
86+
if starting_agent.mcp_servers:
87+
raise ValueError("MCP servers are not yet supported")
88+
89+
if not run_config:
90+
run_config = RunConfig()
91+
92+
if run_config.model:
93+
if not isinstance(run_config.model, str):
94+
raise ValueError("Model must be a string")
95+
run_config = dataclasses.replace(
96+
run_config,
97+
model=CadenceModel(run_config.model),
98+
)
99+
100+
return await super().run(
101+
starting_agent=_replace_model_in_agent(starting_agent),
102+
input=input,
103+
context=context,
104+
max_turns=max_turns,
105+
hooks=hooks,
106+
run_config=run_config,
107+
error_handlers=error_handlers,
108+
previous_response_id=previous_response_id,
109+
auto_previous_response_id=auto_previous_response_id,
110+
conversation_id=conversation_id,
111+
session=session,
112+
)
113+
114+
@override
115+
def run_sync(
116+
self,
117+
starting_agent: Agent[TContext],
118+
input: str | list[TResponseInputItem] | RunState[TContext],
119+
**kwargs: Unpack[RunOptions[TContext]],
120+
) -> RunResult:
121+
raise RuntimeError("Model run_sync is not yet supported.")
122+
123+
@override
124+
def run_streamed(
125+
self,
126+
starting_agent: Agent[TContext],
127+
input: str | list[TResponseInputItem] | RunState[TContext],
128+
**kwargs: Unpack[RunOptions[TContext]],
129+
) -> RunResultStreaming:
130+
raise RuntimeError("Model run_streamed is not yet supported.")
131+
132+
133+
set_default_agent_runner(CadenceAgentRunner())

0 commit comments

Comments
 (0)