Skip to content

Commit 883c7c7

Browse files
committed
Move nexus operation polling to an interceptable client method. Add in defaults for id_reuse_policy and id_conflict_policy. Update integration tests with better typing and new assertions for the newly interceptable get_nexus_operation_result
1 parent b9f0cf9 commit 883c7c7

2 files changed

Lines changed: 152 additions & 118 deletions

File tree

temporalio/client.py

Lines changed: 117 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -5332,9 +5332,8 @@ def __init__(
53325332
self._result_type = result_type
53335333
self._endpoint = endpoint
53345334
self._service = service
5335-
self._known_outcome: (
5336-
temporalio.api.common.v1.Payload | temporalio.api.failure.v1.Failure | None
5337-
) = None
5335+
# the default value is `_arg_unset` because ReturnType could be None
5336+
self._known_outcome: ReturnType | object = temporalio.common._arg_unset
53385337

53395338
@property
53405339
def operation_id(self) -> str:
@@ -5386,67 +5385,18 @@ async def result(
53865385
NexusOperationFailureError: If the operation completed with a failure.
53875386
RPCError: Operation result could not be fetched for some reason.
53885387
"""
5389-
if self._known_outcome is None:
5390-
self._known_outcome = await self._poll_until_outcome(
5391-
rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout
5392-
)
5393-
5394-
# Convert outcome to error or value
5395-
match self._known_outcome:
5396-
case temporalio.api.failure.v1.Failure():
5397-
raise await self._client.data_converter.decode_failure(
5398-
self._known_outcome
5399-
)
5400-
case temporalio.api.common.v1.Payload():
5401-
type_hints = [self._result_type] if self._result_type else None
5402-
[result] = await self._client.data_converter.decode(
5403-
[self._known_outcome], type_hints
5404-
)
5405-
return result
5406-
5407-
async def _poll_until_outcome(
5408-
self,
5409-
rpc_metadata: Mapping[str, str | bytes] = {},
5410-
rpc_timeout: timedelta | None = None,
5411-
) -> temporalio.api.common.v1.Payload | temporalio.api.failure.v1.Failure:
5412-
"""Poll for nexus operation result until it's available."""
5413-
req = temporalio.api.workflowservice.v1.PollNexusOperationExecutionRequest(
5414-
namespace=self._client.namespace,
5415-
operation_id=self._operation_id,
5416-
run_id=self._run_id or "",
5417-
wait_stage=temporalio.api.enums.v1.NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED,
5418-
)
5419-
5420-
# Continue polling as long as we have no outcome
5421-
while True:
5422-
try:
5423-
res = (
5424-
await self._client.workflow_service.poll_nexus_operation_execution(
5425-
req,
5426-
retry=True,
5427-
metadata=rpc_metadata,
5428-
timeout=rpc_timeout,
5429-
)
5388+
if self._known_outcome == temporalio.common._arg_unset:
5389+
self._known_outcome = await self._client._impl.get_nexus_operation_result(
5390+
GetNexusOperationResultInput(
5391+
operation_id=self._operation_id,
5392+
run_id=self._run_id,
5393+
result_type=self._result_type,
5394+
rpc_metadata=rpc_metadata,
5395+
rpc_timeout=rpc_timeout,
54305396
)
5431-
match res.WhichOneof("outcome"):
5432-
case "result":
5433-
return res.result
5397+
)
54345398

5435-
case "failure":
5436-
return res.failure
5437-
5438-
case None:
5439-
# poll again
5440-
pass
5441-
except RPCError as err:
5442-
match err.status:
5443-
case RPCStatusCode.DEADLINE_EXCEEDED:
5444-
# Deadline exceeded is expected with long polling; retry
5445-
continue
5446-
case RPCStatusCode.CANCELLED:
5447-
raise asyncio.CancelledError() from err
5448-
case _:
5449-
raise
5399+
return cast(ReturnType, self._known_outcome)
54505400

54515401
async def describe(
54525402
self,
@@ -5652,8 +5602,8 @@ async def start_operation(
56525602
arg: InputT,
56535603
*,
56545604
id: str,
5655-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5656-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5605+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5606+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
56575607
schedule_to_close_timeout: timedelta | None = None,
56585608
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
56595609
summary: str | None = None,
@@ -5670,8 +5620,8 @@ async def start_operation(
56705620
operation: nexusrpc.Operation[None, OutputT],
56715621
*,
56725622
id: str,
5673-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5674-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5623+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5624+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
56755625
schedule_to_close_timeout: timedelta | None = None,
56765626
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
56775627
summary: str | None = None,
@@ -5689,8 +5639,8 @@ async def start_operation(
56895639
arg: Any = temporalio.common._arg_unset,
56905640
*,
56915641
id: str,
5692-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5693-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5642+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5643+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
56945644
result_type: type[OutputT],
56955645
schedule_to_close_timeout: timedelta | None = None,
56965646
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -5709,8 +5659,8 @@ async def start_operation(
57095659
arg: Any = temporalio.common._arg_unset,
57105660
*,
57115661
id: str,
5712-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5713-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5662+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5663+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
57145664
schedule_to_close_timeout: timedelta | None = None,
57155665
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
57165666
summary: str | None = None,
@@ -5728,8 +5678,8 @@ async def start_operation(
57285678
arg: Any = temporalio.common._arg_unset,
57295679
*,
57305680
id: str,
5731-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5732-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5681+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5682+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
57335683
result_type: type[OutputT],
57345684
schedule_to_close_timeout: timedelta | None = None,
57355685
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -5748,8 +5698,8 @@ async def start_operation(
57485698
arg: Any = temporalio.common._arg_unset,
57495699
*,
57505700
id: str,
5751-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5752-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5701+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5702+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
57535703
schedule_to_close_timeout: timedelta | None = None,
57545704
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
57555705
summary: str | None = None,
@@ -5765,8 +5715,8 @@ async def start_operation(
57655715
arg: Any = temporalio.common._arg_unset,
57665716
*,
57675717
id: str,
5768-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5769-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5718+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5719+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
57705720
result_type: type | None = None,
57715721
schedule_to_close_timeout: timedelta | None = None,
57725722
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -5809,8 +5759,8 @@ async def execute_operation(
58095759
arg: InputT,
58105760
*,
58115761
id: str,
5812-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5813-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5762+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5763+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
58145764
schedule_to_close_timeout: timedelta | None = None,
58155765
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
58165766
summary: str | None = None,
@@ -5827,8 +5777,8 @@ async def execute_operation(
58275777
operation: nexusrpc.Operation[None, OutputT],
58285778
*,
58295779
id: str,
5830-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5831-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5780+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5781+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
58325782
schedule_to_close_timeout: timedelta | None = None,
58335783
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
58345784
summary: str | None = None,
@@ -5846,8 +5796,8 @@ async def execute_operation(
58465796
arg: Any = temporalio.common._arg_unset,
58475797
*,
58485798
id: str,
5849-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5850-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5799+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5800+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
58515801
result_type: type[OutputT],
58525802
schedule_to_close_timeout: timedelta | None = None,
58535803
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -5866,8 +5816,8 @@ async def execute_operation(
58665816
arg: Any = temporalio.common._arg_unset,
58675817
*,
58685818
id: str,
5869-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5870-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5819+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5820+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
58715821
schedule_to_close_timeout: timedelta | None = None,
58725822
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
58735823
summary: str | None = None,
@@ -5885,8 +5835,8 @@ async def execute_operation(
58855835
arg: Any = temporalio.common._arg_unset,
58865836
*,
58875837
id: str,
5888-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5889-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5838+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5839+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
58905840
result_type: type[OutputT],
58915841
schedule_to_close_timeout: timedelta | None = None,
58925842
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -5905,8 +5855,8 @@ async def execute_operation(
59055855
arg: Any = temporalio.common._arg_unset,
59065856
*,
59075857
id: str,
5908-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5909-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5858+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5859+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
59105860
schedule_to_close_timeout: timedelta | None = None,
59115861
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
59125862
summary: str | None = None,
@@ -5922,8 +5872,8 @@ async def execute_operation(
59225872
arg: Any = temporalio.common._arg_unset,
59235873
*,
59245874
id: str,
5925-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
5926-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5875+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5876+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
59275877
result_type: type | None = None,
59285878
schedule_to_close_timeout: timedelta | None = None,
59295879
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -6004,8 +5954,8 @@ async def start_operation(
60045954
arg: Any = temporalio.common._arg_unset,
60055955
*,
60065956
id: str,
6007-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
6008-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
5957+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
5958+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
60095959
result_type: type | None = None,
60105960
schedule_to_close_timeout: timedelta | None = None,
60115961
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -6047,8 +5997,8 @@ async def execute_operation(
60475997
arg: Any = temporalio.common._arg_unset,
60485998
*,
60495999
id: str,
6050-
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy,
6051-
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy,
6000+
id_reuse_policy: temporalio.common.NexusOperationIDReusePolicy = temporalio.common.NexusOperationIDReusePolicy.ALLOW_DUPLICATE,
6001+
id_conflict_policy: temporalio.common.NexusOperationIDConflictPolicy = temporalio.common.NexusOperationIDConflictPolicy.FAIL,
60526002
result_type: type | None = None,
60536003
schedule_to_close_timeout: timedelta | None = None,
60546004
search_attributes: temporalio.common.TypedSearchAttributes | None = None,
@@ -8785,6 +8735,21 @@ class DescribeNexusOperationInput:
87858735
rpc_timeout: timedelta | None
87868736

87878737

8738+
@dataclass
8739+
class GetNexusOperationResultInput:
8740+
"""Input for :py:meth:`OutbountInterceptor.get_nexus_operation_result`.
8741+
8742+
.. warning::
8743+
This API is experimental and unstable.
8744+
"""
8745+
8746+
operation_id: str
8747+
run_id: str | None
8748+
rpc_metadata: Mapping[str, str | bytes]
8749+
rpc_timeout: timedelta | None
8750+
result_type: type[Any] | None
8751+
8752+
87888753
@dataclass
87898754
class CancelNexusOperationInput:
87908755
"""Input for :py:meth:`OutboundInterceptor.cancel_nexus_operation`.
@@ -9256,6 +9221,16 @@ async def describe_nexus_operation(
92569221
"""
92579222
return await self.next.describe_nexus_operation(input)
92589223

9224+
async def get_nexus_operation_result(
9225+
self, input: GetNexusOperationResultInput
9226+
) -> Any:
9227+
"""Called for every :py:meth:`NexusOperationHandle.result` call.
9228+
9229+
.. warning::
9230+
This API is experimental and unstable.
9231+
"""
9232+
return await self.next.get_nexus_operation_result(input)
9233+
92599234
async def cancel_nexus_operation(self, input: CancelNexusOperationInput) -> None:
92609235
"""Called for every :py:meth:`NexusOperationHandle.cancel` call.
92619236
@@ -10060,6 +10035,54 @@ async def describe_nexus_operation(
1006010035
data_converter=self._client.data_converter,
1006110036
)
1006210037

10038+
async def get_nexus_operation_result(
10039+
self, input: GetNexusOperationResultInput
10040+
) -> Any:
10041+
"""Poll for nexus operation result until it's available."""
10042+
req = temporalio.api.workflowservice.v1.PollNexusOperationExecutionRequest(
10043+
namespace=self._client.namespace,
10044+
operation_id=input.operation_id,
10045+
run_id=input.run_id or "",
10046+
wait_stage=temporalio.api.enums.v1.NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED,
10047+
)
10048+
10049+
# Continue polling as long as we have no outcome
10050+
while True:
10051+
try:
10052+
res = (
10053+
await self._client.workflow_service.poll_nexus_operation_execution(
10054+
req,
10055+
retry=True,
10056+
metadata=input.rpc_metadata,
10057+
timeout=input.rpc_timeout,
10058+
)
10059+
)
10060+
match res.WhichOneof("outcome"):
10061+
case "result":
10062+
type_hints = [input.result_type] if input.result_type else None
10063+
[result] = await self._client.data_converter.decode(
10064+
[res.result], type_hints
10065+
)
10066+
return result
10067+
10068+
case "failure":
10069+
raise await self._client.data_converter.decode_failure(
10070+
res.failure
10071+
)
10072+
10073+
case None:
10074+
# poll again
10075+
pass
10076+
except RPCError as err:
10077+
match err.status:
10078+
case RPCStatusCode.DEADLINE_EXCEEDED:
10079+
# Deadline exceeded is expected with long polling; retry
10080+
continue
10081+
case RPCStatusCode.CANCELLED:
10082+
raise asyncio.CancelledError() from err
10083+
case _:
10084+
raise
10085+
1006310086
async def cancel_nexus_operation(self, input: CancelNexusOperationInput) -> None:
1006410087
"""Cancel a nexus operation."""
1006510088
await self._client.workflow_service.request_cancel_nexus_operation_execution(

0 commit comments

Comments
 (0)