Commit 31247ca

don't return full text twice
1 parent 5ce1fcb commit 31247ca

1 file changed

Lines changed: 6 additions & 10 deletions

docs/develop/python/workflows/workflow-streams.mdx

@@ -465,10 +465,6 @@ from openai import AsyncOpenAI
 class TextDelta:
     text: str
 
-@dataclass
-class TextComplete:
-    full_text: str
-
 
 @activity.defn
 async def stream_completion(prompt: str) -> str:
@@ -480,8 +476,8 @@ async def stream_completion(prompt: str) -> str:
 
     async with stream_client:
         deltas = stream_client.topic("delta", type=TextDelta)
-        complete = stream_client.topic("complete", type=TextComplete)
         retry = stream_client.topic("retry", type=dict)
+        close = stream_client.topic("close")
 
         # Tell consumers an earlier attempt's deltas are stale.
         if activity.info().attempt > 1:
@@ -505,7 +501,7 @@ async def stream_completion(prompt: str) -> str:
             deltas.publish(TextDelta(text=text), force_flush=first)
             first = False
             full.append(text)
-        complete.publish(TextComplete(full_text="".join(full)))
+        close.publish({})
         return "".join(full)
 ```
 
@@ -529,7 +525,7 @@ class ChatWorkflow:
             input.prompt,
             start_to_close_timeout=timedelta(minutes=5),
         )
-        # Wait for the subscriber to ack the terminal `complete` event.
+        # Wait for the subscriber to ack the terminal `close` event.
         # The timeout is a fallback for when no subscriber is attached;
         # with the ack, the typical case exits as soon as the subscriber confirms.
         try:
@@ -554,7 +550,7 @@ async def stream_chat(chat_id: str) -> str:
         ... # display the accumulated output (terminal redraw, UI update, etc.)
 
     async for item in stream.subscribe(
-        ["delta", "retry", "complete"], result_type=RawValue
+        ["delta", "retry", "close"], result_type=RawValue
     ):
         if item.topic == "retry":
             # Earlier attempt's deltas are stale; drop what we've shown.
@@ -564,7 +560,7 @@ async def stream_chat(chat_id: str) -> str:
             delta = converter.from_payload(item.data.payload, TextDelta)
             output.append(delta.text)
             render()
-        elif item.topic == "complete":
+        elif item.topic == "close":
             # Acknowledge so the Workflow can return without a sleep.
             await temporal_client.get_workflow_handle(chat_id).signal(
                 ChatWorkflow.subscriber_acknowledged_terminator
@@ -578,7 +574,7 @@ A few choices in this shape are deliberate:
 
 - The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value (see [How it works at a glance](#how-it-works-at-a-glance) for why the Workflow acts as a conduit rather than reading its own stream).
 - The Activity publishes a `RETRY` event when `activity.info().attempt > 1`. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see [Delivery semantics](#delivery-semantics)).
-- Termination uses an *ack handshake*: the consumer signals the Workflow once it has received the `complete` event, so the Workflow can return as soon as the subscriber confirms. The `wait_condition` timeout is the fallback when no subscriber is attached (see [Closing the stream](#closing-the-stream) for the simpler fixed-sleep alternative).
+- Termination uses an *ack handshake*: the consumer signals the Workflow once it has received the `close` event, so the Workflow can return as soon as the subscriber confirms. The `wait_condition` timeout is the fallback when no subscriber is attached (see [Closing the stream](#closing-the-stream) for the simpler fixed-sleep alternative).
 - `force_flush=True` is used only on the first delta and on the `RETRY` sentinel, where latency matters. Subsequent deltas batch at the 200 ms `batch_interval`; per-delta `force_flush=True` would generate one Signal per token (see [Tuning](#tuning) for the trade-off).
 
 ## See also
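
Reviewer sketch (not part of the commit): the protocol this diff converges on, with an empty `close` terminator, deltas cleared on `retry`, and an ack with a timeout fallback, can be illustrated without Temporal at all. The snippet below is a minimal stand-in under stated assumptions: `Event`, `fold`, `wait_for_ack`, and `demo` are hypothetical names, an in-memory list replaces the stream, and `asyncio.Event` plus `asyncio.wait_for` stand in for the Workflow signal and the `wait_condition` timeout. It is not the workflow-streams API.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for a stream item; not a Temporal type."""
    topic: str        # "delta", "retry", or "close"
    text: str = ""    # only meaningful for "delta"


def fold(events: list[Event]) -> tuple[str, bool]:
    """Replay events as the subscriber loop does; return (shown_text, saw_close)."""
    output: list[str] = []
    for event in events:
        if event.topic == "retry":
            output.clear()                 # earlier attempt's deltas are stale
        elif event.topic == "delta":
            output.append(event.text)
        elif event.topic == "close":
            # The terminator carries no payload: the text was already
            # delivered as deltas, so nothing is sent twice.
            return "".join(output), True
    return "".join(output), False


async def wait_for_ack(ack: asyncio.Event, timeout: float) -> bool:
    """Return True if the subscriber acked, False if the fallback timeout fired."""
    try:
        await asyncio.wait_for(ack.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[str, bool, bool]:
    ack = asyncio.Event()
    events = [
        Event("delta", "Hel"),             # attempt 1, abandoned
        Event("retry"),                    # attempt 2 begins
        Event("delta", "Hello"),
        Event("delta", ", world"),
        Event("close"),
    ]
    text, closed = fold(events)
    if closed:
        ack.set()                          # subscriber acknowledges the terminator
    acked = await wait_for_ack(ack, timeout=0.05)
    # No subscriber attached: nothing ever sets the event, so the timeout fires.
    unattended = await wait_for_ack(asyncio.Event(), timeout=0.05)
    return text, acked, unattended


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The subscriber rebuilds the full text from the deltas it has already received, which is why dropping `TextComplete.full_text` loses nothing for consumers that stayed attached.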
