docs/develop/python/workflows/workflow-streams.mdx
Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job. It is not suited to ultra-low-latency cases like real-time voice, and it targets modest fan-out: tens of publishers and subscribers per Workflow, not thousands.
The Workflow hosts the event log. Publishers append events — the Workflow itself, Activities, or external processes via `WorkflowStreamClient`. Subscribers attach to the Workflow ID, optionally filter by **topic** (a string label set when publishing; topics are implicit and created on first publish), and consume events by long-polling from an offset they store.
:::tip SUPPORT, STABILITY, and DEPENDENCY INFO
28
30
29
31
The `temporalio.contrib.workflow_streams` module is currently in a pre-release stage. The API may change before general availability.

:::
- History-size cost and tuning: [Architecture](#architecture).
- Long-running streams that need Continue-As-New: [Continue-As-New](#continue-as-new).
## Choose where to host the stream
A `WorkflowStream` is hosted inside a Workflow, so the first design choice is whether one Workflow handles both the work and the stream, or whether a separate Workflow exists only to host the stream. The choice is mostly about lifecycle.
The `type=` argument is optional and defaults to `Any`.
Any process that has a Temporal `Client` and the target Workflow ID can publish to that Workflow's stream by constructing a `WorkflowStreamClient`. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities. Construct one with `WorkflowStreamClient.create(client, workflow_id)`, then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let the async context manager flush on exit.
When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but does not read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see [Delivery semantics](#delivery-semantics).
```python
from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

# Sketch of the client-side publish pattern described above. Only
# `WorkflowStreamClient.create(client, workflow_id)` is named in this
# page; the topic-binding and publish method names, and the
# `batch_interval` kwarg placement, are illustrative assumptions.
async def report_progress(client: Client, workflow_id: str) -> None:
    async with WorkflowStreamClient.create(
        client, workflow_id, batch_interval=timedelta(milliseconds=200)
    ) as stream:
        topic = stream.topic("status")           # bind a topic
        await topic.publish("order-validated")   # batched, shipped as a Signal
        # exiting the async context manager flushes any pending batch
```
Subscribing uses the same client construction as publishing: `WorkflowStreamClient.create(client, workflow_id)` from any process that has a Temporal `Client`, or `from_within_activity()` inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.
Subscribing from inside the host Workflow is intentionally unsupported. The Workflow only sees the successful return value of each Activity; the stream may carry partial output from attempts that failed and were retried. Letting the Workflow read its own stream would mix those two views and break the conduit role the Workflow is meant to play.
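A toy simulation (plain Python, not the library API) makes the two views concrete: the stream records every attempt's partial output plus a `RETRY` sentinel, while the Workflow-side view is only the successful attempt's return value:

```python
# Toy model of why the host Workflow must not read its own stream.
stream: list[str] = []

def activity_attempt(fail: bool, attempt: int) -> str:
    if attempt > 1:
        stream.append("RETRY")  # sentinel: subscribers drop partial output
    for delta in ["par", "tial"]:
        stream.append(delta)
        if fail:
            raise RuntimeError("attempt failed mid-stream")
    return "partial"  # only a successful attempt returns

try:
    activity_attempt(fail=True, attempt=1)   # attempt 1 dies mid-stream
except RuntimeError:
    pass
workflow_view = activity_attempt(fail=False, attempt=2)  # retry succeeds

assert stream == ["par", "RETRY", "par", "tial"]  # subscribers saw both attempts
assert workflow_view == "partial"                 # Workflow saw one clean return
```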
The Workflow is the single source of truth for stream state, so any process bridging events to the outside world (an SSE proxy serving a browser, a forwarding Activity) can stay stateless — store the last delivered `item.offset`, and reconnects resume from that offset without coordinating with anyone but the Workflow.
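The resume logic is small enough to sketch. Here the subscription is faked as a list of `(offset, data)` pairs; only the cursor handling is the point:

```python
# Model of a stateless bridge resuming after a restart: deliver items
# past the stored offset, updating the stored offset per delivered item.
def consume(items, last_offset):
    """Deliver items past last_offset; return (delivered, updated offset)."""
    delivered = []
    for offset, data in items:
        if offset > last_offset:
            delivered.append(data)
            last_offset = offset
    return delivered, last_offset

items = [(0, "started"), (1, "step 1 done"), (2, "step 2 done")]

first, cursor = consume(items[:2], -1)   # session delivers, then the bridge restarts
second, cursor = consume(items, cursor)  # reconnect replays from the stored offset
assert first == ["started", "step 1 done"]
assert second == ["step 2 done"]  # nothing is re-delivered
```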
Once you have a client, iterate a topic handle's `subscribe()`, the counterpart to `publish()`. The handle's bound type drives decoding, so each `item.data` arrives as `T` via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.
- The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value, never reading its own stream — see [Publish from a client](#publish-from-a-client) for why.
- The Activity publishes a `RETRY` event when `activity.info().attempt > 1`. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see [Delivery semantics](#delivery-semantics)).
- Termination uses an *ack handshake*: the consumer signals the Workflow once it has received the `close` event, so the Workflow can return as soon as the subscriber confirms. The `wait_condition` timeout is the fallback when no subscriber is attached (see [Closing the stream](#closing-the-stream) for the simpler fixed-sleep alternative).
- `force_flush=True` is used only on the first delta and on the `RETRY` sentinel, where latency matters. Subsequent deltas batch at the 200 ms `batch_interval`; per-delta `force_flush=True` would generate one Signal per token (see [Tuning](#tuning) for the trade-off).
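The trade-off in that last point can be modeled with a counter. Each flush stands in for one Signal to the Workflow; this is a toy model, whereas the real client also flushes on its `batch_interval` timer:

```python
# Toy model of publish batching: events accumulate in a buffer and ship
# as one "Signal" per flush. force_flush trades more Signals for latency.
class ToyBatcher:
    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.signals_sent = 0

    def publish(self, event: str, force_flush: bool = False) -> None:
        self.buffer.append(event)
        if force_flush:
            self.flush()

    def flush(self) -> None:
        """Ship the whole buffer as a single Signal (the interval timer calls this too)."""
        if self.buffer:
            self.signals_sent += 1
            self.buffer.clear()

b = ToyBatcher()
b.publish("first delta", force_flush=True)  # immediate: latency matters here
for delta in ["he", "llo", " wor", "ld"]:
    b.publish(delta)                        # buffered
b.flush()                                   # interval timer fires once
assert b.signals_sent == 2                  # five events cost two Signals

b2 = ToyBatcher()
for delta in ["he", "llo", " wor", "ld"]:
    b2.publish(delta, force_flush=True)
assert b2.signals_sent == 4                 # per-delta flush: one Signal per token
```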