Commit 5f563fc

jssmith and claude committed
contrib: openai_agents streaming integration
Re-applies the openai_agents streaming integration originally split out of PR #1423 on commit 59c7582, updated for the post-PR API: TResponseStreamEvent is a typing-special form, not a class, so the topic stays untyped (default Any) and subscribers pass result_type=TResponseStreamEvent on their own subscribe call.

Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="..."))`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 587f638 commit 5f563fc

8 files changed

Lines changed: 874 additions & 178 deletions

temporalio/contrib/openai_agents/README.md

Lines changed: 86 additions & 11 deletions
@@ -576,10 +576,82 @@ result = await Runner.run(
 )
 ```

+## Streaming
+
+⚠️ **Experimental** - This functionality is subject to change prior to General Availability.
+
+The integration supports streaming model responses via the SDK-native
+`Runner.run_streamed` API. Inside a workflow, model calls execute as a
+streaming activity (`invoke_model_activity_streaming`) that consumes
+`Model.stream_response` and returns the collected list of native OpenAI
+response events. The workflow surfaces those events to the caller
+through `RunResultStreaming.stream_events()`, which wraps them in the
+agents-SDK `StreamEvent` union (so raw model events arrive as
+`RawResponsesStreamEvent.data`).
+
+External consumers (UIs, tracing pipelines, etc.) observe events as
+they arrive by hosting a [`WorkflowStream`](../workflow_streams/README.md)
+in the workflow and subscribing with `WorkflowStreamClient`. The
+streaming activity publishes each event to the topic configured on
+`ModelActivityParameters.streaming_event_topic`. The topic is required
+when using `Runner.run_streamed`; calling it without a configured topic
+raises before any activity is scheduled.
+
+Example workflow consuming events via `stream_events()` while the
+streaming activity publishes to the `"events"` topic:
+
+```python
+from agents import Agent, Runner
+from agents.stream_events import RawResponsesStreamEvent
+
+from temporalio import workflow
+
+@workflow.defn
+class MyAgent:
+    @workflow.run
+    async def run(self, prompt: str) -> str:
+        agent = Agent(name="Assistant", instructions="...")
+        result = Runner.run_streamed(agent, prompt)
+        async for event in result.stream_events():
+            if isinstance(event, RawResponsesStreamEvent):
+                raw_event = event.data  # native OpenAI ResponseStreamEvent
+                ...
+        return result.final_output
+```
+
+To publish raw model events to external subscribers, host a
+`WorkflowStream` in the workflow and configure
+`OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="events"))`. See [`temporalio.contrib.workflow_streams`](../workflow_streams/README.md) for the
+publisher and subscriber API.
+
+`RunResultStreaming.stream_events()` yields the agents-SDK
+`StreamEvent` union (`RawResponsesStreamEvent`, `RunItemStreamEvent`,
+`AgentUpdatedStreamEvent`); native OpenAI response events arrive
+wrapped as `RawResponsesStreamEvent.data`. Workflow-stream subscribers,
+by contrast, receive the unwrapped native events directly because the
+streaming activity publishes them straight from `Model.stream_response`.
+
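
Because the two consumption paths deliver different shapes, code shared between a workflow and an external subscriber may want to normalize events first. The following is a minimal sketch under stated assumptions: `RawEventWrapper` is a hypothetical stand-in for the agents-SDK `RawResponsesStreamEvent` (only its `data` attribute is modeled), `unwrap` is an illustrative helper, and native events are represented as plain dictionaries rather than real OpenAI event objects.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class RawEventWrapper:
    """Hypothetical stand-in for RawResponsesStreamEvent; only `data` is modeled."""

    data: Any


def unwrap(event: Any) -> Any:
    """Return the native model event whether or not it arrives wrapped."""
    if isinstance(event, RawEventWrapper):
        return event.data  # shape seen via stream_events() in the workflow
    return event  # shape seen by a workflow-stream subscriber


# A native event represented as a plain dict for illustration.
native = {"type": "response.output_text.delta", "delta": "Hi"}

from_workflow = unwrap(RawEventWrapper(data=native))
from_subscriber = unwrap(native)
```

With a helper like this, downstream handling can operate on the native event alone and stay agnostic about which path delivered it.
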
+Streaming is incompatible with `use_local_activity` because local
+activities support neither activity heartbeats nor the workflow stream
+signal channel.
+
+Activity retries surface to workflow-stream subscribers but not to
+`RunResultStreaming.stream_events()`. Events are published to the
+stream as `Model.stream_response` produces them, so a partial attempt
+that fails mid-response leaves its emitted events on the stream and the
+retry attempt publishes a second sequence. `stream_events()` only sees
+the final successful attempt's collected events because it consumes the
+activity's return value. Workflow-stream subscribers should treat
+retries the same way as any other workflow_streams publisher; see
+[Delivery semantics](../workflow_streams/README.md) for the trade-off and
+the conventional `RETRY` event pattern for surfacing the transition to
+consumers.
+
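
One way a workflow-stream subscriber might cope with the duplicate sequence left by a failed attempt is to buffer events and discard the buffer when a retry marker arrives. The sketch below is a self-contained simulation, not the workflow_streams API: the shape of the `RETRY` marker, the `collect_final_attempt` helper, and the dictionary event format are all assumptions made for illustration.

```python
from typing import Any, Iterable

# Hypothetical marker a publisher could emit when an activity attempt is retried.
RETRY = {"type": "RETRY"}


def collect_final_attempt(events: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return only the events published after the last RETRY marker.

    Events seen before a marker belong to a failed attempt and are dropped,
    which mirrors what stream_events() exposes inside the workflow.
    """
    buffered: list[dict[str, Any]] = []
    for event in events:
        if event.get("type") == "RETRY":
            buffered.clear()  # discard the partial attempt
            continue
        buffered.append(event)
    return buffered


# Simulated topic contents: a partial attempt, a retry marker, then a full attempt.
stream = [
    {"type": "response.output_text.delta", "delta": "Hel"},
    RETRY,
    {"type": "response.output_text.delta", "delta": "Hello"},
    {"type": "response.completed"},
]
final_events = collect_final_attempt(stream)
```

Buffering suits consumers that only need the final sequence. A UI that renders text incrementally would more likely clear its display when the marker arrives instead of waiting for the stream to end.
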
 ## Feature Support

 This integration is presently subject to certain limitations.
-Streaming and voice agents are not supported.
+Realtime agents are not supported. Streaming is supported via
+`Runner.run_streamed`; see [Streaming](#streaming) above.
 Certain tools are not suitable for a distributed computing environment, so these have been disabled as well.

 ### Model Providers
@@ -591,12 +663,10 @@ Certain tools are not suitable for a distributed computing environment, so these

 ### Model Response format

-This integration does not presently support streaming.
-
-| Model Response | Supported |
-| :------------- | :-------: |
-| Get Response   |    Yes    |
-| Streaming      |    No     |
+| Model Response | Supported            |
+| :------------- | :------------------: |
+| Get Response   | Yes                  |
+| Streaming      | Yes (experimental)   |

 ### Tools

@@ -900,10 +970,15 @@ If OTEL instrumentation is not enabled, the integration works normally without a

 ### Voice

-| Mode                     | Supported |
-| :----------------------- | :-------: |
-| Voice agents (pipelines) |    No     |
-| Realtime agents          |    No     |
+| Mode                     | Supported     |
+| :----------------------- | :-----------: |
+| Voice agents (pipelines) | Yes [^voice]  |
+| Realtime agents          | No            |
+
+[^voice]: `VoicePipeline` runs in your process and delegates the agent
+step (`VoiceWorkflowBase.run`) to a Temporal workflow that uses
+`Runner.run` or `Runner.run_streamed`. STT and TTS run outside
+Temporal; the agent loop is durable.

 ### Utilities
