Skip to content

Commit a83389c

Browse files
committed
Dual-populate HSM routing fields in CHASM callback tokens
1 parent a4d105e commit a83389c

5 files changed

Lines changed: 78 additions & 26 deletions

File tree

chasm/lib/callback/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func routeSystemCallbackRequest(
4545
}
4646

4747
// Normalize to support two possible token shapes:
48-
// - legacy HSM tokens carry namespace/workflow IDs directly
48+
// - HSM tokens carry namespace/workflow IDs directly
4949
// - CHASM tokens carry an encoded component ref instead
5050
namespaceID := completion.GetNamespaceId()
5151
businessID := completion.GetWorkflowId()

chasm/lib/nexusoperation/task_handler_helpers.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,19 @@ func (h *operationInvocationTaskHandler) generateCallbackToken(
396396
serializedRef []byte,
397397
requestID string,
398398
) (string, error) {
399+
// Dual-populate HSM-shaped routing fields alongside ComponentRef so that
400+
// legacy HSM-only callback routers (in older worker clusters) can still route
401+
// the completion. ComponentRef remains authoritative for completion handling;
402+
// these fields are only consulted by routers, never for completion semantics.
403+
ref := &persistencespb.ChasmComponentRef{}
404+
if err := ref.Unmarshal(serializedRef); err != nil {
405+
return "", fmt.Errorf("%w: %w", queueserrors.NewUnprocessableTaskError("invalid component ref"), err)
406+
}
399407
token, err := h.callbackTokenGenerator.Tokenize(&tokenspb.NexusOperationCompletion{
400408
ComponentRef: serializedRef,
409+
NamespaceId: ref.GetNamespaceId(),
410+
WorkflowId: ref.GetBusinessId(),
411+
RunId: ref.GetRunId(),
401412
RequestId: requestID,
402413
})
403414
if err != nil {

common/nexus/callback_token.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,17 @@ func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokens
7070

7171
func validateCompletion(completion *tokenspb.NexusOperationCompletion) error {
7272
hasCHASMRef := len(completion.GetComponentRef()) > 0
73-
hasHSMRef := completion.GetNamespaceId() != "" ||
74-
completion.GetWorkflowId() != "" ||
75-
completion.GetRunId() != "" ||
76-
completion.GetRef() != nil
7773
isCompleteHSM := completion.GetNamespaceId() != "" &&
7874
completion.GetWorkflowId() != "" &&
7975
completion.GetRunId() != "" &&
8076
completion.GetRef() != nil
8177

8278
switch {
83-
case hasCHASMRef && hasHSMRef:
84-
return serviceerror.NewInvalidArgument("callback token contains both HSM and CHASM fields")
8579
case hasCHASMRef:
80+
// CHASM tokens may also carry partial HSM-shaped routing fields
81+
// (NamespaceId/WorkflowId/RunId) so that legacy HSM-only callback routers
82+
// can still route the completion. The CHASM ComponentRef remains
83+
// authoritative; the HSM fields are routing hints only.
8684
return nil
8785
case isCompleteHSM:
8886
return nil

common/nexus/callback_token_test.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,36 +33,22 @@ func TestCallbackTokenGenerator_DecodeCompletion(t *testing.T) {
3333
},
3434
},
3535
{
36-
name: "mixed with namespace id",
36+
// CHASM tokens may carry HSM-shaped routing hints alongside ComponentRef
37+
// so legacy HSM-only callback routers can still route the completion.
38+
name: "CHASM with HSM routing hints",
3739
completion: &tokenspb.NexusOperationCompletion{
3840
NamespaceId: "ns-id",
39-
ComponentRef: []byte("component-ref"),
40-
},
41-
wantErr: "both HSM and CHASM",
42-
},
43-
{
44-
name: "mixed with workflow id",
45-
completion: &tokenspb.NexusOperationCompletion{
4641
WorkflowId: "wf-id",
47-
ComponentRef: []byte("component-ref"),
48-
},
49-
wantErr: "both HSM and CHASM",
50-
},
51-
{
52-
name: "mixed with run id",
53-
completion: &tokenspb.NexusOperationCompletion{
5442
RunId: "run-id",
5543
ComponentRef: []byte("component-ref"),
5644
},
57-
wantErr: "both HSM and CHASM",
5845
},
5946
{
60-
name: "mixed with ref",
47+
name: "CHASM with partial HSM routing hints",
6148
completion: &tokenspb.NexusOperationCompletion{
62-
Ref: &persistencespb.StateMachineRef{},
49+
NamespaceId: "ns-id",
6350
ComponentRef: []byte("component-ref"),
6451
},
65-
wantErr: "both HSM and CHASM",
6652
},
6753
{
6854
name: "empty",

tests/nexus_standalone_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/google/uuid"
1112
"github.com/nexus-rpc/sdk-go/nexus"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
@@ -19,6 +20,10 @@ import (
1920
"go.temporal.io/api/serviceerror"
2021
taskqueuepb "go.temporal.io/api/taskqueue/v1"
2122
"go.temporal.io/api/workflowservice/v1"
23+
"go.temporal.io/sdk/client"
24+
"go.temporal.io/sdk/temporalnexus"
25+
"go.temporal.io/sdk/worker"
26+
"go.temporal.io/sdk/workflow"
2227
persistencespb "go.temporal.io/server/api/persistence/v1"
2328
"go.temporal.io/server/chasm/lib/nexusoperation"
2429
"go.temporal.io/server/common/dynamicconfig"
@@ -2123,6 +2128,58 @@ func (s *NexusStandaloneTestSuite) TestStandaloneNexusOperationPoll() {
21232128
})
21242129
}
21252130

2131+
// TestAsyncCompletionAgainstWorkflowHandler exercises the full end-to-end completion path
2132+
// for a standalone (CHASM) Nexus operation whose handler is an SDK workflow registered via
2133+
// temporalnexus.NewWorkflowRunOperation against a Worker-target endpoint. With
2134+
// EnableCHASMCallbacks defaulted to false, the handler workflow's completion is delivered
2135+
// through the legacy HSM callback machine, so this test covers the cross-stack combination
2136+
// that previously failed with `NamespaceID is empty.` before the callback token began
2137+
// dual-populating HSM-shaped routing fields.
2138+
func (s *NexusStandaloneTestSuite) TestAsyncCompletionAgainstWorkflowHandler() {
2139+
env := s.newTestEnv()
2140+
ctx := env.Context()
2141+
2142+
handlerTaskQueue := testcore.RandomizeStr(s.T().Name())
2143+
endpointName := env.createNexusEndpoint(ctx, s.T(), testcore.RandomizedNexusEndpoint(s.T().Name()), handlerTaskQueue).GetSpec().GetName()
2144+
2145+
handlerWF := func(workflow.Context, nexus.NoValue) (string, error) {
2146+
return "ok", nil
2147+
}
2148+
2149+
svc := nexus.NewService("test-service")
2150+
nexusOp := temporalnexus.NewWorkflowRunOperation("test-operation", handlerWF,
2151+
func(_ context.Context, _ nexus.NoValue, _ nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
2152+
return client.StartWorkflowOptions{
2153+
ID: "handler-wf-" + uuid.NewString(),
2154+
TaskQueue: handlerTaskQueue,
2155+
}, nil
2156+
})
2157+
svc.MustRegister(nexusOp)
2158+
2159+
w := worker.New(env.SdkClient(), handlerTaskQueue, worker.Options{})
2160+
w.RegisterWorkflow(handlerWF)
2161+
w.RegisterNexusService(svc)
2162+
s.NoError(w.Start())
2163+
defer w.Stop()
2164+
2165+
startResp, err := s.startNexusOperation(env, &workflowservice.StartNexusOperationExecutionRequest{
2166+
OperationId: "test-op",
2167+
Endpoint: endpointName,
2168+
})
2169+
s.NoError(err)
2170+
2171+
s.EventuallyWithT(func(t *assert.CollectT) {
2172+
descResp, err := env.FrontendClient().DescribeNexusOperationExecution(ctx, &workflowservice.DescribeNexusOperationExecutionRequest{
2173+
Namespace: env.Namespace().String(),
2174+
OperationId: "test-op",
2175+
RunId: startResp.RunId,
2176+
IncludeOutcome: true,
2177+
})
2178+
require.NoError(t, err)
2179+
require.Equal(t, enumspb.NEXUS_OPERATION_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus())
2180+
}, 30*time.Second, 200*time.Millisecond)
2181+
}
2182+
21262183
func (s *NexusStandaloneTestSuite) TestAsyncCompletionIgnoresTransitionFieldsInCallbackToken() {
21272184
env := s.newTestEnv()
21282185
handlerLink := &commonpb.Link_WorkflowEvent{

0 commit comments

Comments
 (0)