Skip to content

Add Nexus context propagation sample #394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ These samples demonstrate some common control flow patterns using Temporal's Go
- [**Worker-specific Task Queues**](./worker-specific-task-queues): Use a unique task queue per Worker to have certain Activities only run on that specific Worker. For instance for a file processing Workflow, where one Activity downloads a file and subsequent Activities need to operate on that file. (If multiple Workers were on the same queue, subsequent Activities may get run on different machines that don't have the downloaded file.)

- [**Nexus**](./nexus): Demonstrates how to use the Nexus APIs to facilitate cross namespace calls.
- [**Nexus Cancelation**](./nexus-cancelation): Demonstrates how to cancel a Nexus operation from a caller workflow.
- [**Nexus Context Propagation**](./nexus-context-propagation): Demonstrates how to propgate context through client calls, workflows, and Nexus headers.

### Scenario based examples

Expand Down
12 changes: 6 additions & 6 deletions ctxpropagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ type (
// PropagateKey is the key used to store the value in the Context object
var PropagateKey = contextKey{}

// propagationKey is the key used by the propagator to pass values through the
// HeaderKey is the key used by the propagator to pass values through the
// Temporal server headers
const propagationKey = "custom-header"
const HeaderKey = "custom-header"

// NewContextPropagator returns a context propagator that propagates a set of
// string key-value pairs across a workflow
Expand All @@ -42,7 +42,7 @@ func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) e
if err != nil {
return err
}
writer.Set(propagationKey, payload)
writer.Set(HeaderKey, payload)
return nil
}

Expand All @@ -53,13 +53,13 @@ func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.He
if err != nil {
return err
}
writer.Set(propagationKey, payload)
writer.Set(HeaderKey, payload)
return nil
}

// Extract extracts values from headers and puts them into context
func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
if value, ok := reader.Get(propagationKey); ok {
if value, ok := reader.Get(HeaderKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
Expand All @@ -72,7 +72,7 @@ func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader)

// ExtractToWorkflow extracts values from headers and puts them into context
func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
if value, ok := reader.Get(propagationKey); ok {
if value, ok := reader.Get(HeaderKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
Expand Down
2 changes: 1 addition & 1 deletion ctxpropagation/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestUnitTestSuite(t *testing.T) {
payload, _ := converter.GetDefaultDataConverter().ToPayload(Values{"some key", "some value"})
s.SetHeader(&commonpb.Header{
Fields: map[string]*commonpb.Payload{
propagationKey: payload,
HeaderKey: payload,
},
})

Expand Down
44 changes: 44 additions & 0 deletions nexus-context-propagation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Nexus Context Propagation

This sample shows how to propgate context through client calls, workflows, and Nexus headers.

From more details on Nexus and how to setup to run this samples please see the [Nexus Sample](../nexus/README.md).

### Running the sample

In separate terminal windows:

### Nexus handler worker

```
cd handler
go run ./worker \
-target-host localhost:7233 \
-namespace my-target-namespace
```

### Nexus caller worker

```
cd caller
go run ./worker \
-target-host localhost:7233 \
-namespace my-caller-namespace
```

### Start caller workflow

```
cd caller
go run ./starter \
-target-host localhost:7233 \
-namespace my-caller-namespace
```

### Output

which should result in:
```
2025/02/27 12:57:40 Started workflow WorkflowID nexus_hello_caller_workflow_20240723195740 RunID c9789128-2fcd-4083-829d-95e43279f6d7
2025/02/27 12:57:40 Workflow result: ¡Hola! Nexus, caller-id: samples-go 👋
```
52 changes: 52 additions & 0 deletions nexus-context-propagation/caller/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"context"
"log"
"os"
"time"

"github.com/temporalio/samples-go/ctxpropagation"
"github.com/temporalio/samples-go/nexus/caller" // NOTE: reusing the generic nexus caller workflow
"github.com/temporalio/samples-go/nexus/options"
"github.com/temporalio/samples-go/nexus/service"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

func main() {
clientOptions, err := options.ParseClientOptionFlags(os.Args[1:])
if err != nil {
log.Fatalf("Invalid arguments: %v", err)
}
// Set up context propagators from workflow and non-workflow contexts.
clientOptions.ContextPropagators = []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()}
c, err := client.Dial(clientOptions)
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
ctx := context.Background()
workflowOptions := client.StartWorkflowOptions{
ID: "nexus_hello_caller_workflow_" + time.Now().Format("20060102150405"),
TaskQueue: caller.TaskQueue,
}

ctx = context.WithValue(ctx, ctxpropagation.PropagateKey, ctxpropagation.Values{
Key: "caller-id",
Value: "samples-go",
})
wr, err := c.ExecuteWorkflow(ctx, workflowOptions, caller.HelloCallerWorkflow, "Nexus", service.ES)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", wr.GetID(), "RunID", wr.GetRunID())

// Synchronously wait for the workflow completion.
var result string
err = wr.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)
}
51 changes: 51 additions & 0 deletions nexus-context-propagation/caller/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"log"
"os"

"github.com/temporalio/samples-go/ctxpropagation"
nexuscontextpropagation "github.com/temporalio/samples-go/nexus-context-propagation"
"github.com/temporalio/samples-go/nexus/caller"
"github.com/temporalio/samples-go/nexus/options"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
clientOptions, err := options.ParseClientOptionFlags(os.Args[1:])
if err != nil {
log.Fatalf("Invalid arguments: %v", err)
}
// Set up context propagators from workflow and non-workflow contexts.
clientOptions.ContextPropagators = []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()}
c, err := client.Dial(clientOptions)
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, caller.TaskQueue, worker.Options{
Interceptors: []interceptor.WorkerInterceptor{
&nexuscontextpropagation.WorkerInterceptor{
// Use the provided data converter to encode the Nexus headers. Use a custom data
// converter to encrypt the header values.
// IMPORTANT: Nexus headers values are plain strings and are not visited by the
// grpc-proxy (see related sample), special care should be taken when used to pass
// sensitive information.
DataConverter: converter.GetDefaultDataConverter(),
},
},
})

w.RegisterWorkflow(caller.HelloCallerWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
48 changes: 48 additions & 0 deletions nexus-context-propagation/handler/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package handler

import (
"context"
"fmt"

"github.com/nexus-rpc/sdk-go/nexus"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporalnexus"
"go.temporal.io/sdk/workflow"

"github.com/temporalio/samples-go/ctxpropagation"
"github.com/temporalio/samples-go/nexus/service"
)

// Use the NewWorkflowRunOperation constructor, which is the easiest way to expose a workflow as an operation.
// See alternatives at https://pkg.go.dev/go.temporal.io/sdk/temporalnexus.
var HelloOperation = temporalnexus.NewWorkflowRunOperation(service.HelloOperationName, HelloHandlerWorkflow, func(ctx context.Context, input service.HelloInput, options nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
return client.StartWorkflowOptions{
// Workflow IDs should typically be business meaningful IDs and are used to dedupe workflow starts.
// For this example, we're using the request ID allocated by Temporal when the caller workflow schedules
// the operation, this ID is guaranteed to be stable across retries of this operation.
ID: options.RequestID,
// Task queue defaults to the task queue this operation is handled on.
}, nil
})

func HelloHandlerWorkflow(ctx workflow.Context, input service.HelloInput) (service.HelloOutput, error) {
values, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values)
if ok {
input.Name += ", " + values.Key + ": " + values.Value
}

switch input.Language {
case service.EN:
return service.HelloOutput{Message: "Hello " + input.Name + " 👋"}, nil
case service.FR:
return service.HelloOutput{Message: "Bonjour " + input.Name + " 👋"}, nil
case service.DE:
return service.HelloOutput{Message: "Hallo " + input.Name + " 👋"}, nil
case service.ES:
return service.HelloOutput{Message: "¡Hola! " + input.Name + " 👋"}, nil
case service.TR:
return service.HelloOutput{Message: "Merhaba " + input.Name + " 👋"}, nil
}
return service.HelloOutput{}, fmt.Errorf("unsupported language %q", input.Language)
}
61 changes: 61 additions & 0 deletions nexus-context-propagation/handler/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"log"
"os"

"github.com/nexus-rpc/sdk-go/nexus"
"github.com/temporalio/samples-go/ctxpropagation"
nexuscontextpropagation "github.com/temporalio/samples-go/nexus-context-propagation"
"github.com/temporalio/samples-go/nexus-context-propagation/handler"
"github.com/temporalio/samples-go/nexus/options"
"github.com/temporalio/samples-go/nexus/service"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

const (
taskQueue = "my-handler-task-queue"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
clientOptions, err := options.ParseClientOptionFlags(os.Args[1:])
if err != nil {
log.Fatalf("Invalid arguments: %v", err)
}
clientOptions.ContextPropagators = []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()}
c, err := client.Dial(clientOptions)
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, taskQueue, worker.Options{
Interceptors: []interceptor.WorkerInterceptor{
&nexuscontextpropagation.WorkerInterceptor{
// Use the provided data converter to encode the Nexus headers. Use a custom data
// converter to encrypt the header values.
// IMPORTANT: Nexus headers values are plain strings and are not visited by the
// grpc-proxy (see related sample), special care should be taken when used to pass
// sensitive information.
DataConverter: converter.GetDefaultDataConverter(),
},
},
})
service := nexus.NewService(service.HelloServiceName)
err = service.Register(handler.HelloOperation)
if err != nil {
log.Fatalln("Unable to register operations", err)
}
w.RegisterNexusService(service)
w.RegisterWorkflow(handler.HelloHandlerWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
Loading