Commit 118083b

jssmith, claude, and brianstrauch authored
Add workflow_streams samples (#300)
* Add workflow_streams samples: order_workflow scenario

  Initial samples directory for `temporalio.contrib.workflow_streams`, the workflow-hosted durable event stream contrib (experimental, contrib/pubsub branch of sdk-python). The order_workflow scenario covers the basic publisher path: a workflow binds a typed topic in `@workflow.init`, an activity publishes events via the topic handle, and a starter subscribes with `WorkflowStreamClient` and prints events as they arrive. Also enables the uv supply-chain cooldown options in the lockfile.

* samples: workflow_stream: add reconnecting-subscriber scenario

  Adds a second scenario demonstrating the central Workflow Streams use case: a consumer disconnects mid-stream and resumes later via `subscribe(from_offset=...)`, with no events lost or duplicated. The existing OrderWorkflow finishes too quickly to make the pattern visible, so this introduces a multi-stage PipelineWorkflow paced with `workflow.sleep` between stages. The runner reads a couple of events, persists `item.offset + 1` to a temp file, sleeps "disconnected" while the workflow keeps publishing, then opens a fresh Client + WorkflowStreamClient and resumes from the persisted offset — the same shape that works across actual process restarts.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_stream: add external-publisher scenario

  Adds a third scenario covering the third publisher shape: a backend service or scheduled job pushing events into a workflow it didn't itself start. The earlier scenarios publish either from inside the workflow or from one of its activities; this one uses `WorkflowStreamClient.create()` externally. HubWorkflow is a passive stream host — it does no work of its own and just waits to be told to close, fitting the event-bus pattern. The runner publishes a series of news headlines, runs a subscriber task alongside, signals close, and exits when both tasks complete.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_stream: add truncating-ticker scenario

  Adds a fourth scenario for long-running workflows that need to bound their event log: the workflow publishes events at a fixed cadence and calls `self.stream.truncate(...)` periodically to keep only the most recent entries. The runner subscribes twice — fast and slow — to make the trade visible: the fast subscriber sees every offset in order; the slow one falls behind a truncation, has its iterator transparently jump forward to the new base offset, and shows the offset gap that intermediate events fell into. This is the model for high-volume long-running streams: bounded log size; slow consumers may miss intermediate events but always see the most recent state.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: rename workflow_stream → workflow_streams; migrate to topic handles

  - Directory and module path renamed to plural to match sdk-python `temporalio.contrib.workflow_streams` rename.
  - Workflow-side: bind a typed topic handle in `@workflow.init` and call `topic.publish(value)` — the removed `WorkflowStream.publish` form is gone. Same change applied to the activity and the external publisher.
  - Activity: `WorkflowStreamClient.from_activity()` → `from_within_activity()`.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_streams review polish

  - README: fix scenario count (two -> four), document subscriber start position and continue-as-new semantics for stream_state
  - hub_workflow: drop stale comment referencing a README race note that does not exist in this sample
  - payment_activity: trim long publisher_id/dedup caveat — moved out of the first sample's docstring to keep it approachable

* workflow_streams: deliver terminal events + fix run_publisher subscribe shape

  End-to-end runs of the four workflow_streams scenarios surfaced two sample-side issues, both fixed here.

  run_publisher's consumer asserted `isinstance(item.data, Payload)` and called `payload_converter.from_payload(item.data, T)`. The contrib's `subscribe()` defaults to converter-decoded data, not raw payloads, so this assertion fired on the first run. Switch to `result_type=RawValue` (the documented escape hatch for heterogeneous topics) and read `item.data.payload`.

  Items published in the same workflow task that returns from `@workflow.run` were not delivered to subscribers — the in-memory log dies with the workflow and the next subscriber poll lands on a completed workflow. Fix: each scenario now uses an in-band terminator that subscribers break on, and each workflow holds the run open with `await workflow.sleep(timedelta(milliseconds=500))` so the final publish is fetched before the workflow exits:

  - OrderWorkflow / PipelineWorkflow: the workflow's own `StatusEvent(kind="complete")` / `StageEvent(stage="complete")` is the terminator (consumers already broke on it).
  - HubWorkflow: the *publisher* in run_external_publisher emits a sentinel `NewsEvent(headline="__done__")` immediately before signaling close; the consumer breaks on the sentinel.
  - TickerWorkflow: the final tick (n == count - 1) is the terminator; `keep_last` guarantees that offset survives the last truncation, so even slow consumers reach it.

  Because subscribers stop polling on the terminator, by the time `workflow.run` returns there are no in-flight poll handlers — no `UnfinishedUpdateHandlersWarning` from the SDK and no need for `detach_pollers()` / `wait_condition(all_handlers_finished)` in the workflow exit path.

  Two consecutive end-to-end runs of all four scenarios pass cleanly against `temporal server start-dev --headless`.

* workflow_streams README: document the stream-end pattern

  Subscribers don't exit on their own when the host workflow completes — they need an in-band terminator, and the workflow needs to hold open briefly so the final publish is fetched before run() returns. Both pieces show up in every scenario here, so document them in one place and update scenario 3's description to mention the sentinel headline the publisher emits.

* samples: workflow_streams: README and wheel packages cleanup

  Now that temporalio 1.27.0 has shipped (and main has bumped to it in #302), drop the README's "install sdk-python from a branch" callout and point at >=1.27.0 instead. Also add workflow_streams to the wheel packages list alongside the other samples.

* samples: workflow_streams: drop force_flush=True from charge_card

  The activity's final publish was using force_flush=True, which sets the flush_event so the background flusher fires immediately. Triggering a flush right before __aexit__ runs the activity into the WorkflowStreamClient's cancel-mid-flush path: __aexit__ cancels the flusher task while it's awaiting the publish signal RPC, the cancel propagates into the in-flight signal, and the activity hangs until the StartToClose timeout fires. Empirically the workflow then retries the activity indefinitely.

  Without force_flush=True the buffered "card charged" event flushes via the regular 200ms batch interval and the flusher is sleeping in wait_for(...) when __aexit__ cancels it — a clean cancellation path. The user-visible publish ordering is unchanged. The underlying SDK bug should be fixed separately by switching __aexit__ from cancel() to a cooperative-stop flag so the in-flight signal completes before the flusher exits.

* samples: workflow_streams: drop temp-file resume offset; add stats column

  The reconnecting-subscriber demo previously persisted its resume offset to a temp file between phases. Inside one process that's theatrical: the disconnect/reconnect shape comes from creating a fresh Client + WorkflowStreamClient with from_offset=N, not from where N happens to be stored. Replace the file with a local int and a comment about durable storage in production (a DB row keyed by user_id/run_id, etc.).

  Restructure output around a stats column so the demo conveys what's happening to the stream at all times, not just between phases. A background poller calls WorkflowStreamClient.get_offset() throughout and emits a heartbeat line once a second; every emit prints current proc/avail/pend in a left column followed by the phase or event message. Watching pend grow during the disconnect window and shrink again as phase 2 catches up is the demo's core point.

* samples: workflow_streams: surface multiple truncation jumps in ticker

  The truncating-ticker demo is meant to make the bounded-log trade visible: the fast subscriber sees every event, the slow subscriber loses intermediate ones to truncation. The previous parameters (truncate_every=5, keep_last=3, interval_ms=400, slow_delay=1.5s) produced at most one tiny jump near the end of the run — easy to miss.

  Tighter parameters (truncate_every=2, keep_last=1, interval_ms=200, count=30) keep the workflow log at one or two entries between truncations. That shrinks the slow subscriber's per-poll batch, so it re-polls more often, and most polls land after a truncation that has passed its position. The result is several visible jumps over the demo, not a single batched one at the end.

  Switch the output to two lanes (fast on the left, slow on the right with explicit "↪ jumped offset=N → M (K dropped)" markers) so the divergence reads at a glance instead of being lost in interleaved single-stream output. Also extend the docstring to call out the opposite trade — never truncating means slow consumers eventually catch up at the cost of unbounded workflow history — so readers know when this pattern is the wrong fit.

* samples: workflow_streams: add LLM-streaming scenario

  Adds a fifth scenario to workflow_streams/ that streams an OpenAI chat completion to the terminal through a Workflow Stream. The activity is the publisher (it owns the non-deterministic API call), the workflow hosts the stream and runs the activity, and the runner subscribes and renders to stdout as deltas arrive.

  Layout:

  - `chat_shared.py` — types and topics for this scenario, kept out of the cross-scenario `shared.py` because no other scenario uses them
  - `workflows/chat_workflow.py` — `ChatWorkflow` runs `stream_completion` with `RetryPolicy(maximum_attempts=3)` and the same 500ms hold-open pattern the other four samples use
  - `activities/chat_activity.py` — `stream_completion` calls `AsyncOpenAI(...).chat.completions.create(stream=True)` with `gpt-5-mini`, publishes each token chunk on the `delta` topic, the full text on `complete`, and a `RetryEvent` on `retry` when running on attempt > 1. `force_flush=True` is intentionally omitted to avoid the `__aexit__` cancel-mid-flight hang in `temporalio.contrib.workflow_streams` 1.27.0; the 200ms `batch_interval` is fast enough for an interactive feel.
  - `run_chat.py` — subscribes to all three topics, prints deltas to stdout as they stream, and on a retry event uses plain ANSI escapes (`\033[<n>A`, `\033[J`) to rewind the rendered output before the retried attempt re-publishes
  - `run_chat_worker.py` — runs on its own task queue (`workflow-stream-chat-task-queue`), registering only `ChatWorkflow` and `stream_completion`; the openai dependency and the `OPENAI_API_KEY` requirement stay isolated to this one scenario

  The split worker also makes the retry-handling demo trivial to run: the user kills the chat worker mid-stream, brings it back up, and the activity retries — no synthetic failure injection needed.

  Adds `chat-stream = ["openai>=1.0,<2"]` as a new optional dependency group; `uv sync --group chat-stream` and an `OPENAI_API_KEY` are documented in the README.

* samples: workflow_streams: drop chat-stream openai upper cap

  openai-agents (the existing langsmith-tracing / openai-agents extras) already pulls openai>=2.26.0. Capping chat-stream at openai<2 made the two extras unsatisfiable together. Drop the cap; the chat activity uses APIs that are stable across openai 1.x and 2.x.

* samples: workflow_streams: chat consumer header + cursor save/restore

  Two display fixes for run_chat.py:

  1. Print a header line right after start_workflow so the user sees immediate feedback ("[chat <id>] streaming response from gpt-5-mini, awaiting first token...") instead of a blank screen until the first delta arrives.
  2. Replace the newline-counting ANSI clear with cursor save/restore (`\033[s` / `\033[u\033[J`). The previous version counted text newlines to decide how far up to move the cursor on retry, which undercounts when the terminal has wrapped long lines — the failed attempt's first wrapped lines stayed on screen above the retry marker. save/restore rewinds to a fixed position regardless of wrapping.

  Bumps the prompt to a 500-word distributed-systems comparison (Paxos vs Raft vs Viewstamped Replication) so there is enough output to comfortably kill the worker mid-stream and watch the retried attempt re-render from scratch.

* samples: workflow_streams: rename chat -> llm in scenario 5

  "Chat" implies multi-turn conversation. The new scenario is a one-shot LLM completion stream, not a chat. Rename to make the scope clear:

  - chat_shared.py -> llm_shared.py
  - workflows/chat_workflow.py -> workflows/llm_workflow.py
  - activities/chat_activity.py -> activities/llm_activity.py
  - run_chat.py -> run_llm.py
  - run_chat_worker.py -> run_llm_worker.py
  - ChatInput / ChatWorkflow -> LLMInput / LLMWorkflow
  - CHAT_TASK_QUEUE -> LLM_TASK_QUEUE ("workflow-stream-chat-task-queue" -> "workflow-stream-llm-task-queue")
  - chat-stream extra -> llm-stream
  - workflow id prefix workflow-stream-chat-... -> workflow-stream-llm-...

  The activity's `stream_completion` defn name and the topic constants (`delta`, `complete`, `retry`) stay the same — those already describe what they do without the "chat" framing. README, docstrings, and run instructions updated to match.

* samples: workflow_streams: race the LLM consumer with workflow result

  If the LLM activity exhausts its retries (bad OPENAI_API_KEY, provider outage, etc.), the workflow fails before the activity publishes the `complete` terminator. The consumer's previous async-for loop only exited on `complete`, so the script blocked indefinitely on a terminator that would never arrive instead of surfacing the workflow failure.

  Wrap the subscriber in a `consume()` coroutine and run it through the existing `race_with_workflow` helper (the same pattern `run_publisher.py` uses): if the workflow finishes first the subscriber gets cancelled and the workflow's exception propagates; if the subscriber sees `complete` first, the helper waits for the workflow result and returns it.

  Found in a Codex code review of today's workflow_streams changes.

* samples: workflow_streams: drop race_with_workflow helper

  The helper wrapped the consumer in an asyncio.gather that cancelled the subscriber when the workflow result settled — defensive logic for a case the SDK already handles. WorkflowStreamClient.subscribe() exits cleanly on every workflow terminal state (return, continue-as-new, failure) via its AcceptedUpdateCompletedWorkflow, WorkflowUpdateRPCTimeoutOrCancelledError, and NOT_FOUND branches in sdk-python. The async-for loop ends naturally when the workflow terminates without a publish, so we don't need a separate task to race against handle.result().

  Replace the helper with the obvious shape in both runners:

      async for item in stream.subscribe(...):
          ...
          if item.is_terminator:
              break
      result = await handle.result()  # raises on workflow failure

  Either path reaches handle.result(): an explicit break on the in-band terminator (workflow still running, hold-open lets the poll deliver the event), or the iterator naturally exhausting when the workflow has already terminated. handle.result() then either returns or raises the workflow's failure — covering the LLM "activity exhausted retries" case that prompted the helper to be added in the first place.

  Smoke tested:

      uv run workflow_streams/run_publisher.py
      uv run workflow_streams/run_llm.py

* samples: workflow_streams: reorganize README; drop closing section

  Two fixes:

  1. Reorganize so the README doesn't jump back and forth between scenarios. The previous shape introduced 1-4, then put scenario 5's full description plus its setup and run instructions inline, then jumped back to a "Run it" section that only covered 1-4. New shape: all five scenarios up front (parallel structure), one unified "Run it" section that covers worker setup for both groups and all five runner scripts in one block, then expected output, then notes.
  2. Drop the inline "Ending the stream" section. The same material is in documentation/docs/develop/python/libraries/workflow-streams.mdx under the "Closing the stream" anchor, so the README links there from the Notes block instead of duplicating the explanation.

  The scenario 5 "split-out worker" rationale (extra dependency, secret, retry-via-Ctrl-C) collapses to a single sentence at the end of its bullet block.

* samples: workflow_streams: drop README Notes section

  The Notes block (subscriber start position, continue-as-new, closing the stream) was a small docs summary tacked onto the end of the README. The samples themselves cover these points: docstrings in each runner / workflow / activity explain the from_offset behavior, the stream_state field, and the in-band terminator + hold-open pattern. Readers who want the full conceptual treatment go to the docs page; the README sticks to "what the scenarios are and how to run them".

* samples: workflow_streams: lock llm-stream dependency group

  The llm-stream dependency group was introduced in pyproject.toml without a corresponding uv.lock update, so `uv sync --frozen --group llm-stream` would fail or force a relock before scenario 5 could run. Add the two missing entries (the package-optional-dependencies list and the package-metadata requires-dev list) so frozen installs work against the committed lock.

  Found in a Codex review of the day's workflow_streams changes.

* samples: workflow_streams: fix lint failures (ruff isort + format)

  CI's `poe lint` step was failing on three small things across four files:

  - `run_external_publisher.py`, `ticker_workflow.py`: ruff isort (`I001`) wanted the `workflow_streams.shared` imports re-sorted and a stray blank line removed. Apply the auto-fix.
  - `run_external_publisher.py`, `run_reconnecting_subscriber.py`, `run_truncating_ticker.py`: ruff format wanted three line-wrapped function calls collapsed back to single lines. Apply the formatter.
  - `run_truncating_ticker.py`: the formatter joined an adjacent pair of f-strings into an awkward `f"..." f"..."` one-liner. Consolidate them into a single f-string for readability — the resulting line is comfortably under the 88-char limit.

  `poe lint` (ruff isort + ruff format --check + mypy --all-groups --check-untyped-defs) now passes locally.

* samples: workflow_streams: drop BFF jargon and Expected output block

  Two README/comment cleanups:

  - "BFF" (backend-for-frontend) is not a widely-known term outside certain front-end-architecture circles. Replace with the more obvious "web backends" in the README intro and "production web backend" in the run_reconnecting_subscriber.py comment about where the resume offset would live durably.
  - Drop the "Expected output" section. It only covered scenarios 1 and 2; with five scenarios it is no longer pulling its weight. Anyone running the script can see the output for themselves.

* Apply suggestion from @brianstrauch

* Apply suggestion from @brianstrauch

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Brian Strauch <brian@brianstrauch.com>
1 parent cd2a8c5 commit 118083b

23 files changed

Lines changed: 1259 additions & 0 deletions

README.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
 * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource, awaiting its successful completion.
 * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
+* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental**
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
```

pyproject.toml

Lines changed: 2 additions & 0 deletions

```diff
@@ -48,6 +48,7 @@ openai-agents = [
 pydantic-converter = ["pydantic>=2.10.6,<3"]
 sentry = ["sentry-sdk>=2.13.0"]
 trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
+llm-stream = ["openai>=1.0"]
 cloud-export-to-parquet = [
     "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'",
     "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'",
@@ -94,6 +95,7 @@ packages = [
     "updatable_timer",
     "worker_specific_task_queues",
     "worker_versioning",
+    "workflow_streams",
 ]

 [tool.hatch.build.targets.wheel.sources]
```

uv.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default.

workflow_streams/README.md

Lines changed: 120 additions & 0 deletions

# Workflow Streams

> **Experimental.** These samples use
> `temporalio.contrib.workflow_streams`, which ships in
> `temporalio>=1.27.0`. The module is considered experimental and its
> API may change in future versions.

`temporalio.contrib.workflow_streams` lets a workflow host a durable,
offset-addressed event channel. The workflow holds an append-only log;
external clients (activities, starters, web backends) publish to topics via
signals and subscribe via long-poll updates. This packages the
boilerplate — batching, offset tracking, topic filtering,
continue-as-new hand-off — into a reusable stream.
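The core shape, as a minimal sketch. This is illustrative rather than a copy of the samples: the contrib API is experimental, and the `HostWorkflow` / `StatusEvent` names, the `WorkflowStream()` construction, and the `subscribe()` arguments are assumptions based on the API names these samples use.

```python
# Minimal sketch of the workflow-hosted stream pattern (assumed API shapes).
from dataclasses import dataclass

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient


@dataclass
class StatusEvent:
    message: str


@workflow.defn
class HostWorkflow:
    @workflow.init
    def __init__(self) -> None:
        # The workflow hosts the append-only log and binds a typed
        # topic handle up front.
        self.stream = WorkflowStream()
        self.status = self.stream.topic("status", type=StatusEvent)

    @workflow.run
    async def run(self) -> None:
        self.status.publish(StatusEvent(message="started"))
        ...


# Runner side: subscribe from the start of the log and print events.
async def consume(temporal_client, workflow_id: str) -> None:
    stream = WorkflowStreamClient.create(temporal_client, workflow_id)
    async for item in stream.subscribe(from_offset=0):
        print(item.offset, item.topic, item.data)
```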
This directory has five scenarios. The first four share one worker;
the fifth has its own worker because it needs the `openai` package
and an `OPENAI_API_KEY`.

**Scenario 1 — basic publish/subscribe with heterogeneous topics:**

* `workflows/order_workflow.py` — a workflow that hosts a
  `WorkflowStream` and publishes status events as it processes an order.
* `activities/payment_activity.py` — an activity that publishes
  intermediate progress to the stream via
  `WorkflowStreamClient.from_within_activity()`.
* `run_publisher.py` — starts the workflow, subscribes to both topics,
  decodes each by `item.topic`, and prints events as they arrive.
**Scenario 2 — reconnecting subscriber:**

* `workflows/pipeline_workflow.py` — a multi-stage pipeline that
  publishes stage transitions over ~10 seconds, leaving room for a
  consumer to disconnect and reconnect mid-run.
* `run_reconnecting_subscriber.py` — connects, reads a couple of
  events, "disconnects," then reopens a fresh client and resumes via
  `subscribe(from_offset=...)`. This is the central Workflow Streams
  use case: a consumer can disappear (page refresh, server restart,
  laptop closed) and resume later without missing events or seeing
  duplicates. A sketch of the resume shape follows this list.
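A minimal sketch of that resume shape, under the same assumed API as above. The server address, `WORKFLOW_ID`, and the simulated disconnect are placeholders; the `item.offset + 1` bookkeeping is the one the sample uses.

```python
# Hypothetical reconnect flow: read some events, remember the next
# offset, then resume from it with a brand-new client.
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

WORKFLOW_ID = "workflow-stream-pipeline"  # placeholder id


async def first_session() -> int:
    temporal_client = await Client.connect("localhost:7233")
    stream = WorkflowStreamClient.create(temporal_client, WORKFLOW_ID)
    next_offset = 0
    async for item in stream.subscribe(from_offset=0):
        print(item.data)
        # In production, persist this durably (e.g. a DB row keyed by
        # user/run id) rather than keeping it in a local variable.
        next_offset = item.offset + 1
        if next_offset >= 2:  # simulate a disconnect after two events
            break
    return next_offset


async def resumed_session(next_offset: int) -> None:
    # A fresh client (possibly in a new process) resumes exactly where
    # the old one left off, with no loss or duplication.
    temporal_client = await Client.connect("localhost:7233")
    stream = WorkflowStreamClient.create(temporal_client, WORKFLOW_ID)
    async for item in stream.subscribe(from_offset=next_offset):
        print(item.data)
```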
**Scenario 3 — external (non-Activity) publisher:**

* `workflows/hub_workflow.py` — a passive workflow that does no work
  of its own; it exists only to host a `WorkflowStream` and shut down
  when signaled.
* `run_external_publisher.py` — starts the hub, then publishes events
  into it from a plain Python coroutine using
  `WorkflowStreamClient.create(client, workflow_id)`. A subscriber
  task runs alongside; when the publisher is done it emits a sentinel
  event and signals `HubWorkflow.close`. This is the shape that fits a
  backend service or scheduled job pushing events into a workflow it
  didn't itself start. See the sketch after this list.
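The external-publisher shape, sketched under the same assumptions. `NewsEvent`'s import location, the `"news"` topic name, and the string-form signal call are illustrative; the sentinel headline and the `close` signal come from the sample.

```python
# Hypothetical external publisher: a plain coroutine, not an activity,
# pushing events into a hub workflow it did not start.
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import NewsEvent  # assumed location


async def publish_headlines(temporal_client: Client, workflow_id: str) -> None:
    stream = WorkflowStreamClient.create(temporal_client, workflow_id)
    async with stream:
        news = stream.topic("news", type=NewsEvent)
        for headline in ["markets rally", "rain expected"]:
            news.publish(NewsEvent(headline=headline))
        # In-band sentinel so the subscriber task knows to stop.
        news.publish(NewsEvent(headline="__done__"))
    # Let the passive hub workflow complete.
    await temporal_client.get_workflow_handle(workflow_id).signal("close")
```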
**Scenario 4 — bounded log via `truncate()`:**

* `workflows/ticker_workflow.py` — a long-running workflow that
  publishes events at a fixed cadence and calls
  `self.stream.truncate(...)` periodically to bound log growth,
  keeping only the most recent N entries. A sketch of that loop
  follows this list.
* `run_truncating_ticker.py` — runs a fast subscriber and a slow
  subscriber side by side. The fast one keeps up and sees every
  offset in order; the slow one falls behind a truncation and
  silently jumps forward to the new base offset. The output makes
  the trade visible: bounded log size in exchange for intermediate
  events being invisible to slow consumers.
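The publish-then-truncate loop, sketched with assumed construction details. `TickEvent` and the `"ticks"` topic are illustrative; the `keep_last` keyword and the truncate-every-2 / 200ms cadence match the parameters the sample uses.

```python
# Hypothetical ticker workflow: bounded log via periodic truncation.
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream


@dataclass
class TickEvent:
    n: int


@workflow.defn
class TickerSketch:
    @workflow.init
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.ticks = self.stream.topic("ticks", type=TickEvent)

    @workflow.run
    async def run(self, count: int) -> None:
        for n in range(count):
            self.ticks.publish(TickEvent(n=n))
            if n % 2 == 1:  # truncate_every=2
                # Keep only the newest entry; a slow subscriber's
                # iterator jumps forward past the dropped offsets.
                self.stream.truncate(keep_last=1)
            await workflow.sleep(timedelta(milliseconds=200))
```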
**Scenario 5 — LLM streaming:**

* `workflows/llm_workflow.py` — hosts a `WorkflowStream` and runs
  `stream_completion` as a single activity. The workflow itself
  does no streaming; the activity owns the non-deterministic OpenAI
  call.
* `activities/llm_activity.py` — calls
  `openai.AsyncOpenAI().chat.completions.create(stream=True)`,
  publishes each token chunk on the `delta` topic, the final
  accumulated text on `complete`, and a `RetryEvent` on `retry`
  when running on attempt > 1.
* `run_llm.py` — subscribes to all three topics, renders deltas to
  the terminal as they arrive, and on a `retry` event uses ANSI
  escapes to rewind the printed output before the retried attempt
  re-publishes.

Scenario 5 runs on its own worker (`run_llm_worker.py`, on
`workflow-stream-llm-task-queue`) because it needs the `openai`
dependency and an `OPENAI_API_KEY`, and because killing this worker
mid-stream is the easiest way to demonstrate retry handling without
disrupting the other four scenarios.
## Run it

For scenarios 1–4, start the shared worker:

```bash
uv run workflow_streams/run_worker.py
```

For scenario 5, install the extra, export the key, and start the
LLM worker:

```bash
uv sync --group llm-stream
export OPENAI_API_KEY=...
uv run workflow_streams/run_llm_worker.py
```

Then in another terminal, pick a scenario:

```bash
uv run workflow_streams/run_publisher.py                # scenario 1
uv run workflow_streams/run_reconnecting_subscriber.py  # scenario 2
uv run workflow_streams/run_external_publisher.py       # scenario 3
uv run workflow_streams/run_truncating_ticker.py        # scenario 4
uv run workflow_streams/run_llm.py                      # scenario 5
```

To exercise scenario 5's retry path, kill `run_llm_worker.py`
(`Ctrl-C`) while output is streaming and start it again. The
activity's next attempt sends a `RetryEvent` first; the consumer
clears its on-screen output via ANSI escapes and re-renders from
scratch. A sketch of that consumer loop follows.
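A sketch of `run_llm.py`'s consumer loop, again with assumed details: topic decoding is simplified (the real runner deals with heterogeneous-topic decoding, e.g. the `result_type=RawValue` escape hatch), and the `render` helper and `handle.id` plumbing are placeholders. The cursor save/restore escapes and the break-on-`complete` terminator come from the sample.

```python
# Hypothetical scenario-5 consumer: stream deltas, rewind on retry,
# stop on the in-band "complete" terminator.
from temporalio.client import Client, WorkflowHandle
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.llm_shared import TOPIC_COMPLETE, TOPIC_DELTA, TOPIC_RETRY


async def render(temporal_client: Client, handle: WorkflowHandle) -> str:
    print("\033[s", end="")  # save cursor position before the first delta
    stream = WorkflowStreamClient.create(temporal_client, handle.id)
    async for item in stream.subscribe(from_offset=0):
        if item.topic == TOPIC_RETRY:
            # Attempt > 1: restore the saved cursor and clear the failed
            # attempt's partial output, regardless of line wrapping.
            print("\033[u\033[J", end="")
        elif item.topic == TOPIC_DELTA:
            print(item.data.text, end="", flush=True)
        elif item.topic == TOPIC_COMPLETE:
            break  # in-band terminator published by the activity
    # Raises if the activity exhausted its retries and the workflow failed.
    return await handle.result()
```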

workflow_streams/__init__.py

Whitespace-only changes.

workflow_streams/activities/__init__.py

Whitespace-only changes.

workflow_streams/activities/llm_activity.py

Lines changed: 71 additions & 0 deletions

```python
from __future__ import annotations

from datetime import timedelta

from openai import AsyncOpenAI
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.llm_shared import (
    TOPIC_COMPLETE,
    TOPIC_DELTA,
    TOPIC_RETRY,
    LLMInput,
    RetryEvent,
    TextComplete,
    TextDelta,
)


@activity.defn
async def stream_completion(input: LLMInput) -> str:
    """Stream an LLM completion to the parent workflow's stream.

    Activity-as-publisher: each delta from the OpenAI streaming API is
    pushed to the workflow's stream as a ``TextDelta`` event on the
    ``delta`` topic. The accumulated full text returns as the
    activity's result and is also published on the ``complete`` topic
    as a terminator. On retry attempts (``activity.info().attempt > 1``)
    a ``RetryEvent`` lands on the ``retry`` topic before the new
    attempt's deltas, so consumers can reset their accumulated state
    instead of concatenating the failed attempt's partial output with
    the retried attempt's full output.

    No ``force_flush=True``: the 200ms ``batch_interval`` is fast
    enough for an interactive feel, and the WorkflowStreamClient's
    ``__aexit__`` cancels a sleeping flusher cleanly.
    """
    stream_client = WorkflowStreamClient.from_within_activity(
        batch_interval=timedelta(milliseconds=200),
    )
    # Disable provider-side retries; let Temporal own retry policy at
    # the activity layer.
    openai_client = AsyncOpenAI(max_retries=0)

    async with stream_client:
        deltas = stream_client.topic(TOPIC_DELTA, type=TextDelta)
        complete = stream_client.topic(TOPIC_COMPLETE, type=TextComplete)
        retry = stream_client.topic(TOPIC_RETRY, type=RetryEvent)

        attempt = activity.info().attempt
        if attempt > 1:
            retry.publish(RetryEvent(attempt=attempt))

        full: list[str] = []
        oai_stream = await openai_client.chat.completions.create(
            model=input.model,
            messages=[{"role": "user", "content": input.prompt}],
            stream=True,
        )
        async for chunk in oai_stream:
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if not text:
                continue
            deltas.publish(TextDelta(text=text))
            full.append(text)

        full_text = "".join(full)
        complete.publish(TextComplete(full_text=full_text))
        return full_text
```
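The workflow side of this scenario (`workflows/llm_workflow.py`) is not rendered in this diff. A sketch of it, grounded in the commit message's `RetryPolicy(maximum_attempts=3)` and 500ms hold-open; the `WorkflowStream(state=...)` construction and the activity timeout are assumptions.

```python
# Sketch of workflows/llm_workflow.py (not rendered above).
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.contrib.workflow_streams import WorkflowStream

from workflow_streams.activities.llm_activity import stream_completion
from workflow_streams.llm_shared import LLMInput


@workflow.defn
class LLMWorkflow:
    @workflow.init
    def __init__(self, input: LLMInput) -> None:
        # Assumed constructor: rehydrates stream offsets after
        # continue-as-new; stream_state is None on a fresh start.
        self.stream = WorkflowStream(state=input.stream_state)

    @workflow.run
    async def run(self, input: LLMInput) -> str:
        result = await workflow.execute_activity(
            stream_completion,
            input,
            start_to_close_timeout=timedelta(minutes=5),  # illustrative
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Hold the run open briefly so the activity's final publish is
        # fetched by subscribers before the workflow exits (the pattern
        # all five scenarios share).
        await workflow.sleep(timedelta(milliseconds=500))
        return result
```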
workflow_streams/activities/payment_activity.py

Lines changed: 30 additions & 0 deletions

```python
from __future__ import annotations

import asyncio
from datetime import timedelta

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent


@activity.defn
async def charge_card(order_id: str) -> str:
    """Pretend to charge a card, publishing progress to the parent workflow.

    `WorkflowStreamClient.from_within_activity()` reads the parent
    workflow id and the Temporal client from the activity context, so
    this activity can push events back without any wiring.
    """
    client = WorkflowStreamClient.from_within_activity(
        batch_interval=timedelta(milliseconds=200)
    )
    async with client:
        progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent)
        progress.publish(ProgressEvent(message="charging card..."))
        await asyncio.sleep(1.0)
        progress.publish(
            ProgressEvent(message="card charged"),
        )
    return f"charge-{order_id}"
```

workflow_streams/llm_shared.py

Lines changed: 44 additions & 0 deletions

```python
"""Types and constants for the LLM-streaming scenario.

Kept separate from ``shared.py`` because the other scenarios don't
use these — and this scenario runs on its own worker and task queue
so the ``openai`` dependency stays out of everyone else's path.
"""

from __future__ import annotations

from dataclasses import dataclass

from temporalio.contrib.workflow_streams import WorkflowStreamState

# Scenario 5 runs on its own worker so the openai dependency only
# matters for that scenario.
LLM_TASK_QUEUE = "workflow-stream-llm-task-queue"

# Topics published by the activity.
TOPIC_DELTA = "delta"
TOPIC_COMPLETE = "complete"
TOPIC_RETRY = "retry"


@dataclass
class LLMInput:
    prompt: str
    model: str = "gpt-5-mini"
    # Carries stream state across continue-as-new. None on a fresh start.
    stream_state: WorkflowStreamState | None = None


@dataclass
class TextDelta:
    text: str


@dataclass
class TextComplete:
    full_text: str


@dataclass
class RetryEvent:
    attempt: int
```
