@@ -287,10 +287,15 @@ async def run(
287
287
headers = op_input .start_options .headers ,
288
288
)
289
289
self ._nexus_operation_started = True
290
- if isinstance (input .op_input .response_type , SyncResponse ):
291
- assert op_handle .operation_token is None
292
- else :
293
- assert op_handle .operation_token
290
+ if not input .op_input .response_type .exception_in_operation_start :
291
+ if isinstance (input .op_input .response_type , SyncResponse ):
292
+ assert (
293
+ op_handle .operation_token is None
294
+ ), "operation_token should be absent after a sync response"
295
+ else :
296
+ assert (
297
+ op_handle .operation_token
298
+ ), "operation_token should be present after an async response"
294
299
295
300
if request_cancel :
296
301
# Even for SyncResponse, the op_handle future is not done at this point; that
@@ -561,19 +566,6 @@ async def test_async_response(
561
566
op_definition_type ,
562
567
caller_reference ,
563
568
)
564
- # TODO(dan): race here? How do we know it hasn't been canceled already?
565
- handler_wf_info = await handler_wf_handle .describe ()
566
- assert handler_wf_info .status in [
567
- WorkflowExecutionStatus .RUNNING ,
568
- WorkflowExecutionStatus .COMPLETED ,
569
- ]
570
- await assert_caller_workflow_has_link_to_handler_workflow (
571
- caller_wf_handle , handler_wf_handle , handler_wf_info .run_id
572
- )
573
- await assert_handler_workflow_has_link_to_caller_workflow (
574
- caller_wf_handle , handler_wf_handle
575
- )
576
-
577
569
if exception_in_operation_start :
578
570
with pytest .raises (WorkflowFailureError ) as ei :
579
571
await caller_wf_handle .result ()
@@ -590,7 +582,22 @@ async def test_async_response(
590
582
if op_definition_type == OpDefinitionType .SHORTHAND
591
583
else "sync_or_async_operation"
592
584
)
593
- elif request_cancel :
585
+ return
586
+
587
+ # TODO(dan): race here? How do we know it hasn't been canceled already?
588
+ handler_wf_info = await handler_wf_handle .describe ()
589
+ assert handler_wf_info .status in [
590
+ WorkflowExecutionStatus .RUNNING ,
591
+ WorkflowExecutionStatus .COMPLETED ,
592
+ ]
593
+ await assert_caller_workflow_has_link_to_handler_workflow (
594
+ caller_wf_handle , handler_wf_handle , handler_wf_info .run_id
595
+ )
596
+ await assert_handler_workflow_has_link_to_caller_workflow (
597
+ caller_wf_handle , handler_wf_handle
598
+ )
599
+
600
+ if request_cancel :
594
601
# The operation response was asynchronous and so request_cancel is honored. See
595
602
# explanation below.
596
603
with pytest .raises (WorkflowFailureError ) as ei :
@@ -638,7 +645,7 @@ async def _start_wf_and_nexus_op(
638
645
"""
639
646
await create_nexus_endpoint (task_queue , client )
640
647
# TODO(dan): this workflow ID is for xray; change to uuid before merge
641
- operation_workflow_id = "default-workflow-id"
648
+ operation_workflow_id = str ( uuid . uuid4 ())
642
649
643
650
# Start the caller workflow and wait until it confirms the Nexus operation has started.
644
651
block_forever_waiting_for_cancellation = request_cancel
0 commit comments