Skip to content

Commit 2749db6

Browse files
committed
Dual-populate HSM routing fields in CHASM callback tokens
1 parent a4d105e commit 2749db6

5 files changed

Lines changed: 72 additions & 34 deletions

File tree

chasm/lib/nexusoperation/operation_tasks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (h *operationInvocationTaskHandler) Execute(
132132
return fmt.Errorf("failed to build callback URL: %w", err)
133133
}
134134

135-
token, err := h.generateCallbackToken(args.serializedRef, args.requestID)
135+
token, err := h.generateCallbackToken(args.serializedRef, opRef.NamespaceID, opRef.BusinessID, args.requestID)
136136
if err != nil {
137137
return err
138138
}

chasm/lib/nexusoperation/task_handler_helpers.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,12 +392,19 @@ func lookupEndpoint(ctx context.Context, registry commonnexus.EndpointRegistry,
392392
}
393393

394394
// generateCallbackToken creates a callback token for the given operation reference.
395+
//
396+
// namespaceID and businessID are dual-populated alongside ComponentRef so legacy
397+
// HSM-only callback routers can still route the completion. ComponentRef remains
398+
// authoritative for completion handling; the HSM-shaped fields are consulted only
399+
// by routers, never for completion semantics.
395400
func (h *operationInvocationTaskHandler) generateCallbackToken(
396401
serializedRef []byte,
397-
requestID string,
402+
namespaceID, businessID, requestID string,
398403
) (string, error) {
399404
token, err := h.callbackTokenGenerator.Tokenize(&tokenspb.NexusOperationCompletion{
400405
ComponentRef: serializedRef,
406+
NamespaceId: namespaceID,
407+
WorkflowId: businessID,
401408
RequestId: requestID,
402409
})
403410
if err != nil {

common/nexus/callback_token.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,25 +70,19 @@ func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokens
7070

7171
func validateCompletion(completion *tokenspb.NexusOperationCompletion) error {
7272
hasCHASMRef := len(completion.GetComponentRef()) > 0
73-
hasHSMRef := completion.GetNamespaceId() != "" ||
74-
completion.GetWorkflowId() != "" ||
75-
completion.GetRunId() != "" ||
76-
completion.GetRef() != nil
77-
isCompleteHSM := completion.GetNamespaceId() != "" &&
73+
hasHSMRef := completion.GetNamespaceId() != "" &&
7874
completion.GetWorkflowId() != "" &&
7975
completion.GetRunId() != "" &&
8076
completion.GetRef() != nil
8177

82-
switch {
83-
case hasCHASMRef && hasHSMRef:
84-
return serviceerror.NewInvalidArgument("callback token contains both HSM and CHASM fields")
85-
case hasCHASMRef:
86-
return nil
87-
case isCompleteHSM:
78+
// CHASM tokens may also carry partial HSM-shaped routing fields
79+
// (NamespaceId/WorkflowId) so that legacy HSM-only callback routers
80+
// can still route the completion. The CHASM ComponentRef remains
81+
// authoritative; the HSM fields are routing hints only.
82+
if hasCHASMRef || hasHSMRef {
8883
return nil
89-
default:
90-
return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref")
9184
}
85+
return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref")
9286
}
9387

9488
// DecodeCallbackToken unmarshals the given token applying minimal data verification.

common/nexus/callback_token_test.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,36 +33,22 @@ func TestCallbackTokenGenerator_DecodeCompletion(t *testing.T) {
3333
},
3434
},
3535
{
36-
name: "mixed with namespace id",
36+
// CHASM tokens may carry HSM-shaped routing hints alongside ComponentRef
37+
// so legacy HSM-only callback routers can still route the completion.
38+
name: "CHASM with HSM routing hints",
3739
completion: &tokenspb.NexusOperationCompletion{
3840
NamespaceId: "ns-id",
39-
ComponentRef: []byte("component-ref"),
40-
},
41-
wantErr: "both HSM and CHASM",
42-
},
43-
{
44-
name: "mixed with workflow id",
45-
completion: &tokenspb.NexusOperationCompletion{
4641
WorkflowId: "wf-id",
47-
ComponentRef: []byte("component-ref"),
48-
},
49-
wantErr: "both HSM and CHASM",
50-
},
51-
{
52-
name: "mixed with run id",
53-
completion: &tokenspb.NexusOperationCompletion{
5442
RunId: "run-id",
5543
ComponentRef: []byte("component-ref"),
5644
},
57-
wantErr: "both HSM and CHASM",
5845
},
5946
{
60-
name: "mixed with ref",
47+
name: "CHASM with partial HSM routing hints",
6148
completion: &tokenspb.NexusOperationCompletion{
62-
Ref: &persistencespb.StateMachineRef{},
49+
NamespaceId: "ns-id",
6350
ComponentRef: []byte("component-ref"),
6451
},
65-
wantErr: "both HSM and CHASM",
6652
},
6753
{
6854
name: "empty",

tests/nexus_standalone_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/google/uuid"
1112
"github.com/nexus-rpc/sdk-go/nexus"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
@@ -19,6 +20,10 @@ import (
1920
"go.temporal.io/api/serviceerror"
2021
taskqueuepb "go.temporal.io/api/taskqueue/v1"
2122
"go.temporal.io/api/workflowservice/v1"
23+
"go.temporal.io/sdk/client"
24+
"go.temporal.io/sdk/temporalnexus"
25+
"go.temporal.io/sdk/worker"
26+
"go.temporal.io/sdk/workflow"
2227
persistencespb "go.temporal.io/server/api/persistence/v1"
2328
"go.temporal.io/server/chasm/lib/nexusoperation"
2429
"go.temporal.io/server/common/dynamicconfig"
@@ -2123,6 +2128,52 @@ func (s *NexusStandaloneTestSuite) TestStandaloneNexusOperationPoll() {
21232128
})
21242129
}
21252130

2131+
// Verifies that CHASM-backed Standalone Nexus is compatible with HSM-backed callbacks.
2132+
func (s *NexusStandaloneTestSuite) TestAsyncCompletionAgainstWorkflowHandler() {
2133+
env := s.newTestEnv(testcore.WithDynamicConfig(dynamicconfig.EnableCHASMCallbacks, false))
2134+
ctx := env.Context()
2135+
2136+
handlerTaskQueue := testcore.RandomizeStr(s.T().Name())
2137+
endpointName := env.createNexusEndpoint(ctx, s.T(), testcore.RandomizedNexusEndpoint(s.T().Name()), handlerTaskQueue).GetSpec().GetName()
2138+
2139+
handlerWF := func(workflow.Context, nexus.NoValue) (string, error) {
2140+
return "ok", nil
2141+
}
2142+
2143+
svc := nexus.NewService("test-service")
2144+
nexusOp := temporalnexus.NewWorkflowRunOperation("test-operation", handlerWF,
2145+
func(_ context.Context, _ nexus.NoValue, _ nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
2146+
return client.StartWorkflowOptions{
2147+
ID: "handler-wf-" + uuid.NewString(),
2148+
TaskQueue: handlerTaskQueue,
2149+
}, nil
2150+
})
2151+
svc.MustRegister(nexusOp)
2152+
2153+
w := worker.New(env.SdkClient(), handlerTaskQueue, worker.Options{})
2154+
w.RegisterWorkflow(handlerWF)
2155+
w.RegisterNexusService(svc)
2156+
s.NoError(w.Start())
2157+
defer w.Stop()
2158+
2159+
startResp, err := s.startNexusOperation(env, &workflowservice.StartNexusOperationExecutionRequest{
2160+
OperationId: "test-op",
2161+
Endpoint: endpointName,
2162+
})
2163+
s.NoError(err)
2164+
2165+
s.EventuallyWithT(func(t *assert.CollectT) {
2166+
descResp, err := env.FrontendClient().DescribeNexusOperationExecution(ctx, &workflowservice.DescribeNexusOperationExecutionRequest{
2167+
Namespace: env.Namespace().String(),
2168+
OperationId: "test-op",
2169+
RunId: startResp.RunId,
2170+
IncludeOutcome: true,
2171+
})
2172+
require.NoError(t, err)
2173+
require.Equal(t, enumspb.NEXUS_OPERATION_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus())
2174+
}, 30*time.Second, 200*time.Millisecond)
2175+
}
2176+
21262177
func (s *NexusStandaloneTestSuite) TestAsyncCompletionIgnoresTransitionFieldsInCallbackToken() {
21272178
env := s.newTestEnv()
21282179
handlerLink := &commonpb.Link_WorkflowEvent{

0 commit comments

Comments
 (0)