Skip to content

Commit bae0515

Browse files
authored
Make Nexus work OOTB with token-based callback routing (temporalio#9513)
## Summary

- Use the system callback URL by default now that the previous minor release (1.30) supports it.
- Extract system callback URL handling from `routeRequest` into a dedicated `routeSystemCallbackRequest` function that routes based on the callback token's namespace and active cluster.
- Update both the `components/callbacks` and `chasm/lib/callback` packages to use the namespace registry and callback token generator for routing decisions.
- Add unit tests for `routeRequest` and `routeSystemCallbackRequest` covering external targets, local/unknown cluster routing, nil headers, invalid tokens, namespace not found, and success paths.

## Test plan

- [x] Unit tests added for both `components/callbacks` and `chasm/lib/callback` packages
- [x] All 18 tests pass (9 per package)
- [x] `make lint` passes with no new issues
1 parent 88a9b61 commit bae0515

7 files changed

Lines changed: 805 additions & 14 deletions

File tree

chasm/lib/callback/fx.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"go.temporal.io/server/common/cluster"
1010
"go.temporal.io/server/common/collection"
1111
"go.temporal.io/server/common/log"
12+
"go.temporal.io/server/common/namespace"
13+
commonnexus "go.temporal.io/server/common/nexus"
1214
queuescommon "go.temporal.io/server/service/history/queues/common"
1315
"go.uber.org/fx"
1416
)
@@ -23,6 +25,7 @@ func register(
2325
// httpCallerProviderProvider provides an HTTPCallerProvider for CHASM callbacks.
2426
func httpCallerProviderProvider(
2527
clusterMetadata cluster.Metadata,
28+
namespaceRegistry namespace.Registry,
2629
rpcFactory common.RPCFactory,
2730
httpClientCache *cluster.FrontendHTTPClientCache,
2831
logger log.Logger,
@@ -32,12 +35,15 @@ func httpCallerProviderProvider(
3235
return nil, fmt.Errorf("cannot create local frontend HTTP client: %w", err)
3336
}
3437
defaultClient := &http.Client{}
38+
callbackTokenGenerator := commonnexus.NewCallbackTokenGenerator()
3539

3640
m := collection.NewOnceMap(func(queuescommon.NamespaceIDAndDestination) HTTPCaller {
3741
return func(r *http.Request) (*http.Response, error) {
3842
return routeRequest(r,
3943
clusterMetadata,
44+
namespaceRegistry,
4045
httpClientCache,
46+
callbackTokenGenerator,
4147
defaultClient,
4248
localClient,
4349
logger,

chasm/lib/callback/request.go

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,98 @@
11
package callback
22

33
import (
4+
"errors"
45
"net/http"
56

7+
"github.com/nexus-rpc/sdk-go/nexus"
8+
"go.temporal.io/api/serviceerror"
69
"go.temporal.io/server/common"
710
"go.temporal.io/server/common/cluster"
811
"go.temporal.io/server/common/log"
912
"go.temporal.io/server/common/log/tag"
10-
"go.temporal.io/server/common/nexus"
13+
"go.temporal.io/server/common/namespace"
14+
commonnexus "go.temporal.io/server/common/nexus"
1115
)
1216

1317
// Header key used to identify callbacks that originate from and target the same cluster.
1418
// Note: this is the nexusoperations.NexusCallbackSourceHeader stripped of Nexus-Callback-
1519
const callbackSourceHeader = "source"
1620

21+
// routeSystemCallbackRequest routes a system callback request to the appropriate frontend client
22+
// based on the callback token's namespace and active cluster.
23+
func routeSystemCallbackRequest(
24+
r *http.Request,
25+
clusterMetadata cluster.Metadata,
26+
namespaceRegistry namespace.Registry,
27+
httpClientCache *cluster.FrontendHTTPClientCache,
28+
callbackTokenGenerator *commonnexus.CallbackTokenGenerator,
29+
localClient *common.FrontendHTTPClient,
30+
logger log.Logger,
31+
) (*http.Response, error) {
32+
var frontendClient *common.FrontendHTTPClient
33+
if r.Header != nil {
34+
token, err := commonnexus.DecodeCallbackToken(r.Header.Get(commonnexus.CallbackTokenHeader))
35+
if err != nil {
36+
logger.Error("failed to decode callback token", tag.Error(err))
37+
return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid callback token")
38+
}
39+
40+
completion, err := callbackTokenGenerator.DecodeCompletion(token)
41+
if err != nil {
42+
logger.Error("failed to decode completion from token", tag.Error(err))
43+
return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid callback token")
44+
}
45+
ns, err := namespaceRegistry.GetNamespaceByID(namespace.ID(completion.NamespaceId))
46+
if err != nil {
47+
logger.Error("failed to get namespace for nexus completion request", tag.WorkflowNamespaceID(completion.NamespaceId), tag.Error(err))
48+
var nfe *serviceerror.NamespaceNotFound
49+
if errors.As(err, &nfe) {
50+
return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeNotFound, "namespace %q not found", completion.NamespaceId)
51+
}
52+
return nil, commonnexus.ConvertGRPCError(err, false)
53+
}
54+
clusterName := ns.ActiveClusterName(completion.GetWorkflowId())
55+
if clusterMetadata.GetCurrentClusterName() == clusterName {
56+
frontendClient = localClient
57+
} else {
58+
fec, err := httpClientCache.Get(clusterName)
59+
if err != nil {
60+
logger.Warn(
61+
"HTTPCallerProvider unable to get FrontendHTTPClient for callback target cluster. Using local HTTP Client.",
62+
tag.SourceCluster(clusterMetadata.GetCurrentClusterName()),
63+
tag.TargetCluster(clusterName),
64+
tag.Error(err),
65+
)
66+
frontendClient = localClient
67+
} else {
68+
frontendClient = fec
69+
}
70+
}
71+
} else {
72+
frontendClient = localClient
73+
}
74+
r.URL.Path = commonnexus.PathCompletionCallbackNoIdentifier
75+
r.URL.Scheme = frontendClient.Scheme
76+
r.URL.Host = frontendClient.Address
77+
r.Host = frontendClient.Address
78+
return frontendClient.Do(r)
79+
}
80+
1781
func routeRequest(
1882
r *http.Request,
1983
clusterMetadata cluster.Metadata,
84+
namespaceRegistry namespace.Registry,
2085
httpClientCache *cluster.FrontendHTTPClientCache,
86+
callbackTokenGenerator *commonnexus.CallbackTokenGenerator,
2187
defaultClient *http.Client,
2288
localClient *common.FrontendHTTPClient,
2389
logger log.Logger,
2490
) (*http.Response, error) {
91+
if r.URL.String() == commonnexus.SystemCallbackURL {
92+
return routeSystemCallbackRequest(r, clusterMetadata, namespaceRegistry, httpClientCache, callbackTokenGenerator, localClient, logger)
93+
}
2594
// This source header is populated in nexusoperations/executors (via the ClientProvider) for worker targets
26-
// if this header is not populated then we assume it's and external target.
95+
// if this header is not populated then we assume it's an external target.
2796
if r.Header == nil || r.Header.Get(callbackSourceHeader) == "" {
2897
return defaultClient.Do(r)
2998
}
@@ -61,9 +130,6 @@ func routeRequest(
61130
frontendClient = localClient
62131
}
63132

64-
if r.URL.String() == nexus.SystemCallbackURL {
65-
r.URL.Path = nexus.PathCompletionCallbackNoIdentifier
66-
}
67133
r.URL.Scheme = frontendClient.Scheme
68134
r.URL.Host = frontendClient.Address
69135
r.Host = frontendClient.Address

0 commit comments

Comments
 (0)