Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chasm/lib/nexusoperation/operation_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (h *operationInvocationTaskHandler) Execute(
return fmt.Errorf("failed to build callback URL: %w", err)
}

token, err := h.generateCallbackToken(args.serializedRef, args.requestID)
token, err := h.generateCallbackToken(args.serializedRef, opRef.NamespaceID, opRef.BusinessID, opRef.RunID, args.requestID)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion chasm/lib/nexusoperation/task_handler_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,20 @@ func lookupEndpoint(ctx context.Context, registry commonnexus.EndpointRegistry,
}

// generateCallbackToken creates a callback token for the given operation reference.
//
// namespaceID, businessID, and runID are dual-populated alongside ComponentRef so
// legacy HSM-only callback routers can still route the completion. ComponentRef
// remains authoritative for completion handling; the HSM-shaped fields are
// consulted only by routers, never for completion semantics.
func (h *operationInvocationTaskHandler) generateCallbackToken(
serializedRef []byte,
requestID string,
namespaceID, businessID, runID, requestID string,
) (string, error) {
token, err := h.callbackTokenGenerator.Tokenize(&tokenspb.NexusOperationCompletion{
ComponentRef: serializedRef,
NamespaceId: namespaceID,
WorkflowId: businessID,
RunId: runID,
Copy link
Copy Markdown
Contributor Author

@stephanos stephanos May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunId is not strictly needed, but still setting it for consistency.

RequestId: requestID,
})
if err != nil {
Expand Down
20 changes: 7 additions & 13 deletions common/nexus/callback_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,19 @@ func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokens

func validateCompletion(completion *tokenspb.NexusOperationCompletion) error {
hasCHASMRef := len(completion.GetComponentRef()) > 0
hasHSMRef := completion.GetNamespaceId() != "" ||
completion.GetWorkflowId() != "" ||
completion.GetRunId() != "" ||
completion.GetRef() != nil
isCompleteHSM := completion.GetNamespaceId() != "" &&
hasHSMRef := completion.GetNamespaceId() != "" &&
completion.GetWorkflowId() != "" &&
completion.GetRunId() != "" &&
completion.GetRef() != nil

switch {
case hasCHASMRef && hasHSMRef:
return serviceerror.NewInvalidArgument("callback token contains both HSM and CHASM fields")
case hasCHASMRef:
return nil
case isCompleteHSM:
// CHASM tokens may also carry partial HSM-shaped routing fields
// (NamespaceId/WorkflowId) so that legacy HSM-only callback routers
// can still route the completion. The CHASM ComponentRef remains
// authoritative; the HSM fields are routing hints only.
if hasCHASMRef || hasHSMRef {
return nil
default:
return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref")
}
return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref")
}

// DecodeCallbackToken unmarshals the given token applying minimal data verification.
Expand Down
24 changes: 5 additions & 19 deletions common/nexus/callback_token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,22 @@ func TestCallbackTokenGenerator_DecodeCompletion(t *testing.T) {
},
},
{
name: "mixed with namespace id",
// CHASM tokens may carry HSM-shaped routing hints alongside ComponentRef
// so legacy HSM-only callback routers can still route the completion.
name: "CHASM with HSM routing hints",
completion: &tokenspb.NexusOperationCompletion{
NamespaceId: "ns-id",
ComponentRef: []byte("component-ref"),
},
wantErr: "both HSM and CHASM",
},
{
name: "mixed with workflow id",
completion: &tokenspb.NexusOperationCompletion{
WorkflowId: "wf-id",
ComponentRef: []byte("component-ref"),
},
wantErr: "both HSM and CHASM",
},
{
name: "mixed with run id",
completion: &tokenspb.NexusOperationCompletion{
RunId: "run-id",
ComponentRef: []byte("component-ref"),
},
wantErr: "both HSM and CHASM",
},
{
name: "mixed with ref",
name: "CHASM with partial HSM routing hints",
completion: &tokenspb.NexusOperationCompletion{
Ref: &persistencespb.StateMachineRef{},
NamespaceId: "ns-id",
ComponentRef: []byte("component-ref"),
},
wantErr: "both HSM and CHASM",
},
{
name: "empty",
Expand Down
51 changes: 51 additions & 0 deletions tests/nexus_standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/nexus-rpc/sdk-go/nexus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,6 +20,10 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporalnexus"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm/lib/nexusoperation"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -2123,6 +2128,52 @@ func (s *NexusStandaloneTestSuite) TestStandaloneNexusOperationPoll() {
})
}

// Verifies that CHASM-backed Standalone Nexus is compatible with HSM-backed callbacks.
func (s *NexusStandaloneTestSuite) TestHSMCallbackHandlerCompatability() {
env := s.newTestEnv(testcore.WithDynamicConfig(dynamicconfig.EnableCHASMCallbacks, false))
ctx := env.Context()

handlerTaskQueue := testcore.RandomizeStr(s.T().Name())
endpointName := env.createNexusEndpoint(ctx, s.T(), testcore.RandomizedNexusEndpoint(s.T().Name()), handlerTaskQueue).GetSpec().GetName()

handlerWF := func(workflow.Context, nexus.NoValue) (string, error) {
return "ok", nil
}

svc := nexus.NewService("test-service")
nexusOp := temporalnexus.NewWorkflowRunOperation("test-operation", handlerWF,
func(_ context.Context, _ nexus.NoValue, _ nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
ID: "handler-wf-" + uuid.NewString(),
TaskQueue: handlerTaskQueue,
}, nil
})
svc.MustRegister(nexusOp)

w := worker.New(env.SdkClient(), handlerTaskQueue, worker.Options{})
w.RegisterWorkflow(handlerWF)
w.RegisterNexusService(svc)
s.NoError(w.Start())
defer w.Stop()

startResp, err := s.startNexusOperation(env, &workflowservice.StartNexusOperationExecutionRequest{
OperationId: "test-op",
Endpoint: endpointName,
})
s.NoError(err)

s.EventuallyWithT(func(t *assert.CollectT) {
descResp, err := env.FrontendClient().DescribeNexusOperationExecution(ctx, &workflowservice.DescribeNexusOperationExecutionRequest{
Namespace: env.Namespace().String(),
OperationId: "test-op",
RunId: startResp.RunId,
IncludeOutcome: true,
})
require.NoError(t, err)
require.Equal(t, enumspb.NEXUS_OPERATION_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus())
}, 30*time.Second, 200*time.Millisecond)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ repro for issue

func (s *NexusStandaloneTestSuite) TestAsyncCompletionIgnoresTransitionFieldsInCallbackToken() {
env := s.newTestEnv()
handlerLink := &commonpb.Link_WorkflowEvent{
Expand Down
Loading