
Commit a760ad3

samples: workflow_streams: reorganize README; drop closing section
Two fixes:

1. Reorganize so the README doesn't jump back and forth between
   scenarios. The previous shape introduced 1-4, then put scenario 5's
   full description plus its setup and run instructions inline, then
   jumped back to a "Run it" section that only covered 1-4. New shape:
   all five scenarios up front (parallel structure), one unified
   "Run it" section that covers worker setup for both groups and all
   five runner scripts in one block, then expected output, then notes.

2. Drop the inline "Ending the stream" section. The same material is in
   documentation/docs/develop/python/libraries/workflow-streams.mdx
   under the "Closing the stream" anchor, so the README links there
   from the Notes block instead of duplicating the explanation. The
   scenario 5 "split-out worker" rationale (extra dependency, secret,
   retry-via-Ctrl-C) collapses to a single sentence at the end of its
   bullet block.
1 parent 44d944b commit a760ad3

1 file changed: workflow_streams/README.md (78 additions, 123 deletions)

@@ -9,13 +9,12 @@
 offset-addressed event channel. The workflow holds an append-only log;
 external clients (activities, starters, BFFs) publish to topics via
 signals and subscribe via long-poll updates. This packages the
-boilerplate — batching, offset tracking, topic filtering, continue-as-new
-hand-off — into a reusable stream.
+boilerplate — batching, offset tracking, topic filtering,
+continue-as-new hand-off — into a reusable stream.

-This directory has four scenarios sharing one Worker, plus a fifth
-LLM-streaming scenario on its own Worker (see
-[Scenario 5 — LLM streaming](#scenario-5--llm-streaming) below for why
-it's separate).
+This directory has five scenarios. The first four share one worker;
+the fifth has its own worker because it needs the `openai` package
+and an `OPENAI_API_KEY`.

 **Scenario 1 — basic publish/subscribe with heterogeneous topics:**

@@ -33,11 +32,11 @@ it's separate).
   publishes stage transitions over ~10 seconds, leaving room for a
   consumer to disconnect and reconnect mid-run.
 * `run_reconnecting_subscriber.py` — connects, reads a couple of
-  events, persists `item.offset + 1` to disk, "disconnects," then
-  reopens a fresh client and resumes via `subscribe(from_offset=...)`.
-  This is the central Workflow Streams use case: a consumer can
-  disappear (page refresh, server restart, laptop closed) and resume
-  later without missing events or seeing duplicates.
+  events, "disconnects," then reopens a fresh client and resumes via
+  `subscribe(from_offset=...)`. This is the central Workflow Streams
+  use case: a consumer can disappear (page refresh, server restart,
+  laptop closed) and resume later without missing events or seeing
+  duplicates.

 **Scenario 3 — external (non-Activity) publisher:**

@@ -47,131 +46,82 @@ it's separate).
 * `run_external_publisher.py` — starts the hub, then publishes events
   into it from a plain Python coroutine using
   `WorkflowStreamClient.create(client, workflow_id)`. A subscriber
-  task runs alongside; when the publisher is done it emits an in-band
-  sentinel headline (`__done__`) into the stream, then signals
-  `HubWorkflow.close`. The subscriber breaks on the sentinel and
-  exits its `async for`. This is the shape that fits a backend
-  service or scheduled job pushing events into a workflow it didn't
-  itself start.
+  task runs alongside; when the publisher is done it emits a sentinel
+  event and signals `HubWorkflow.close`. The shape that fits a
+  backend service or scheduled job pushing events into a workflow it
+  didn't itself start.

 **Scenario 4 — bounded log via `truncate()`:**

 * `workflows/ticker_workflow.py` — a long-running workflow that
   publishes events at a fixed cadence and calls
-  `self.stream.truncate(...)` periodically to bound log growth, keeping
-  only the most recent N entries.
+  `self.stream.truncate(...)` periodically to bound log growth,
+  keeping only the most recent N entries.
 * `run_truncating_ticker.py` — runs a fast subscriber and a slow
-  subscriber side by side. The fast one keeps up and sees every offset
-  in order; the slow one sleeps between iterations, falls behind a
-  truncation, and silently jumps forward to the new base offset. The
-  output makes the trade visible: bounded log size in exchange for
-  intermediate events being invisible to slow consumers.
-
-`run_worker.py` registers all four workflows and the activity.
-
-## Scenario 5 — LLM streaming
-
-* `workflows/llm_workflow.py` — a workflow that hosts a
-  `WorkflowStream` and runs `stream_completion` as a single activity.
-  The workflow itself does no streaming; the activity owns the
-  non-deterministic OpenAI call.
+  subscriber side by side. The fast one keeps up and sees every
+  offset in order; the slow one falls behind a truncation and
+  silently jumps forward to the new base offset. The output makes
+  the trade visible: bounded log size in exchange for intermediate
+  events being invisible to slow consumers.
+
+**Scenario 5 — LLM streaming:**
+
+* `workflows/llm_workflow.py` — hosts a `WorkflowStream` and runs
+  `stream_completion` as a single activity. The workflow itself
+  does no streaming; the activity owns the non-deterministic OpenAI
+  call.
 * `activities/llm_activity.py` — calls
   `openai.AsyncOpenAI().chat.completions.create(stream=True)`,
-  publishes each token chunk as a `TextDelta` on the `delta` topic,
-  the final accumulated text on the `complete` topic, and a
-  `RetryEvent` on the `retry` topic when running on attempt > 1.
+  publishes each token chunk on the `delta` topic, the final
+  accumulated text on `complete`, and a `RetryEvent` on `retry`
+  when running on attempt > 1.
 * `run_llm.py` — subscribes to all three topics, renders deltas to
   the terminal as they arrive, and on a `retry` event uses ANSI
   escapes to rewind the printed output before the retried attempt
-  starts re-publishing.
-* `run_llm_worker.py` — separate worker on its own task queue
-  (`workflow-stream-llm-task-queue`), registering only `LLMWorkflow`
-  and `stream_completion`. This isolates the `openai` dependency and
-  the `OPENAI_API_KEY` requirement to this one scenario.
+  re-publishes.

-This scenario is split out for two reasons. First, it needs an extra
-dependency (`openai`) and a secret (`OPENAI_API_KEY`) — putting it on
-the main worker would force every other scenario to set up an OpenAI
-key. Second, killing the LLM worker mid-stream is the easiest way to
-demonstrate retry handling, and you don't want the same `Ctrl-C` to
-interrupt the other four scenarios' worker.
+Scenario 5 runs on its own worker (`run_llm_worker.py`, on
+`workflow-stream-llm-task-queue`) because it needs the `openai`
+dependency and an `OPENAI_API_KEY`, and because killing this worker
+mid-stream is the easiest way to demonstrate retry handling without
+disrupting the other four scenarios.
+
+## Run it

-Setup:
+For scenarios 1–4, start the shared worker:

 ```bash
-uv sync --group llm-stream
-export OPENAI_API_KEY=...
+uv run workflow_streams/run_worker.py
 ```

-Run:
+For scenario 5, install the extra, export the key, and start the
+LLM worker:

 ```bash
-# Terminal 1: LLM worker (its own task queue)
+uv sync --group llm-stream
+export OPENAI_API_KEY=...
 uv run workflow_streams/run_llm_worker.py
+```
+
+Then in another terminal, pick a scenario:

-# Terminal 2:
-uv run workflow_streams/run_llm.py
+```bash
+uv run workflow_streams/run_publisher.py                 # scenario 1
+uv run workflow_streams/run_reconnecting_subscriber.py   # scenario 2
+uv run workflow_streams/run_external_publisher.py        # scenario 3
+uv run workflow_streams/run_truncating_ticker.py         # scenario 4
+uv run workflow_streams/run_llm.py                       # scenario 5
 ```

-To trigger the retry path, kill the LLM worker in Terminal 1
-(`Ctrl-C`) while output is streaming, then start it again. The
+To exercise scenario 5's retry path, kill `run_llm_worker.py`
+(`Ctrl-C`) while output is streaming and start it again. The
 activity's next attempt sends a `RetryEvent` first; the consumer
 clears its on-screen output via ANSI escapes and re-renders from
 scratch.

-## Ending the stream
-
-`WorkflowStreamClient.subscribe()` is a long-poll loop — it does not
-exit on its own when the host workflow completes. Two things have to
-happen at the end of a streamed workflow for clean shutdown:
-
-1. **An in-band terminator that subscribers recognize.** Each scenario
-   here sends one before the workflow exits:
-   - `OrderWorkflow` and `PipelineWorkflow` publish a "complete"
-     status / stage event; consumers break on it.
-   - `run_external_publisher.py` publishes a sentinel
-     `NewsEvent(headline="__done__")` immediately before signaling
-     `HubWorkflow.close`; the consumer breaks on the sentinel.
-   - `TickerWorkflow`'s final tick (`n == count - 1`) is the
-     terminator; subscribers break when they see it. `keep_last`
-     guarantees that final offset survives the last truncation, so
-     even slow consumers reach it.
-
-2. **A short hold-open in the workflow before returning** so that the
-   final publish gets fetched. Items published in the same workflow
-   task that returns from `@workflow.run` are abandoned: the
-   in-memory log dies with the workflow, and the next subscriber
-   poll lands on a completed workflow. Each workflow here ends with
-
-   ```python
-   await workflow.sleep(timedelta(milliseconds=500))
-   return ...
-   ```
-
-   which gives subscribers in their `poll_cooldown` interval time to
-   issue one more poll. With both pieces in place, subscribers
-   receive the terminator, break out of their `async for`, and stop
-   polling — by the time the workflow exits there are no in-flight
-   poll handlers, so the SDK does not warn about unfinished
-   handlers.
-
-## Run it
-
-```bash
-# Terminal 1: worker
-uv run workflow_streams/run_worker.py
-
-# Terminal 2: pick a scenario
-uv run workflow_streams/run_publisher.py
-# or
-uv run workflow_streams/run_reconnecting_subscriber.py
-# or
-uv run workflow_streams/run_external_publisher.py
-# or
-uv run workflow_streams/run_truncating_ticker.py
-```
+## Expected output

-Expected output on the basic publisher side:
+Scenario 1 (basic publisher):

 ```
 [status] received: order=order-1
@@ -183,9 +133,9 @@ Expected output on the basic publisher side:
 workflow result: charge-order-1
 ```

-Expected output on the reconnecting subscriber side. Each line carries
-a stats column on the left (`proc`, `avail`, `pend`) and a phase /
-event message on the right; a background poller emits a `·` heartbeat
+Scenario 2 (reconnecting subscriber). Each line carries a stats
+column on the left (`proc`, `avail`, `pend`) and a phase / event
+message on the right; a background poller emits a `·` heartbeat
 once a second. Offsets are continuous across the disconnect — no
 events lost, none duplicated:

@@ -208,17 +158,22 @@ proc= 6 avail= 6 pend= 0 │ workflow result: pipeline ... done

 ## Notes

-* **Subscriber start position.** `subscribe(...)` without `from_offset`
-  starts at the stream's current base offset and follows live — older
-  events that have been truncated, or that arrived before the
-  subscribe call, are not replayed. Pass `from_offset=N` to resume
-  from a known position (see `run_reconnecting_subscriber.py`); the
-  iterator skips forward to the current base if `N` has been
-  truncated.
+* **Subscriber start position.** `subscribe(...)` without
+  `from_offset` starts at the stream's current base offset and
+  follows live — older events that have been truncated, or that
+  arrived before the subscribe call, are not replayed. Pass
+  `from_offset=N` to resume from a known position (see
+  `run_reconnecting_subscriber.py`); the iterator skips forward to
+  the current base if `N` has been truncated.
 * **Continue-as-new.** Every `*Input` dataclass carries
   `stream_state: WorkflowStreamState | None = None`. To survive
-  continue-as-new without losing buffered items, capture the workflow's
-  stream state and pass it to the next run via
-  `WorkflowStream(prior_state=...)` in `@workflow.init`. The samples
-  declare the field for completeness; none of them actually trigger
-  continue-as-new.
+  continue-as-new without losing buffered items, capture the
+  workflow's stream state and pass it to the next run via
+  `WorkflowStream(prior_state=...)` in `@workflow.init`. The
+  samples declare the field for completeness; none of them
+  actually trigger continue-as-new.
+* **Closing the stream.** Each scenario uses an in-band terminator
+  plus a short `workflow.sleep` hold-open so subscribers receive
+  the final event before the workflow exits. See
+  [Closing the stream](https://docs.temporal.io/develop/python/libraries/workflow-streams#closing-the-stream)
+  in the docs for the full pattern.
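
For orientation, scenario 2's resume loop (the use case the new README
calls central) can be sketched in a few lines. This is a minimal sketch,
not the sample's actual code: `WorkflowStreamClient.create(client,
workflow_id)`, `subscribe(from_offset=...)`, and `item.offset` are taken
from the README text above, while the import path for
`WorkflowStreamClient`, the `handle()` helper, and the server address
are assumptions.

```python
# Sketch of the scenario 2 resume pattern (assumed API surface).
# WorkflowStreamClient would come from the sample's library; its exact
# import path is not shown in the diff.
from temporalio.client import Client


def handle(item) -> None:
    """Placeholder for whatever processes each event."""
    print(item)


async def consume(workflow_id: str, saved_offset: int) -> int:
    client = await Client.connect("localhost:7233")
    stream = await WorkflowStreamClient.create(client, workflow_id)
    # Resuming from a persisted offset is what makes a disconnect safe:
    # no events missed, none duplicated.
    async for item in stream.subscribe(from_offset=saved_offset):
        handle(item)
        saved_offset = item.offset + 1  # persist before the next "disconnect"
    return saved_offset
```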

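Likewise, the closing pattern the new Notes bullet links out to is fully
visible in the deleted "Ending the stream" hunk above. Condensed into one
sketch: the `workflow.sleep(timedelta(milliseconds=500))` line is
verbatim from the removed section, while the topic name, the
`StatusEvent` type, the `publish()` signature, and the
`WorkflowStream()` constructor are assumptions.

```python
# End-of-stream shape from the removed "Ending the stream" section: an
# in-band terminator plus a short hold-open before returning.
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow


@dataclass
class StatusEvent:
    stage: str  # illustrative event type; the samples define their own


@workflow.defn
class OrderWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStream()  # hosts the append-only log (constructor assumed)

    @workflow.run
    async def run(self) -> str:
        ...  # publish status events while the order progresses
        # 1. In-band terminator: subscribers break their `async for` on it.
        self.stream.publish("status", StatusEvent(stage="complete"))
        # 2. Hold-open: items published in the same workflow task that
        #    returns are abandoned, so give subscribers one more
        #    poll_cooldown interval to fetch the terminator.
        await workflow.sleep(timedelta(milliseconds=500))
        return "charge-order-1"
```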