Skip to content

Commit 2449502

Browse files
authored
Support conflict options for starting Nexus operations in test framework (#1828)
* Support conflict options for starting Nexus operations in test framework * fix nexus error handling * address comments * simplify test * address comments * Update cli version * fix test
1 parent b2b75c9 commit 2449502

File tree

6 files changed

+330
-32
lines changed

6 files changed

+330
-32
lines changed

internal/cmd/build/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (b *builder) integrationTest() error {
142142
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
143143
// TODO: Use stable release once server 1.27.0 is out.
144144
CachedDownload: testsuite.CachedDownload{
145-
Version: "v1.3.0-versioning.0",
145+
Version: "v1.3.0-rc.0",
146146
},
147147
ClientOptions: &client.Options{
148148
HostPort: "127.0.0.1:7233",

internal/internal_worker_base.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,13 @@ type (
232232
}
233233
)
234234

235+
func (h ResultHandler) wrap(callback ResultHandler) ResultHandler {
236+
return func(result *commonpb.Payloads, err error) {
237+
callback(result, err)
238+
h(result, err)
239+
}
240+
}
241+
235242
func (t *polledTask) getTask() taskForWorker {
236243
return t.task
237244
}

internal/internal_workflow.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,10 @@ type (
214214
WorkflowID string
215215
WaitForCancellation bool
216216
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy
217+
// WorkflowIDConflictPolicy and OnConflictOptions are only used in test environment for
218+
// running Nexus operations as child workflow.
217219
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
220+
OnConflictOptions *OnConflictOptions
218221
DataConverter converter.DataConverter
219222
RetryPolicy *commonpb.RetryPolicy
220223
CronSchedule string

internal/internal_workflow_testsuite.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,11 @@ func (env *testWorkflowEnvironmentImpl) setContinuedExecutionRunID(rid string) {
428428
env.workflowInfo.ContinuedExecutionRunID = rid
429429
}
430430

431-
func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) {
431+
func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(
432+
params *ExecuteWorkflowParams,
433+
callback ResultHandler,
434+
startedHandler func(r WorkflowExecution, e error),
435+
) (*testWorkflowEnvironmentImpl, error) {
432436
// create a new test env
433437
childEnv := newTestWorkflowEnvironmentImpl(env.testSuite, env.registry)
434438
childEnv.parentEnv = env
@@ -474,15 +478,27 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param
474478

475479
childEnv.runTimeout = params.WorkflowRunTimeout
476480
if workflowHandler, ok := env.runningWorkflows[params.WorkflowID]; ok {
481+
alreadyStartedErr := serviceerror.NewWorkflowExecutionAlreadyStarted(
482+
"Workflow execution already started",
483+
"",
484+
childEnv.workflowInfo.WorkflowExecution.RunID,
485+
)
477486
// duplicate workflow ID
478487
if !workflowHandler.handled {
479-
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
488+
if params.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING {
489+
if params.OnConflictOptions != nil && params.OnConflictOptions.AttachCompletionCallbacks {
490+
workflowHandler.callback = workflowHandler.callback.wrap(callback)
491+
}
492+
startedHandler(workflowHandler.env.workflowInfo.WorkflowExecution, nil)
493+
return nil, nil
494+
}
495+
return nil, alreadyStartedErr
480496
}
481497
if params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE {
482-
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
498+
return nil, alreadyStartedErr
483499
}
484500
if workflowHandler.err == nil && params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY {
485-
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
501+
return nil, alreadyStartedErr
486502
}
487503
}
488504

@@ -2380,16 +2396,20 @@ func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart
23802396
childEnv, err := env.newTestWorkflowEnvironmentForChild(&params, callback, startedHandler)
23812397
if err != nil {
23822398
env.logger.Info("ExecuteChildWorkflow failed", tagError, err)
2383-
callback(nil, err)
23842399
startedHandler(WorkflowExecution{}, err)
2400+
callback(nil, err)
23852401
return
23862402
}
23872403

2388-
env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name)
2389-
env.runningCount++
2404+
// childEnv can be nil when WorkflowIDConflictPolicy is USE_EXISTING and there's already a running
2405+
// workflow. This is only possible in the test environment for running Nexus handler workflow.
2406+
if childEnv != nil {
2407+
env.logger.Info("ExecuteChildWorkflow", tagWorkflowType, params.WorkflowType.Name)
2408+
env.runningCount++
23902409

2391-
// run child workflow in separate goroutinue
2392-
go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input)
2410+
// run child workflow in separate goroutinue
2411+
go childEnv.executeWorkflowInternal(delayStart, params.WorkflowType.Name, params.Input)
2412+
}
23932413
}
23942414

23952415
func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler(

internal/nexus_operations.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
453453
}
454454

455455
run := &testEnvWorkflowRunForNexusOperations{}
456+
startedErrCh := make(chan error, 1)
456457
doneCh := make(chan error)
457458

458459
var callback *commonpb.Callback
@@ -476,6 +477,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
476477
WorkflowTaskTimeout: options.WorkflowTaskTimeout,
477478
DataConverter: t.env.dataConverter,
478479
WorkflowIDReusePolicy: options.WorkflowIDReusePolicy,
480+
WorkflowIDConflictPolicy: options.WorkflowIDConflictPolicy,
481+
OnConflictOptions: options.onConflictOptions,
479482
ContextPropagators: t.env.contextPropagators,
480483
SearchAttributes: options.SearchAttributes,
481484
TypedSearchAttributes: options.TypedSearchAttributes,
@@ -485,6 +488,14 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
485488
RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy),
486489
},
487490
}, func(result *commonpb.Payloads, wfErr error) {
491+
// This callback handles async completion of Nexus operations. If there was an error when
492+
// starting the workflow, then the operation failed synchronously and this callback doesn't
493+
// need to be executed.
494+
startedErr := <-startedErrCh
495+
if startedErr != nil {
496+
return
497+
}
498+
488499
ncb := callback.GetNexus()
489500
if ncb == nil {
490501
return
@@ -519,6 +530,8 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
519530
}
520531
}, func(r WorkflowExecution, err error) {
521532
run.WorkflowExecution = r
533+
startedErrCh <- err
534+
close(startedErrCh)
522535
doneCh <- err
523536
})
524537
}, false)

0 commit comments

Comments
 (0)