Skip to content

Add dynamic workflow sample #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ Workflow Definition and an Activity Definition using API Key to authenticate wit

These samples demonstrate some common control flow patterns using Temporal's Go SDK API.

- [**Dynamic Workflows**](./dynamic-workflows): Demonstrates how to execute Workflows and Activities dynamically,
using a single "Dynamic Workflow"

- [**Dynamic Execution**](./dynamic): Demonstrates how to execute
Workflows and Activities using a name rather than a strongly typed function.

Expand Down
15 changes: 15 additions & 0 deletions dynamic-workflows/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Dynamic Workflows and Activities

The purpose of this sample is to demonstrate registering and using dynamic workflows and activities to
handle various workflow types at runtime.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run dynamic-workflows/worker/main.go
```
3) Run the following command to start the example
```
go run dynamic-workflows/starter/main.go
```
20 changes: 20 additions & 0 deletions dynamic-workflows/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dynamic_workflows

import (
"context"
"fmt"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/converter"
)

func DynamicActivity(ctx context.Context, args converter.EncodedValues) (string, error) {
var arg1, arg2 string
err := args.Get(&arg1, &arg2)
if err != nil {
return "", fmt.Errorf("failed to decode arguments: %w", err)
}

info := activity.GetInfo(ctx)
result := fmt.Sprintf("%s - %s - %s", info.WorkflowType.Name, arg1, arg2)
return result, nil
}
64 changes: 64 additions & 0 deletions dynamic-workflows/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"log"

"github.com/pborman/uuid"
"go.temporal.io/sdk/client"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// This workflow ID can be user business logic identifier as well.
workflowID := "dynamic_workflows_" + uuid.New()
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: "dynamic-workflows",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, "DurableExecution", "Hello", "Temporal")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)

we, err = c.ExecuteWorkflow(context.Background(), workflowOptions, "dynamic-activity", "Peanut", "Butter")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)

we, err = c.ExecuteWorkflow(context.Background(), workflowOptions, "dynamic-activity123", "Jelly", "Time")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)
}
33 changes: 33 additions & 0 deletions dynamic-workflows/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

dynamic "github.com/temporalio/samples-go/dynamic-workflows"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "dynamic-workflows", worker.Options{})

w.RegisterDynamicWorkflow(dynamic.DynamicWorkflow, workflow.DynamicRegisterOptions{})

Check failure on line 26 in dynamic-workflows/worker/main.go

View workflow job for this annotation

GitHub Actions / build-and-test

w.RegisterDynamicWorkflow undefined (type worker.Worker has no field or method RegisterDynamicWorkflow)

Check failure on line 26 in dynamic-workflows/worker/main.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: workflow.DynamicRegisterOptions
w.RegisterDynamicActivity(dynamic.DynamicActivity, activity.DynamicRegisterOptions{})

Check failure on line 27 in dynamic-workflows/worker/main.go

View workflow job for this annotation

GitHub Actions / build-and-test

w.RegisterDynamicActivity undefined (type worker.Worker has no field or method RegisterDynamicActivity)

Check failure on line 27 in dynamic-workflows/worker/main.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: activity.DynamicRegisterOptions (compile)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
31 changes: 31 additions & 0 deletions dynamic-workflows/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dynamic_workflows

import (
"fmt"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
"strings"
"time"
)

func DynamicWorkflow(ctx workflow.Context, args converter.EncodedValues) (string, error) {
var result string
info := workflow.GetInfo(ctx)

var arg1, arg2 string
err := args.Get(&arg1, &arg2)
if err != nil {
return "", fmt.Errorf("failed to decode arguments: %w", err)
}

if strings.HasPrefix(info.WorkflowType.Name, "dynamic-activity") {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second})
err := workflow.ExecuteActivity(ctx, "random-activity-name", arg1, arg2).Get(ctx, &result)
if err != nil {
return "", err
}
} else {
result = fmt.Sprintf("%s - %s - %s", info.WorkflowType.Name, arg1, arg2)
}
return result, nil
}
23 changes: 23 additions & 0 deletions dynamic-workflows/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dynamic_workflows

import (
"github.com/stretchr/testify/assert"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/workflow"
"testing"
)

func TestDynamicWorkflow(t *testing.T) {
s := testsuite.WorkflowTestSuite{}
env := s.NewTestWorkflowEnvironment()
env.RegisterDynamicWorkflow(DynamicWorkflow, workflow.DynamicRegisterOptions{})

Check failure on line 14 in dynamic-workflows/workflow_test.go

View workflow job for this annotation

GitHub Actions / build-and-test

env.RegisterDynamicWorkflow undefined (type *"go.temporal.io/sdk/internal".TestWorkflowEnvironment has no field or method RegisterDynamicWorkflow)

Check failure on line 14 in dynamic-workflows/workflow_test.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: workflow.DynamicRegisterOptions
env.RegisterDynamicActivity(DynamicActivity, activity.DynamicRegisterOptions{})

Check failure on line 15 in dynamic-workflows/workflow_test.go

View workflow job for this annotation

GitHub Actions / build-and-test

env.RegisterDynamicActivity undefined (type *"go.temporal.io/sdk/internal".TestWorkflowEnvironment has no field or method RegisterDynamicActivity)

Check failure on line 15 in dynamic-workflows/workflow_test.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: activity.DynamicRegisterOptions (compile)
env.ExecuteWorkflow("dynamic-activity", "Hello", "World")
assert.True(t, env.IsWorkflowCompleted())
assert.NoError(t, env.GetWorkflowError())
var result string
err := env.GetWorkflowResult(&result)
assert.NoError(t, err)
assert.Equal(t, "dynamic-activity - Hello - World", result)
}
Loading