Skip to content

Commit 6ddd638

Browse files
committed
Standalone Nexus support
1 parent 1747d53 commit 6ddd638

15 files changed

Lines changed: 3281 additions & 1633 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/stretchr/testify v1.11.1
1515
github.com/temporalio/features v0.0.0-20260331150122-757294c2a9e9
1616
github.com/temporalio/omes/workers/go/projects/harness/api v0.0.0-00010101000000-000000000000
17-
go.temporal.io/api v1.62.7
17+
go.temporal.io/api v1.62.11
1818
go.temporal.io/sdk v1.42.0
1919
go.uber.org/zap v1.27.0
2020
golang.org/x/mod v0.31.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W
120120
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
121121
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
122122
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
123-
go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o=
124-
go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
123+
go.temporal.io/api v1.62.11 h1:MWDaooDvOJCIRb1atqeZX2ErDPNTsNc3/mMEVEvvaVU=
124+
go.temporal.io/api v1.62.11/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
125125
go.temporal.io/sdk v1.42.0 h1:2Zyrj1PZFd1xQVrrXF6RlE1nHZzZRuWfSyC2TqT3ri8=
126126
go.temporal.io/sdk v1.42.0/go.mod h1:Xp4TMHsie6kdw0lc0Ae4o8vktze5HZXBynF2DkiXcrQ=
127127
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

loadgen/kitchen_sink_executor_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,31 @@ func TestKitchenSink(t *testing.T) {
969969
WorkflowExecutionCompleted`),
970970
expectedUnsupportedErrs: nexusUnsupportedSDKs,
971971
},
972+
{
973+
name: "ExecActivity/Client/StandaloneNexusOperation",
974+
testInput: &TestInput{
975+
WorkflowInput: &WorkflowInput{
976+
InitialActions: ListActionSet(
977+
ClientActivity(
978+
ClientActions(&ClientAction{
979+
Variant: &ClientAction_DoStandaloneNexusOperation{
980+
DoStandaloneNexusOperation: &DoStandaloneNexusOperation{
981+
// Endpoint filled by PrepareTestInput
982+
Service: "kitchen-sink",
983+
Operation: "echo-async",
984+
},
985+
},
986+
}),
987+
DefaultRemoteActivity,
988+
),
989+
),
990+
},
991+
},
992+
historyMatcher: PartialHistoryMatcher(`
993+
ActivityTaskScheduled {"activityType":{"name":"client"}}
994+
ActivityTaskStarted
995+
ActivityTaskCompleted`),
996+
},
972997
{
973998
name: "UnsupportedAction",
974999
testInput: &TestInput{
@@ -1044,6 +1069,15 @@ func testForSDK(
10441069
if nexusOp := action.GetNexusOperation(); nexusOp != nil && nexusOp.Endpoint == "" {
10451070
nexusOp.Endpoint = nexusEndpoint
10461071
}
1072+
if clientSeq := action.GetExecActivity().GetClient().GetClientSequence(); clientSeq != nil {
1073+
for _, cas := range clientSeq.ActionSets {
1074+
for _, ca := range cas.Actions {
1075+
if sno := ca.GetDoStandaloneNexusOperation(); sno != nil && sno.Endpoint == "" {
1076+
sno.Endpoint = nexusEndpoint
1077+
}
1078+
}
1079+
}
1080+
}
10471081
}
10481082
}
10491083
}

loadgen/kitchensink/client_action_executor.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77
"time"
88

99
enumspb "go.temporal.io/api/enums/v1"
10+
workflowservicepb "go.temporal.io/api/workflowservice/v1"
1011
"go.temporal.io/sdk/client"
1112
"go.temporal.io/sdk/workflow"
1213
"golang.org/x/sync/errgroup"
1314
)
1415

1516
type ClientActionsExecutor struct {
1617
Client client.Client
18+
Namespace string
1719
WorkflowOptions client.StartWorkflowOptions
1820
WorkflowType string
1921
WorkflowInput *WorkflowInput
@@ -127,6 +129,8 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
127129
} else if action.GetNestedActions() != nil {
128130
err = e.executeClientActionSet(ctx, action.GetNestedActions())
129131
return err
132+
} else if sano := action.GetDoStandaloneNexusOperation(); sano != nil {
133+
return e.executeStandaloneNexusOperation(ctx, sano)
130134
} else {
131135
return fmt.Errorf("client action must be set")
132136
}
@@ -196,3 +200,32 @@ func (e *ClientActionsExecutor) executeUpdateAction(ctx context.Context, upd *Do
196200
}
197201
return run, err
198202
}
203+
204+
func (e *ClientActionsExecutor) executeStandaloneNexusOperation(ctx context.Context, sno *DoStandaloneNexusOperation) error {
205+
// Use a unique operation ID scoped to the workflow so parallel iterations don't collide.
206+
operationID := fmt.Sprintf("standalone-nexus-%s", e.WorkflowOptions.ID)
207+
_, err := e.Client.WorkflowService().StartNexusOperationExecution(ctx,
208+
&workflowservicepb.StartNexusOperationExecutionRequest{
209+
Namespace: e.Namespace,
210+
OperationId: operationID,
211+
Endpoint: sno.Endpoint,
212+
Service: sno.Service,
213+
Operation: sno.Operation,
214+
})
215+
if err != nil {
216+
return fmt.Errorf("StartNexusOperationExecution: %w", err)
217+
}
218+
pollResp, err := e.Client.WorkflowService().PollNexusOperationExecution(ctx,
219+
&workflowservicepb.PollNexusOperationExecutionRequest{
220+
Namespace: e.Namespace,
221+
OperationId: operationID,
222+
WaitStage: enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED,
223+
})
224+
if err != nil {
225+
return fmt.Errorf("PollNexusOperationExecution: %w", err)
226+
}
227+
if failure := pollResp.GetFailure(); failure != nil {
228+
return fmt.Errorf("standalone nexus operation failed: %s", failure.GetMessage())
229+
}
230+
return nil
231+
}

0 commit comments

Comments
 (0)