Skip to content

Commit 1a19ef5

Browse files
committed
workflow_streams: make topic(type=...) optional, default to Any
Per discussion: heterogeneous topics and dynamic-topic forwarders previously had to write type=cast(type[Any], cast(object, Any)) at the call site to satisfy pyright (typing.Any is a special form, not a class). Make the type kwarg optional and default to typing.Any so the natural form is just client.topic("name") / stream.topic("name"). The type-uniformity invariant is unchanged: each instance binds a topic name to exactly one type; mixing untyped (= Any) with a specific type still raises. typing.Any can also be passed explicitly, with the cast still required for type-strict callers. Adds overloads so callers that pass type=T still get a typed TopicHandle[T] from pyright; the no-type form returns TopicHandle[Any] / WorkflowTopicHandle[Any]. Updates the existing test to exercise both the omitted-type and explicit-type=Any paths.
1 parent 19d2e18 commit 1a19ef5

3 files changed

Lines changed: 71 additions & 42 deletions

File tree

temporalio/contrib/workflow_streams/_client.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import uuid
2424
from collections.abc import AsyncIterator
2525
from datetime import timedelta
26-
from typing import Any, TypeVar
26+
from typing import Any, TypeVar, overload
2727

2828
from typing_extensions import Self
2929

@@ -246,12 +246,19 @@ def _publish_to_topic(
246246
):
247247
self._flush_event.set()
248248

249-
def topic(self, name: str, *, type: type[T]) -> TopicHandle[T]:
249+
@overload
250+
def topic(self, name: str) -> TopicHandle[Any]: ...
251+
@overload
252+
def topic(self, name: str, *, type: type[T]) -> TopicHandle[T]: ...
253+
254+
def topic(
255+
self, name: str, *, type: type[T] | None = None
256+
) -> TopicHandle[T] | TopicHandle[Any]:
250257
"""Return a typed handle for publishing to and subscribing from ``name``.
251258
252259
The handle records the topic name and value type so call sites
253260
do not have to repeat them. Each :class:`WorkflowStreamClient`
254-
instance binds a topic name to exactly one ``T``: a second call
261+
instance binds a topic name to exactly one type: a second call
255262
with an unequal type raises ``RuntimeError``. Repeating the
256263
same call with the same type is idempotent and returns an
257264
equivalent handle.
@@ -261,46 +268,53 @@ def topic(self, name: str, *, type: type[T]) -> TopicHandle[T]:
261268
equality on the type object; subtype and union-superset
262269
relationships are not recognized.
263270
264-
For heterogeneous topics or dynamic-topic forwarders, pass
265-
``type=typing.Any``; subscribers receive the converter's
266-
default decoded value. Pre-built ``Payload`` values can be
267-
passed to :meth:`TopicHandle.publish` regardless of the bound
268-
type (zero-copy fast path) — there is no need to bind the
269-
topic to ``Payload`` itself, and doing so would break the
270-
subscribe path (use ``result_type=RawValue`` on
271-
:meth:`subscribe` if you need raw payloads on a subscriber).
271+
Omitting ``type`` (or passing ``type=typing.Any``) is the
272+
documented escape hatch for heterogeneous topics or
273+
dynamic-topic forwarders: the handle accepts any value, and
274+
subscribers receive the converter's default decoded value.
275+
Pre-built ``Payload`` values can be passed to
276+
:meth:`TopicHandle.publish` regardless of the bound type
277+
(zero-copy fast path) — there is no need to bind the topic to
278+
``Payload`` itself, and doing so would break the subscribe
279+
path (use ``result_type=RawValue`` on
280+
:meth:`WorkflowStreamClient.subscribe` if you need raw
281+
payloads on a subscriber).
272282
273283
Args:
274284
name: Topic name.
275285
type: Value type bound to this handle. Used as the
276286
``result_type`` when subscribing through the handle.
287+
Defaults to ``typing.Any`` (heterogeneous topic).
277288
278289
Returns:
279-
:class:`TopicHandle` bound to ``name`` and ``type``.
290+
:class:`TopicHandle` bound to ``name`` and the resolved
291+
type.
280292
281293
Raises:
282294
RuntimeError: If ``name`` is already bound on this client
283295
to a different type.
284296
"""
285-
if type is Payload:
297+
bound: Any = Any if type is None else type
298+
if bound is Payload:
286299
raise RuntimeError(
287300
"Cannot bind a topic to type=Payload: the payload converter "
288301
"has no Payload decode path, so TopicHandle.subscribe would "
289302
"fail. Pre-built Payload values can be passed to "
290303
"TopicHandle.publish on any-typed handle (zero-copy fast "
291-
"path); use type=typing.Any for heterogeneous topics, and "
292-
"subscribe via WorkflowStreamClient.subscribe with "
293-
"result_type=RawValue when raw payloads are needed."
304+
"path); omit type (or pass type=typing.Any) for "
305+
"heterogeneous topics, and subscribe via "
306+
"WorkflowStreamClient.subscribe with result_type=RawValue "
307+
"when raw payloads are needed."
294308
)
295309
existing = self._topic_types.get(name)
296-
if existing is not None and existing != type:
310+
if existing is not None and existing != bound:
297311
raise RuntimeError(
298312
f"Topic {name!r} is already bound to type {existing!r} on this "
299-
f"client; refusing to rebind to {type!r}. Use a single type "
300-
f"per topic, or pass type=typing.Any for heterogeneous topics."
313+
f"client; refusing to rebind to {bound!r}. Use a single type "
314+
f"per topic, or omit type (=typing.Any) for heterogeneous topics."
301315
)
302-
self._topic_types[name] = type
303-
return TopicHandle(self, name, type)
316+
self._topic_types[name] = bound
317+
return TopicHandle(self, name, bound)
304318

305319
async def flush(self) -> None:
306320
"""Flush buffered (and pending) items and wait for server confirmation.

temporalio/contrib/workflow_streams/_stream.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import sys
3030
from collections.abc import Sequence
3131
from datetime import timedelta
32-
from typing import Any, Callable, NoReturn, TypeVar
32+
from typing import Any, Callable, NoReturn, TypeVar, overload
3333

3434
from temporalio import workflow
3535
from temporalio.api.common.v1 import Payload
@@ -170,14 +170,21 @@ def _publish_to_topic(self, topic: str, value: Any) -> None:
170170
payload = workflow.payload_converter().to_payloads([value])[0]
171171
self._log.append(WorkflowStreamItem(topic=topic, data=payload))
172172

173-
def topic(self, name: str, *, type: type[T]) -> WorkflowTopicHandle[T]:
173+
@overload
174+
def topic(self, name: str) -> WorkflowTopicHandle[Any]: ...
175+
@overload
176+
def topic(self, name: str, *, type: type[T]) -> WorkflowTopicHandle[T]: ...
177+
178+
def topic(
179+
self, name: str, *, type: type[T] | None = None
180+
) -> WorkflowTopicHandle[T] | WorkflowTopicHandle[Any]:
174181
"""Return a typed handle for publishing to ``name`` from this workflow.
175182
176183
The handle records the topic name and value type so call sites
177184
do not have to repeat them. Each :class:`WorkflowStream`
178-
instance binds a topic name to exactly one ``T``: a second
179-
call with an unequal type raises ``RuntimeError``. Repeating
180-
the same call with the same type is idempotent and returns an
185+
instance binds a topic name to exactly one type: a second call
186+
with an unequal type raises ``RuntimeError``. Repeating the
187+
same call with the same type is idempotent and returns an
181188
equivalent handle.
182189
183190
Type uniformity is checked only on this stream instance — it
@@ -186,40 +193,44 @@ def topic(self, name: str, *, type: type[T]) -> WorkflowTopicHandle[T]:
186193
on the type object; subtype and union-superset relationships
187194
are not recognized.
188195
189-
For heterogeneous topics, pass ``type=typing.Any``. Pre-built
196+
Omitting ``type`` (or passing ``type=typing.Any``) is the
197+
documented escape hatch for heterogeneous topics. Pre-built
190198
``Payload`` values can be passed to
191199
:meth:`WorkflowTopicHandle.publish` regardless of the bound
192200
type (zero-copy fast path) — there is no need to bind the
193201
topic to ``Payload`` itself.
194202
195203
Args:
196204
name: Topic name.
197-
type: Value type bound to this handle.
205+
type: Value type bound to this handle. Defaults to
206+
``typing.Any`` (heterogeneous topic).
198207
199208
Returns:
200-
:class:`WorkflowTopicHandle` bound to ``name`` and ``type``.
209+
:class:`WorkflowTopicHandle` bound to ``name`` and the
210+
resolved type.
201211
202212
Raises:
203213
RuntimeError: If ``name`` is already bound on this stream
204214
to a different type.
205215
"""
206-
if type is Payload:
216+
bound: Any = Any if type is None else type
217+
if bound is Payload:
207218
raise RuntimeError(
208219
"Cannot bind a topic to type=Payload. Pre-built Payload "
209220
"values can be passed to WorkflowTopicHandle.publish on "
210-
"any-typed handle (zero-copy fast path); use "
211-
"type=typing.Any for heterogeneous topics."
221+
"any-typed handle (zero-copy fast path); omit type (or "
222+
"pass type=typing.Any) for heterogeneous topics."
212223
)
213224
existing = self._topic_types.get(name)
214-
if existing is not None and existing != type:
225+
if existing is not None and existing != bound:
215226
raise RuntimeError(
216227
f"Topic {name!r} is already bound to type {existing!r} on this "
217-
f"workflow stream; refusing to rebind to {type!r}. Use a "
218-
f"single type per topic, or pass type=typing.Any for "
228+
f"workflow stream; refusing to rebind to {bound!r}. Use a "
229+
f"single type per topic, or omit type (=typing.Any) for "
219230
f"heterogeneous topics."
220231
)
221-
self._topic_types[name] = type
222-
return WorkflowTopicHandle(self, name, type)
232+
self._topic_types[name] = bound
233+
return WorkflowTopicHandle(self, name, bound)
223234

224235
def get_state(
225236
self, *, publisher_ttl: timedelta = timedelta(seconds=900)

tests/contrib/workflow_streams/test_workflow_streams.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -656,12 +656,16 @@ async def test_topic_handle_client_uniqueness(client: Client) -> None:
656656
other = stream.topic("other", type=bytes)
657657
assert other.type is bytes
658658

659-
# Any escape hatch coexists on a different topic. ``Any`` is a
660-
# typing special form, not a class, so cast through ``object`` to
661-
# satisfy ``type[T]`` the runtime check uses Python equality and
662-
# accepts ``Any`` directly.
663-
raw = stream.topic("forwarded", type=cast(type[Any], cast(object, Any)))
659+
# Any escape hatch coexists on a different topic. Omitting ``type``
660+
# is the documented form (defaults to ``typing.Any``); we also
661+
# exercise the explicit ``type=Any`` path with the cast required
662+
# because ``Any`` is a typing special form rather than a class.
663+
raw = stream.topic("forwarded")
664664
assert raw.type is Any
665+
explicit = stream.topic(
666+
"forwarded-explicit", type=cast(type[Any], cast(object, Any))
667+
)
668+
assert explicit.type is Any
665669

666670
# Binding to Payload itself is rejected — subscribers would have
667671
# no decode path. Pre-built Payload values can still be published

0 commit comments

Comments
 (0)