Skip to content

Commit 553a83b

Browse files
committed
workflow_streams: rename stop_polling to detach_pollers
Better captures the operation. The method releases the in-flight __temporal_workflow_stream_poll update handlers (subscribers were "attached" to the stream's drain signal; this detaches them so they return to the caller, who can then follow continue-as-new or stop) and rejects new poll attachments. "stop_polling" ambiguously suggested the stream itself was the one polling; "detach_pollers" names the actor (the pollers / subscribers) and captures the relationship. Updates the continue_as_new helper and the explicit-recipe docstring/test accordingly.
1 parent 1a19ef5 commit 553a83b

2 files changed

Lines changed: 18 additions & 18 deletions

File tree

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,17 @@ def get_state(
264264
publishers=active_publishers,
265265
)
266266

267-
def stop_polling(self) -> None:
268-
"""Release waiting subscribers and reject new poll updates.
267+
def detach_pollers(self) -> None:
268+
"""Release waiting pollers and reject new poll updates.
269269
270270
After this call the stream's ``__temporal_workflow_stream_poll``
271-
update handler stops serving subscribers on this run: any
272-
in-flight polls return their current item batch (often empty)
273-
and new polls are rejected at the validator. Publishes still
274-
land in the in-memory log and ``get_state`` /
275-
``continue_as_new`` remain valid — the stream is being held
276-
open just long enough to snapshot state and hand off to the
277-
next run.
271+
update handler releases its in-flight subscribers on this run:
272+
each waiting poll returns its current item batch (often empty)
273+
so the consumer can either follow continue-as-new or stop, and
274+
new polls are rejected at the validator. Publishes still land
275+
in the in-memory log and ``get_state`` / ``continue_as_new``
276+
remain valid — the stream is being held open just long enough
277+
to snapshot state and hand off to the next run.
278278
279279
Call this before
280280
``await workflow.wait_condition(workflow.all_handlers_finished)``
@@ -288,15 +288,15 @@ async def continue_as_new(
288288
*,
289289
publisher_ttl: timedelta = timedelta(seconds=900),
290290
) -> NoReturn:
291-
"""Stop polling, wait for handlers, continue-as-new with built args.
291+
"""Detach pollers, wait for handlers, continue-as-new with built args.
292292
293-
Replaces the three-line recipe ``stop_polling()`` →
293+
Replaces the three-line recipe ``detach_pollers()`` →
294294
``wait_condition(all_handlers_finished)`` →
295295
``workflow.continue_as_new(args=...)`` for the common case where
296296
the only CAN parameter that varies is ``args``.
297297
298-
``build_args`` is invoked *after* poll-serving has stopped, with
299-
the post-stop :class:`WorkflowStreamState` as its single
298+
``build_args`` is invoked *after* pollers have been detached,
299+
with the post-detach :class:`WorkflowStreamState` as its single
300300
argument. The caller threads that state into whatever input
301301
dataclass the workflow expects:
302302
@@ -309,19 +309,19 @@ async def continue_as_new(
309309
310310
Workflows that need to override other CAN parameters
311311
(``task_queue``, ``retry_policy``, ``run_timeout``, etc.) should
312-
keep using the explicit ``stop_polling`` / ``wait_condition`` /
312+
keep using the explicit ``detach_pollers`` / ``wait_condition`` /
313313
``workflow.continue_as_new(...)`` recipe.
314314
315315
Args:
316-
build_args: Callable that receives the post-stop stream
316+
build_args: Callable that receives the post-detach stream
317317
state and returns the positional ``args`` for the new
318318
run.
319319
publisher_ttl: Forwarded to :meth:`get_state`.
320320
321321
Does not return; ``workflow.continue_as_new`` raises an internal
322322
exception that the SDK uses to close the run.
323323
"""
324-
self.stop_polling()
324+
self.detach_pollers()
325325
await workflow.wait_condition(workflow.all_handlers_finished)
326326
workflow.continue_as_new(
327327
args=build_args(self.get_state(publisher_ttl=publisher_ttl)),

tests/contrib/workflow_streams/test_workflow_streams.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,7 +1685,7 @@ async def run(self, _input: CANWorkflowInputTyped) -> None:
16851685
return
16861686
if self._should_continue:
16871687
self._should_continue = False
1688-
self.stream.stop_polling()
1688+
self.stream.detach_pollers()
16891689
await workflow.wait_condition(workflow.all_handlers_finished)
16901690
workflow.continue_as_new(
16911691
args=[
@@ -1829,7 +1829,7 @@ async def run(self, _input: CANWorkflowInputTyped) -> None:
18291829
@pytest.mark.asyncio
18301830
async def test_continue_as_new_helper(client: Client) -> None:
18311831
"""The ``WorkflowStream.continue_as_new`` helper preserves log and dedup state
1832-
just like the explicit stop_polling/wait/CAN recipe."""
1832+
just like the explicit detach_pollers/wait/CAN recipe."""
18331833
async with new_worker(
18341834
client,
18351835
ContinueAsNewHelperWorkflow,

0 commit comments

Comments
 (0)