Skip to content

Commit 19d2e18

Browse files
committed
workflow_streams: rename WorkflowStream.drain to stop_polling
Per PR review feedback, rename the workflow-side state-transition method that releases waiting subscribers and rejects new poll updates. The previous name implied "wait for buffered items to flush"; the operation actually evicts pollers and refuses new ones, while keeping publishes and get_state/continue_as_new valid for the rest of the run. The new name describes that precisely. Updates the continue_as_new helper, the explicit-recipe docstring, and the one test that drives the explicit recipe. Internal _draining state flag stays as-is (private implementation detail).
1 parent 59c7582 commit 19d2e18

2 files changed

Lines changed: 22 additions & 13 deletions

File tree

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,17 @@ def get_state(
253253
publishers=active_publishers,
254254
)
255255

256-
def drain(self) -> None:
257-
"""Unblock all waiting poll handlers and reject new polls.
256+
def stop_polling(self) -> None:
257+
"""Release waiting subscribers and reject new poll updates.
258+
259+
After this call the stream's ``__temporal_workflow_stream_poll``
260+
update handler stops serving subscribers on this run: any
261+
in-flight polls return their current item batch (often empty)
262+
and new polls are rejected at the validator. Publishes still
263+
land in the in-memory log and ``get_state`` /
264+
``continue_as_new`` remain valid — the stream is being held
265+
open just long enough to snapshot state and hand off to the
266+
next run.
258267
259268
Call this before
260269
``await workflow.wait_condition(workflow.all_handlers_finished)``
@@ -268,17 +277,17 @@ async def continue_as_new(
268277
*,
269278
publisher_ttl: timedelta = timedelta(seconds=900),
270279
) -> NoReturn:
271-
"""Drain, wait for handlers, then continue-as-new with built args.
280+
"""Stop polling, wait for handlers, continue-as-new with built args.
272281
273-
Replaces the three-line recipe ``drain()`` →
282+
Replaces the three-line recipe ``stop_polling()`` →
274283
``wait_condition(all_handlers_finished)`` →
275284
``workflow.continue_as_new(args=...)`` for the common case where
276285
the only CAN parameter that varies is ``args``.
277286
278-
``build_args`` is invoked *after* drain has stabilized, with the
279-
post-drain :class:`WorkflowStreamState` as its single argument.
280-
The caller threads that state into whatever input dataclass the
281-
workflow expects:
287+
``build_args`` is invoked *after* poll-serving has stopped, with
288+
the post-stop :class:`WorkflowStreamState` as its single
289+
argument. The caller threads that state into whatever input
290+
dataclass the workflow expects:
282291
283292
.. code-block:: python
284293
@@ -289,19 +298,19 @@ async def continue_as_new(
289298
290299
Workflows that need to override other CAN parameters
291300
(``task_queue``, ``retry_policy``, ``run_timeout``, etc.) should
292-
keep using the explicit ``drain`` / ``wait_condition`` /
301+
keep using the explicit ``stop_polling`` / ``wait_condition`` /
293302
``workflow.continue_as_new(...)`` recipe.
294303
295304
Args:
296-
build_args: Callable that receives the post-drain stream
305+
build_args: Callable that receives the post-stop stream
297306
state and returns the positional ``args`` for the new
298307
run.
299308
publisher_ttl: Forwarded to :meth:`get_state`.
300309
301310
Does not return; ``workflow.continue_as_new`` raises an internal
302311
exception that the SDK uses to close the run.
303312
"""
304-
self.drain()
313+
self.stop_polling()
305314
await workflow.wait_condition(workflow.all_handlers_finished)
306315
workflow.continue_as_new(
307316
args=build_args(self.get_state(publisher_ttl=publisher_ttl)),

tests/contrib/workflow_streams/test_workflow_streams.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1681,7 +1681,7 @@ async def run(self, _input: CANWorkflowInputTyped) -> None:
16811681
return
16821682
if self._should_continue:
16831683
self._should_continue = False
1684-
self.stream.drain()
1684+
self.stream.stop_polling()
16851685
await workflow.wait_condition(workflow.all_handlers_finished)
16861686
workflow.continue_as_new(
16871687
args=[
@@ -1825,7 +1825,7 @@ async def run(self, _input: CANWorkflowInputTyped) -> None:
18251825
@pytest.mark.asyncio
18261826
async def test_continue_as_new_helper(client: Client) -> None:
18271827
"""The ``WorkflowStream.continue_as_new`` helper preserves log and dedup state
1828-
just like the explicit drain/wait/CAN recipe."""
1828+
just like the explicit stop_polling/wait/CAN recipe."""
18291829
async with new_worker(
18301830
client,
18311831
ContinueAsNewHelperWorkflow,

0 commit comments

Comments
 (0)