Skip to content

Commit d41d870

Browse files
committed
Add multiple namespace support.
1 parent 967f028 commit d41d870

3 files changed

Lines changed: 340 additions & 119 deletions

File tree

README.md

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,53 @@ The table below lists the environment variables available and the relevant Tempo
2222

2323
| Environment Variable | Relevant Client or Worker option | Description |
2424
| --- | --- | --- |
25-
| TEMPORAL_GRPC_ENDPOINT | [ClientOptions.HostPort](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Frontend GRPC endpoint |
26-
| TEMPORAL_TLS_KEY | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Key file |
27-
| TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file |
28-
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file |
29-
| TEMPORAL_NAMESPACE | [ClientOptions.Namespace](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Namespace |
25+
| TEMPORAL_GRPC_ENDPOINT | [ClientOptions.HostPort](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Frontend GRPC endpoint (supports comma-separated values for multiple namespaces) |
26+
| TEMPORAL_TLS_KEY | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Key file (supports comma-separated values for multiple namespaces) |
27+
| TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file (supports comma-separated values for multiple namespaces) |
28+
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file (supports comma-separated values for multiple namespaces) |
29+
| TEMPORAL_NAMESPACE | [ClientOptions.Namespace](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Namespace (supports comma-separated values for multiple namespaces) |
3030
| TEMPORAL_TASK_QUEUE | [TaskQueue](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/worker#New) | The Temporal Task Queue |
3131
| TEMPORAL_WORKFLOW_TASK_POLLERS | [WorkerOptions.MaxConcurrentWorkflowTaskPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#WorkerOptions) | Number of workflow task pollers |
3232
| TEMPORAL_ACTIVITY_TASK_POLLERS | [WorkerOptions.MaxConcurrentActivityTaskPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#WorkerOptions) | Number of activity task pollers |
3333
| PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on |
3434

35+
#### Multi-Namespace Support
36+
37+
The worker supports working with multiple namespaces simultaneously. This allows you to spread load across multiple namespaces with a single worker deployment, providing a more realistic load pattern.
38+
39+
**Configuration Options:**
40+
41+
1. **Same configuration for all namespaces:**
42+
```bash
43+
export TEMPORAL_NAMESPACE="ns1,ns2,ns3"
44+
export TEMPORAL_GRPC_ENDPOINT="temporal.example.com:7233"
45+
export TEMPORAL_TLS_CERT="/path/to/cert.pem"
46+
export TEMPORAL_TLS_KEY="/path/to/key.pem"
47+
```
48+
49+
2. **Different configurations per namespace:**
50+
```bash
51+
export TEMPORAL_NAMESPACE="ns1,ns2,ns3"
52+
export TEMPORAL_GRPC_ENDPOINT="temporal1.example.com:7233,temporal2.example.com:7233,temporal3.example.com:7233"
53+
export TEMPORAL_TLS_CERT="/certs/ns1.pem,/certs/ns2.pem,/certs/ns3.pem"
54+
export TEMPORAL_TLS_KEY="/keys/ns1.key,/keys/ns2.key,/keys/ns3.key"
55+
```
56+
57+
3. **Mixed configuration (some shared, some different):**
58+
```bash
59+
export TEMPORAL_NAMESPACE="ns1,ns2,ns3"
60+
export TEMPORAL_GRPC_ENDPOINT="temporal.example.com:7233" # Same endpoint for all
61+
export TEMPORAL_TLS_CERT="/certs/ns1.pem,/certs/ns2.pem" # ns3 will reuse ns2's cert
62+
export TEMPORAL_TLS_KEY="/keys/shared.key" # Same key for all
63+
```
64+
65+
**How it works:**
66+
- If you provide exactly as many values as namespaces, each namespace uses its corresponding value
67+
- If you provide only one value but multiple namespaces, that single value is reused for all namespaces
68+
- If you provide fewer values than namespaces, the last value is repeated for the remaining namespaces
69+
70+
The worker will create a separate worker instance for each namespace, all running concurrently within the same process.
71+
3572
#### Kubernetes Deployment
3673

3774
There are several ways to deploy the worker in Kubernetes:
@@ -48,11 +85,23 @@ kubectl run benchmark-worker --image ghcr.io/temporalio/benchmark-workers:main \
4885
--env "TEMPORAL_ACTIVITY_TASK_POLLERS=8"
4986
```
5087

51-
2. **Using the example deployment YAML**:
88+
2. **Multi-namespace deployment example**:
89+
90+
```
91+
kubectl run benchmark-worker --image ghcr.io/temporalio/benchmark-workers:main \
92+
--image-pull-policy Always \
93+
--env "TEMPORAL_GRPC_ENDPOINT=temporal-frontend.temporal:7233" \
94+
--env "TEMPORAL_NAMESPACE=namespace1,namespace2,namespace3" \
95+
--env "TEMPORAL_TASK_QUEUE=benchmark" \
96+
--env "TEMPORAL_WORKFLOW_TASK_POLLERS=16" \
97+
--env "TEMPORAL_ACTIVITY_TASK_POLLERS=8"
98+
```
99+
100+
3. **Using the example deployment YAML**:
52101

53102
We provide an [example deployment spec](./deployment.yaml) for you to customize to your requirements. Once you have edited the environment variables in the deployment.yaml you can create the deployment with `kubectl apply -f ./deployment.yaml`.
54103

55-
3. **Using the Helm chart (Recommended)**:
104+
4. **Using the Helm chart (Recommended)**:
56105

57106
We provide a Helm chart that can be installed from the GitHub Container Registry:
58107

@@ -106,10 +155,10 @@ The table below lists the environment variables available and the relevant Tempo
106155

107156
| Environment Variable | Relevant Client or Worker option | Description |
108157
| --- | --- | --- |
109-
| TEMPORAL_GRPC_ENDPOINT | [ClientOptions.HostPort](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Frontend GRPC endpoint |
110-
| TEMPORAL_TLS_KEY | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Key file |
111-
| TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file |
112-
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file |
158+
| TEMPORAL_GRPC_ENDPOINT | [ClientOptions.HostPort](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Frontend GRPC endpoint (supports comma-separated values for multiple namespaces) |
159+
| TEMPORAL_TLS_KEY | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Key file (supports comma-separated values for multiple namespaces) |
160+
| TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file (supports comma-separated values for multiple namespaces) |
161+
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file (supports comma-separated values for multiple namespaces) |
113162
| PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on |
114163

115164
The runner is also configured via command line options:
@@ -119,20 +168,52 @@ Usage: runner [flags] [workflow input] ...
119168
-c int
120169
concurrent workflows (default 10)
121170
-n string
122-
namespace (default "default")
171+
namespace (comma-separated list supported) (default "default")
172+
-s string
173+
signal type
123174
-t string
124175
workflow type
125176
-tq string
126177
task queue (default "benchmark")
127178
-w wait for workflows to complete (default true)
128179
```
129180

181+
#### Multi-Namespace Support in Runner
182+
183+
The runner supports distributing workflow executions across multiple namespaces. This provides a more realistic load pattern by spreading the load across different namespaces.
184+
185+
**Configuration Examples:**
186+
187+
1. **Command line flag for multiple namespaces:**
188+
```bash
189+
runner -n "namespace1,namespace2,namespace3" -t ExecuteActivity '{"Count":1,"Activity":"Sleep","Input":{"SleepTimeInSeconds":3}}'
190+
```
191+
192+
2. **Environment variable for multiple namespaces:**
193+
```bash
194+
export TEMPORAL_NAMESPACE="namespace1,namespace2,namespace3"
195+
runner -t ExecuteActivity '{"Count":1,"Activity":"Sleep","Input":{"SleepTimeInSeconds":3}}'
196+
```
197+
198+
3. **Different GRPC endpoints per namespace:**
199+
```bash
200+
export TEMPORAL_NAMESPACE="ns1,ns2,ns3"
201+
export TEMPORAL_GRPC_ENDPOINT="temporal1.example.com:7233,temporal2.example.com:7233,temporal3.example.com:7233"
202+
runner -t ExecuteActivity '{"Count":1,"Activity":"Sleep","Input":{"SleepTimeInSeconds":3}}'
203+
```
204+
205+
**How it works:**
206+
- The runner creates a separate client for each namespace
207+
- Workflow executions are distributed across namespaces using round-robin rotation
208+
- Each client can have different connection settings (endpoints, TLS certificates, etc.)
209+
130210
To use the runner in a Kubernetes cluster you could use:
131211

132212
```
133213
kubectl run benchmark-runner --image ghcr.io/temporalio/benchmark-workers:main \
134214
--image-pull-policy Always \
135215
--env "TEMPORAL_GRPC_ENDPOINT=temporal-frontend.temporal:7233" \
216+
--env "TEMPORAL_NAMESPACE=namespace1,namespace2,namespace3" \
136217
--command -- runner -t ExecuteActivity '{ "Count": 3, "Activity": "Echo", "Input": { "Message": "test" } }'
137218
```
138219

@@ -146,6 +227,12 @@ The worker provides the following workflows for you to use during benchmarking:
146227

147228
This workflow takes a count, an activity name and an activity input. The activity `Activity` will be run `Count` times with the given `input`. If the activity returns an error the workflow will fail with that error.
148229

230+
### ReceiveSignal
231+
232+
`ReceiveSignal()`
233+
234+
This workflow waits to receive a signal. It can be used with the runner's signal functionality to test signal-based workflows.
235+
149236
### DSLWorkflow
150237

151238
`DSLWorkflow([]DSLStep)`

cmd/runner/main.go

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"fmt"
1010
"log"
1111
"os"
12+
"strings"
13+
"sync/atomic"
1214
"time"
1315

1416
"github.com/alitto/pond"
@@ -24,9 +26,33 @@ var nWorfklows = flag.Int("c", 10, "concurrent workflows")
2426
var sWorkflow = flag.String("t", "", "workflow type")
2527
var sSignalType = flag.String("s", "", "signal type")
2628
var bWait = flag.Bool("w", true, "wait for workflows to complete")
27-
var sNamespace = flag.String("n", "default", "namespace")
29+
var sNamespace = flag.String("n", "default", "namespace (comma-separated list supported)")
2830
var sTaskQueue = flag.String("tq", "benchmark", "task queue")
2931

32+
// parseCommaSeparatedEnv parses a comma-separated environment variable and returns a slice
33+
// If there's only one value but multiple namespaces are needed, it reuses that value
34+
func parseCommaSeparatedEnv(envVar string, numNamespaces int) []string {
35+
value := os.Getenv(envVar)
36+
if value == "" {
37+
return make([]string, numNamespaces)
38+
}
39+
40+
values := strings.Split(value, ",")
41+
for i, v := range values {
42+
values[i] = strings.TrimSpace(v)
43+
}
44+
45+
// If we have fewer values than namespaces, repeat the last value
46+
if len(values) < numNamespaces {
47+
lastValue := values[len(values)-1]
48+
for len(values) < numNamespaces {
49+
values = append(values, lastValue)
50+
}
51+
}
52+
53+
return values
54+
}
55+
3056
func main() {
3157
flag.Usage = func() {
3258
fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [flags] [workflow input] ...\n", os.Args[0])
@@ -39,58 +65,90 @@ func main() {
3965
log.Printf("WARNING: failed to set GOMAXPROCS: %v.\n", err)
4066
}
4167

42-
namespace := *sNamespace
68+
namespaces := *sNamespace
4369
envNamespace := os.Getenv("TEMPORAL_NAMESPACE")
4470
if envNamespace != "" && envNamespace != "default" {
45-
namespace = envNamespace
71+
namespaces = envNamespace
4672
}
4773

48-
clientOptions := client.Options{
49-
HostPort: os.Getenv("TEMPORAL_GRPC_ENDPOINT"),
50-
Namespace: namespace,
51-
Logger: NewNopLogger(),
74+
// Parse comma-separated namespaces
75+
namespaceList := strings.Split(namespaces, ",")
76+
for i, ns := range namespaceList {
77+
namespaceList[i] = strings.TrimSpace(ns)
5278
}
5379

54-
tlsKeyPath := os.Getenv("TEMPORAL_TLS_KEY")
55-
tlsCertPath := os.Getenv("TEMPORAL_TLS_CERT")
56-
tlsCaPath := os.Getenv("TEMPORAL_TLS_CA")
80+
log.Printf("Using namespaces: %v", namespaceList)
81+
82+
// Parse comma-separated configuration values
83+
grpcEndpoints := parseCommaSeparatedEnv("TEMPORAL_GRPC_ENDPOINT", len(namespaceList))
84+
tlsKeyPaths := parseCommaSeparatedEnv("TEMPORAL_TLS_KEY", len(namespaceList))
85+
tlsCertPaths := parseCommaSeparatedEnv("TEMPORAL_TLS_CERT", len(namespaceList))
86+
tlsCaPaths := parseCommaSeparatedEnv("TEMPORAL_TLS_CA", len(namespaceList))
87+
88+
// Create clients for each namespace
89+
clients := make([]client.Client, len(namespaceList))
90+
for i, namespace := range namespaceList {
91+
clientOptions := client.Options{
92+
HostPort: grpcEndpoints[i],
93+
Namespace: namespace,
94+
Logger: NewNopLogger(),
95+
}
5796

58-
if tlsKeyPath != "" && tlsCertPath != "" {
59-
tlsConfig := tls.Config{}
97+
tlsKeyPath := tlsKeyPaths[i]
98+
tlsCertPath := tlsCertPaths[i]
99+
tlsCaPath := tlsCaPaths[i]
60100

61-
cert, err := tls.LoadX509KeyPair(tlsCertPath, tlsKeyPath)
62-
if err != nil {
63-
log.Fatalln("Unable to create key pair for TLS", err)
64-
}
101+
if tlsKeyPath != "" && tlsCertPath != "" {
102+
tlsConfig := tls.Config{}
65103

66-
var tlsCaPool *x509.CertPool
67-
if tlsCaPath != "" {
68-
tlsCaPool = x509.NewCertPool()
69-
b, err := os.ReadFile(tlsCaPath)
104+
cert, err := tls.LoadX509KeyPair(tlsCertPath, tlsKeyPath)
70105
if err != nil {
71-
log.Fatalln("Failed reading server CA: %w", err)
72-
} else if !tlsCaPool.AppendCertsFromPEM(b) {
73-
log.Fatalln("Server CA PEM file invalid")
106+
log.Fatalf("Unable to create key pair for TLS for namespace %s: %v", namespace, err)
107+
}
108+
109+
var tlsCaPool *x509.CertPool
110+
if tlsCaPath != "" {
111+
tlsCaPool = x509.NewCertPool()
112+
b, err := os.ReadFile(tlsCaPath)
113+
if err != nil {
114+
log.Fatalf("Failed reading server CA for namespace %s: %v", namespace, err)
115+
} else if !tlsCaPool.AppendCertsFromPEM(b) {
116+
log.Fatalf("Server CA PEM file invalid for namespace %s", namespace)
117+
}
74118
}
75-
}
76119

77-
tlsConfig.Certificates = []tls.Certificate{cert}
78-
tlsConfig.RootCAs = tlsCaPool
120+
tlsConfig.Certificates = []tls.Certificate{cert}
121+
tlsConfig.RootCAs = tlsCaPool
122+
123+
if os.Getenv("TEMPORAL_TLS_DISABLE_HOST_VERIFICATION") != "" {
124+
tlsConfig.InsecureSkipVerify = true
125+
}
79126

80-
if os.Getenv("TEMPORAL_TLS_DISABLE_HOST_VERIFICATION") != "" {
81-
tlsConfig.InsecureSkipVerify = true
127+
clientOptions.ConnectionOptions.TLS = &tlsConfig
82128
}
83129

84-
clientOptions.ConnectionOptions.TLS = &tlsConfig
85-
}
130+
if os.Getenv("PROMETHEUS_ENDPOINT") != "" {
131+
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{
132+
ListenAddress: os.Getenv("PROMETHEUS_ENDPOINT"),
133+
TimerType: "histogram",
134+
}))
135+
}
86136

87-
if os.Getenv("PROMETHEUS_ENDPOINT") != "" {
88-
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{
89-
ListenAddress: os.Getenv("PROMETHEUS_ENDPOINT"),
90-
TimerType: "histogram",
91-
}))
137+
c, err := client.Dial(clientOptions)
138+
if err != nil {
139+
log.Fatalf("Unable to create client for namespace %s (endpoint: %s): %v", namespace, grpcEndpoints[i], err)
140+
}
141+
clients[i] = c
142+
log.Printf("Created client for namespace: %s (endpoint: %s)", namespace, grpcEndpoints[i])
92143
}
93144

145+
// Ensure all clients are closed on exit
146+
defer func() {
147+
for _, c := range clients {
148+
c.Close()
149+
}
150+
}()
151+
94152
var input []interface{}
95153
for _, a := range flag.Args() {
96154
var i interface{}
@@ -101,18 +159,19 @@ func main() {
101159
input = append(input, i)
102160
}
103161

104-
c, err := client.Dial(clientOptions)
105-
if err != nil {
106-
log.Fatalln("Unable to create client", err)
107-
}
108-
defer c.Close()
109-
110162
pool := pond.New(*nWorfklows, 0)
111163

164+
// Counter for rotating among clients
165+
var clientCounter uint64
166+
112167
var starter func() (client.WorkflowRun, error)
113168

114169
if *sSignalType != "" {
115170
starter = func() (client.WorkflowRun, error) {
171+
// Rotate among clients
172+
clientIndex := atomic.AddUint64(&clientCounter, 1) % uint64(len(clients))
173+
c := clients[clientIndex]
174+
116175
wID := uuid.New()
117176
return c.SignalWithStartWorkflow(
118177
context.Background(),
@@ -129,6 +188,10 @@ func main() {
129188
}
130189
} else {
131190
starter = func() (client.WorkflowRun, error) {
191+
// Rotate among clients
192+
clientIndex := atomic.AddUint64(&clientCounter, 1) % uint64(len(clients))
193+
c := clients[clientIndex]
194+
132195
return c.ExecuteWorkflow(
133196
context.Background(),
134197
client.StartWorkflowOptions{

0 commit comments

Comments
 (0)