contrib/openai_agents: stream model events via Workflow Streams#1497
Re-applies the openai_agents streaming integration originally split out of PR #1423 at commit 59c7582, updated for the post-merge API: `TResponseStreamEvent` is a typing special form, not a class, so the topic stays untyped (default `Any`) and subscribers pass `result_type=TResponseStreamEvent` on their own `subscribe` call. Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic="..."))`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Adds an opt-in streaming integration for temporalio.contrib.openai_agents that publishes raw model stream events to Workflow Streams so workflows can fan out model output to external clients in real time while retaining durability.
Changes:
- Add a new streaming model activity (`invoke_model_activity_streaming`) that consumes `Model.stream_response()` and publishes each raw `TResponseStreamEvent` to a Workflow Streams topic.
- Enable workflow-side `Runner.run_streamed()` by wiring `Model.stream_response()` through the Temporal model stub and adding fail-fast validations for required streaming config.
- Add docs + integration tests covering opt-in streaming, missing-topic failure, and local-activity incompatibility.
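The fail-fast validations described above can be sketched as a small guard run before the streaming activity is scheduled. This is an illustration only, not the PR's actual code: the helper name `validate_streaming_config` and the error type are hypothetical, but the two conditions (a required topic, and no local-activity execution) come from the changes and tests listed in this PR.

```python
from typing import Optional


class StreamingConfigurationError(ValueError):
    """Raised when streaming is requested without the required configuration."""


def validate_streaming_config(
    streaming_topic: Optional[str],
    use_local_activity: bool,
) -> None:
    """Fail fast before scheduling the streaming model activity.

    Mirrors the validations described above: run_streamed() needs a
    Workflow Streams topic to publish events to, and streaming activities
    are incompatible with local activities.
    """
    if streaming_topic is None:
        raise StreamingConfigurationError(
            "run_streamed() requires a streaming topic to be configured "
            "in ModelActivityParameters"
        )
    if use_local_activity:
        raise StreamingConfigurationError(
            "streaming model activities cannot run as local activities"
        )
```

Failing before the activity is scheduled keeps the misconfiguration visible at the workflow call site rather than surfacing as a retried activity error.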
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/contrib/openai_agents/test_openai_streaming.py | New integration tests validating streaming behavior and failure modes. |
| temporalio/contrib/openai_agents/_temporal_openai_agents.py | Registers the new streaming activity with the worker. |
| temporalio/contrib/openai_agents/_temporal_model_stub.py | Implements Model.stream_response() by invoking the streaming activity and yielding returned events. |
| temporalio/contrib/openai_agents/_openai_runner.py | Enables workflow-safe run_streamed() with fail-fast validation and exception rewrapping. |
| temporalio/contrib/openai_agents/_model_parameters.py | Adds streaming_topic and streaming_batch_interval configuration. |
| temporalio/contrib/openai_agents/_mcp.py | Removes unused logging. |
| temporalio/contrib/openai_agents/_invoke_model_activity.py | Adds streaming activity + Workflow Streams publishing and refactors tool/handoff reconstruction + OpenAI status handling. |
| temporalio/contrib/openai_agents/README.md | Documents streaming usage and updates feature-support tables. |
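The PR names a `streaming_batch_interval` parameter but does not show its semantics. One plausible reading, sketched below purely as an illustration (the `EventBatcher` class and its behavior are assumptions, not the PR's implementation), is a time-window batcher: events arriving within the interval are published together, cutting down the number of Workflow Streams publishes per model response.

```python
import time
from typing import Any, Callable, List


class EventBatcher:
    """Illustrative time-window batcher for stream events.

    Events added within `batch_interval` seconds of the first pending
    event are flushed together in a single publish call.
    """

    def __init__(
        self,
        batch_interval: float,
        publish: Callable[[List[Any]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._interval = batch_interval
        self._publish = publish
        self._clock = clock
        self._pending: List[Any] = []
        self._window_start = 0.0

    def add(self, event: Any) -> None:
        # Start a new time window with the first pending event.
        if not self._pending:
            self._window_start = self._clock()
        self._pending.append(event)
        # Flush once the window has been open for at least the interval.
        if self._clock() - self._window_start >= self._interval:
            self.flush()

    def flush(self) -> None:
        """Publish all pending events as one batch (e.g. at stream end)."""
        if self._pending:
            self._publish(self._pending)
            self._pending = []
```

A final `flush()` at end-of-stream ensures trailing events are not lost when the model finishes mid-window.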
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Drop the implicit 30s fallback for streaming activities and document that heartbeat_timeout should be set lower than start_to_close_timeout so a stuck model call is detected before the overall activity timeout fires.
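The invariant this comment asks to document can be expressed as a simple check. The helper below is a hypothetical sketch (not code from this PR) showing the relationship between the two timeouts using standard-library `timedelta` values:

```python
from datetime import timedelta


def check_streaming_timeouts(
    heartbeat_timeout: timedelta,
    start_to_close_timeout: timedelta,
) -> None:
    """Illustrative guard for the invariant requested above.

    The heartbeat timeout must be shorter than start-to-close so that a
    stuck model call is detected (and the activity retried) well before
    the overall activity timeout fires.
    """
    if heartbeat_timeout >= start_to_close_timeout:
        raise ValueError(
            f"heartbeat_timeout ({heartbeat_timeout}) should be lower than "
            f"start_to_close_timeout ({start_to_close_timeout}) for "
            "streaming model activities"
        )
```

Without this relationship, a hung stream only fails when the whole activity times out, losing the faster stall detection that heartbeats exist to provide.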
brianstrauch approved these changes on May 4, 2026.
Summary
Adds a streaming integration to `temporalio.contrib.openai_agents` that publishes raw `TResponseStreamEvent` values from the model activity to a Workflow Streams topic, so a workflow can fan model output out to clients in real time without losing durability.

- Opt in via `OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_topic="..."))`. With no topic set, behavior is unchanged.
- Enables workflow-side `Runner.run_streamed` and publishes each `TResponseStreamEvent` chunk through a typed topic handle. Final response and tool-call events are still returned to the workflow as before.
- `TResponseStreamEvent` is a typing special form rather than a class, so the topic stays untyped (default `Any`); subscribers pass `result_type=TResponseStreamEvent` on their own `subscribe` call.
- This was originally split out of PR #1423 (Workflow Streams library) and is now rebuilt as a single commit on top of merged main.
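The "typing special form, not a class" point can be demonstrated with any `Union` alias. The alias below is a stand-in for `TResponseStreamEvent` (which lives in the agents SDK as a union of event types); the stand-in name is invented for illustration:

```python
from typing import Union, get_args

# Stand-in for TResponseStreamEvent, which the agents SDK defines as a
# Union of event types (a typing special form) rather than a class.
FakeStreamEvent = Union[str, int]

# A typing special form is not a class, so it cannot serve as the
# runtime type of a topic; the topic therefore stays untyped (Any).
assert not isinstance(FakeStreamEvent, type)

# Subscribers instead pass the alias explicitly (result_type=...) so
# deserialization can resolve against the union's member types:
assert isinstance("delta-chunk", get_args(FakeStreamEvent))
```

This is why the topic declaration cannot simply parameterize on `TResponseStreamEvent` and the typing burden shifts to the subscriber side.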
Test plan
- `poe lint` clean (ruff, pyright, mypy, basedpyright, pydocstyle)
- `tests/contrib/openai_agents/test_openai_streaming.py` — covers streaming opt-in, no-topic fallback, MCP path, tool-call interleaving

🤖 Generated with Claude Code