forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcadence_model.py
More file actions
71 lines (66 loc) · 2.36 KB
/
cadence_model.py
File metadata and controls
71 lines (66 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from typing import AsyncIterator
from agents import (
Model,
ModelSettings,
TResponseInputItem,
Tool,
AgentOutputSchemaBase,
Handoff,
ModelTracing,
ModelResponse,
)
from agents.items import TResponseStreamEvent
from openai.types.responses import ResponsePromptParam
from cadence.contrib.openai.cadence_tool import to_cadence_tool
from cadence.contrib.openai.cadence_handoff import to_cadence_handoff
from cadence.contrib.openai.openai_activities import OpenAIActivities
class CadenceModel(Model):
    """Agents SDK ``Model`` that routes model invocations through Cadence.

    Each :meth:`get_response` call is delegated to
    :class:`OpenAIActivities.invoke_model`, so the actual OpenAI call runs
    inside a Cadence activity rather than directly in the workflow.
    Streaming responses are not yet supported.
    """

    def __init__(self, model_name: str):
        # Name of the model to invoke; forwarded verbatim to the activity.
        self._model_name = model_name
        # Activity stub that performs the model call on the activity side.
        self._openai_activities = OpenAIActivities()

    async def get_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        tracing: ModelTracing,
        *,
        previous_response_id: str | None,
        conversation_id: str | None,
        prompt: ResponsePromptParam | None,
    ) -> ModelResponse:
        """Run the model inside a Cadence activity and return its response.

        All arguments mirror the Agents SDK ``Model.get_response`` contract
        and are forwarded to the activity unchanged, except that tools and
        handoffs are first converted to their Cadence representations via
        ``to_cadence_tool`` / ``to_cadence_handoff``.
        """
        return await self._openai_activities.invoke_model(
            model_name=self._model_name,
            system_instructions=system_instructions,
            input=input,
            model_settings=model_settings,
            # Convert SDK tool/handoff objects into Cadence-safe forms
            # before they cross the workflow/activity boundary.
            tools=[to_cadence_tool(tool) for tool in tools],
            output_schema=output_schema,
            handoffs=[to_cadence_handoff(h) for h in handoffs],
            tracing=tracing,
            previous_response_id=previous_response_id,
            conversation_id=conversation_id,
            prompt=prompt,
        )

    def stream_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        tracing: ModelTracing,
        *,
        previous_response_id: str | None,
        conversation_id: str | None,
        prompt: ResponsePromptParam | None,
    ) -> AsyncIterator[TResponseStreamEvent]:
        """Streaming is not implemented for the Cadence-backed model.

        Raises:
            NotImplementedError: always. NotImplementedError subclasses
                RuntimeError, so callers catching RuntimeError still work.
        """
        raise NotImplementedError("Model stream_response is not yet supported.")