Skip to content

Commit 20fb339

Browse files
committed
Switch to poller autoscaling.
Allow setting the maximum.
1 parent ea79bf0 commit 20fb339

4 files changed

Lines changed: 62 additions & 29 deletions

File tree

README.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ The table below lists the environment variables available and the relevant Tempo
2828
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file |
2929
| TEMPORAL_NAMESPACE | [ClientOptions.Namespace](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ClientOptions) | The Temporal Namespace |
3030
| TEMPORAL_TASK_QUEUE | [TaskQueue](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/worker#New) | The Temporal Task Queue |
31-
| TEMPORAL_WORKFLOW_TASK_POLLERS | [WorkerOptions.MaxConcurrentWorkflowTaskPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#WorkerOptions) | Number of workflow task pollers |
32-
| TEMPORAL_ACTIVITY_TASK_POLLERS | [WorkerOptions.MaxConcurrentActivityTaskPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#WorkerOptions) | Number of activity task pollers |
31+
| TEMPORAL_MAX_WORKFLOW_TASK_POLLERS | [PollerBehaviorAutoscalingOptions.MaximumNumberOfPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.37.0/internal#PollerBehaviorAutoscalingOptions) | Maximum number of workflow task pollers |
32+
| TEMPORAL_MAX_ACTIVITY_TASK_POLLERS | [PollerBehaviorAutoscalingOptions.MaximumNumberOfPollers](https://pkg.go.dev/go.temporal.io/sdk@v1.37.0/internal#PollerBehaviorAutoscalingOptions) | Maximum number of activity task pollers |
3333
| PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on |
3434

3535
#### Kubernetes Deployment
@@ -43,9 +43,7 @@ kubectl run benchmark-worker --image ghcr.io/temporalio/benchmark-workers:main \
4343
--image-pull-policy Always \
4444
--env "TEMPORAL_GRPC_ENDPOINT=temporal-frontend.temporal:7233" \
4545
--env "TEMPORAL_NAMESPACE=default" \
46-
--env "TEMPORAL_TASK_QUEUE=benchmark" \
47-
--env "TEMPORAL_WORKFLOW_TASK_POLLERS=16" \
48-
--env "TEMPORAL_ACTIVITY_TASK_POLLERS=8"
46+
--env "TEMPORAL_TASK_QUEUE=benchmark"
4947
```
5048

5149
2. **Using the example deployment YAML**:

cmd/worker/main.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323

2424
var sNamespace = flag.String("n", "default", "namespace")
2525
var sTaskQueue = flag.String("tq", "benchmark", "task queue")
26-
var nWorkflowPollers = flag.Int("wp", -1, "max concurrent workflow task pollers (-1 = use default, 0 = disable)")
27-
var nActivityPollers = flag.Int("ap", -1, "max concurrent activity task pollers (-1 = use default, 0 = disable)")
26+
var nMaxWorkflowPollers = flag.Int("wp", -1, "max concurrent workflow task pollers (-1 = use default, 0 = disable)")
27+
var nMaxActivityPollers = flag.Int("ap", -1, "max concurrent activity task pollers (-1 = use default, 0 = disable)")
2828

2929
// Track which flags were explicitly set
3030
var flagsSet = make(map[string]bool)
@@ -58,8 +58,8 @@ func main() {
5858
fmt.Fprintf(flag.CommandLine.Output(), "\nEnvironment variables (used if flag not set):\n")
5959
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_NAMESPACE\n")
6060
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_TASK_QUEUE\n")
61-
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_WORKFLOW_TASK_POLLERS\n")
62-
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_ACTIVITY_TASK_POLLERS\n")
61+
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_MAX_WORKFLOW_TASK_POLLERS\n")
62+
fmt.Fprintf(flag.CommandLine.Output(), " TEMPORAL_MAX_ACTIVITY_TASK_POLLERS\n")
6363
}
6464

6565
flag.Parse()
@@ -76,8 +76,8 @@ func main() {
7676
// Apply precedence: command line > environment variable > default
7777
namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default")
7878
taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark")
79-
workflowPollers := getIntValue("wp", "TEMPORAL_WORKFLOW_TASK_POLLERS", *nWorkflowPollers, -1)
80-
activityPollers := getIntValue("ap", "TEMPORAL_ACTIVITY_TASK_POLLERS", *nActivityPollers, -1)
79+
maxWorkflowPollers := getIntValue("wp", "TEMPORAL_MAX_WORKFLOW_TASK_POLLERS", *nMaxWorkflowPollers, -1)
80+
maxActivityPollers := getIntValue("ap", "TEMPORAL_MAX_ACTIVITY_TASK_POLLERS", *nMaxActivityPollers, -1)
8181

8282
log.Printf("Creating worker for namespace: %s", namespace)
8383

@@ -134,12 +134,20 @@ func main() {
134134

135135
workerOptions := worker.Options{}
136136

137-
if workflowPollers >= 0 {
138-
workerOptions.MaxConcurrentWorkflowTaskPollers = workflowPollers
137+
if maxWorkflowPollers >= 0 {
138+
workerOptions.WorkflowTaskPollerBehavior = worker.NewPollerBehaviorAutoscaling(worker.PollerBehaviorAutoscalingOptions{
139+
MaximumNumberOfPollers: maxWorkflowPollers,
140+
})
141+
} else {
142+
workerOptions.WorkflowTaskPollerBehavior = worker.NewPollerBehaviorSimpleMaximum(worker.PollerBehaviorSimpleMaximumOptions{})
139143
}
140144

141-
if activityPollers >= 0 {
142-
workerOptions.MaxConcurrentActivityTaskPollers = activityPollers
145+
if maxActivityPollers >= 0 {
146+
workerOptions.ActivityTaskPollerBehavior = worker.NewPollerBehaviorAutoscaling(worker.PollerBehaviorAutoscalingOptions{
147+
MaximumNumberOfPollers: maxActivityPollers,
148+
})
149+
} else {
150+
workerOptions.ActivityTaskPollerBehavior = worker.NewPollerBehaviorSimpleMaximum(worker.PollerBehaviorSimpleMaximumOptions{})
143151
}
144152

145153
// TODO: Support more worker options

go.mod

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
11
module github.com/temporalio/benchmark-workers
22

3-
go 1.19
3+
go 1.23.0
44

55
require (
66
github.com/alitto/pond v1.8.3
77
github.com/pborman/uuid v1.2.1
88
github.com/prometheus/client_golang v1.14.0
99
github.com/uber-go/tally/v4 v4.1.3
10-
go.temporal.io/sdk v1.25.1
10+
go.temporal.io/sdk v1.37.0
1111
go.temporal.io/sdk/contrib/tally v0.2.0
1212
go.uber.org/automaxprocs v1.5.2
1313
)
1414

1515
require (
1616
github.com/beorn7/perks v1.0.1 // indirect
17-
github.com/cespare/xxhash/v2 v2.2.0 // indirect
17+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1818
github.com/davecgh/go-spew v1.1.1 // indirect
1919
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
2020
github.com/gogo/googleapis v1.4.1 // indirect
2121
github.com/gogo/protobuf v1.3.2 // indirect
2222
github.com/gogo/status v1.1.1 // indirect
2323
github.com/golang/mock v1.6.0 // indirect
2424
github.com/golang/protobuf v1.5.3 // indirect
25-
github.com/google/uuid v1.3.0 // indirect
25+
github.com/google/uuid v1.6.0 // indirect
2626
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
2727
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
2828
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@@ -32,19 +32,19 @@ require (
3232
github.com/prometheus/common v0.37.0 // indirect
3333
github.com/prometheus/procfs v0.8.0 // indirect
3434
github.com/robfig/cron v1.2.0 // indirect
35-
github.com/stretchr/objx v0.5.0 // indirect
36-
github.com/stretchr/testify v1.8.4 // indirect
35+
github.com/stretchr/objx v0.5.2 // indirect
36+
github.com/stretchr/testify v1.10.0 // indirect
3737
github.com/twmb/murmur3 v1.1.6 // indirect
38-
go.temporal.io/api v1.24.0 // indirect
38+
go.temporal.io/api v1.53.0 // indirect
3939
go.uber.org/atomic v1.10.0 // indirect
40-
golang.org/x/net v0.23.0 // indirect
41-
golang.org/x/sys v0.18.0 // indirect
42-
golang.org/x/text v0.14.0 // indirect
40+
golang.org/x/net v0.39.0 // indirect
41+
golang.org/x/sys v0.32.0 // indirect
42+
golang.org/x/text v0.24.0 // indirect
4343
golang.org/x/time v0.3.0 // indirect
4444
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 // indirect
45-
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 // indirect
46-
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
47-
google.golang.org/grpc v1.57.0 // indirect
48-
google.golang.org/protobuf v1.31.0 // indirect
45+
google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed // indirect
46+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
47+
google.golang.org/grpc v1.67.1 // indirect
48+
google.golang.org/protobuf v1.36.6 // indirect
4949
gopkg.in/yaml.v3 v3.0.1 // indirect
5050
)

0 commit comments

Comments
 (0)