
Commit 73130b5

anuj-atlan and claude committed
feat: add used worker slots to Temporal scaler composite metric
Composite metric is now: backlog + runningWorkflowCount + usedWorkerSlots

This prevents premature scale-down when workers are actively executing tasks
but the task queue backlog is empty. A single workflow can spawn many
concurrent activities, so runningWorkflowCount alone undercounts load.

How it works:
- Lists worker pods in the ScaledObject's namespace via the Kubernetes API
  (app.kubernetes.io/component=worker label selector)
- Scrapes each pod's /metrics endpoint (port 9464, configurable via
  workerMetricsPort) and parses temporal_worker_task_slots_used
- Counts only ActivityWorker slots for the configured task queue;
  WorkflowWorker slots always report >= 1 due to the SDK's sticky workflow
  cache, so they are excluded (queueTypes still controls backlog counting)
- Sums used slots across all pods and adds the total to the metric

Failure handling:
- Single pod scrape fails: skip the pod, use the partial sum from the rest
- All pod scrapes fail, cache < 180s old: return last known good value
- All pod scrapes fail, cache expired: return 0
- Scrape loop exceeds its 12s budget: stop, use partial results so far
- SDK < 2.5.0 (no metrics endpoint): scrapes fail gracefully;
  runningWorkflowCount still protects against premature scale-down

Observability:
- keda_temporal_scaler_worker_slots_scrape_errors_total{namespace, task_queue, reason}
  counter with four reason values: pod_scrape_error, scrape_loop_timeout,
  all_pods_failed_cache_hit, all_pods_failed_cache_expired

Changes:
- pkg/scalers/temporal_scaler.go: pod discovery, metrics scraping with timeout
  budget and cache, ActivityWorker slot filtering by task queue, scrape error
  counter registered on the controller-runtime metrics registry
- pkg/scaling/scalers_builder.go: pass Kubernetes client to the constructor
- pkg/scalers/temporal_scaler_test.go: slot parsing, worker type mapping,
  cache fallback, scrape timeout (end-to-end with fake kube client)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 0c957fa commit 73130b5

4 files changed: 517 additions & 16 deletions
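The scrape-and-filter step described in the commit message can be illustrated in isolation. The sketch below is editorial, not part of the commit: the metric name, label names, and the ActivityWorker-only filter match the diff that follows, while the task queue names ("orders", "billing") and slot values are invented, and the filter is inlined rather than calling the commit's unexported parseUsedSlots helper.

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/common/expfmt"
)

func main() {
    // Invented excerpt of what a Temporal worker pod might serve on :9464/metrics.
    payload := `# TYPE temporal_worker_task_slots_used gauge
temporal_worker_task_slots_used{task_queue="orders",worker_type="ActivityWorker"} 7
temporal_worker_task_slots_used{task_queue="orders",worker_type="WorkflowWorker"} 1
temporal_worker_task_slots_used{task_queue="billing",worker_type="ActivityWorker"} 3
`
    var parser expfmt.TextParser
    families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
    if err != nil {
        panic(err)
    }

    // Sum ActivityWorker slots for the "orders" queue, as the scaler does.
    var total int64
    for _, m := range families["temporal_worker_task_slots_used"].GetMetric() {
        var isActivity, queueMatches bool
        for _, lp := range m.GetLabel() {
            switch lp.GetName() {
            case "worker_type":
                isActivity = lp.GetValue() == "ActivityWorker"
            case "task_queue":
                queueMatches = lp.GetValue() == "orders"
            }
        }
        if isActivity && queueMatches {
            total += int64(m.GetGauge().GetValue())
        }
    }
    fmt.Println(total) // 7: WorkflowWorker and other queues are excluded
}

Running this prints 7: the WorkflowWorker gauge and the other queue's slots are ignored, which is why a pod executing many concurrent activities keeps the composite metric high even when the backlog reads 0.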

.github/workflows/custom-build-image.yml

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - 2.18.3-main
+      - feature/temporal-scaler-worker-slots-metric

 jobs:
   build:

pkg/scalers/temporal_scaler.go

Lines changed: 250 additions & 15 deletions
@@ -4,36 +4,89 @@ import (
     "context"
     "crypto/tls"
     "fmt"
+    "io"
     "log/slog"
+    "net"
+    "net/http"
+    "strconv"
     "strings"
+    "sync"
     "time"

     "github.com/go-logr/logr"
+    "github.com/prometheus/client_golang/prometheus"
+    dto "github.com/prometheus/client_model/go"
+    "github.com/prometheus/common/expfmt"
     workflowservice "go.temporal.io/api/workflowservice/v1"
     sdk "go.temporal.io/sdk/client"
     sdklog "go.temporal.io/sdk/log"
     "google.golang.org/grpc"
     "google.golang.org/grpc/metadata"
     v2 "k8s.io/api/autoscaling/v2"
+    corev1 "k8s.io/api/core/v1"
     "k8s.io/metrics/pkg/apis/external_metrics"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

     "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
     kedautil "github.com/kedacore/keda/v2/pkg/util"
 )

+const (
+    // scrapeLoopTimeout is the total time budget for scraping all worker pods.
+    scrapeLoopTimeout = 12 * time.Second
+    // slotsCacheTTL is how long a cached slots value remains valid after a
+    // successful scrape before falling back to 0 on persistent failure.
+    slotsCacheTTL = 180 * time.Second
+    // maxMetricsResponseBytes limits the size of a single pod's /metrics response
+    // to prevent OOM from misconfigured or malicious pods.
+    maxMetricsResponseBytes = 10 * 1024 * 1024 // 10 MB
+)
+
 var (
     temporalDefauleQueueTypes = []sdk.TaskQueueType{
         sdk.TaskQueueTypeActivity,
         sdk.TaskQueueTypeWorkflow,
         sdk.TaskQueueTypeNexus,
     }
+
+    // temporalSlotsScrapeErrors counts worker slot scrape failures by reason:
+    //   pod_scrape_error              – a single pod's /metrics request failed
+    //   scrape_loop_timeout           – 12s budget exceeded, used partial results
+    //   all_pods_failed_cache_hit     – all pods failed; returned last cached value
+    //   all_pods_failed_cache_expired – all pods failed and cache expired; returned 0
+    temporalSlotsScrapeErrors = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: "keda",
+            Subsystem: "temporal_scaler",
+            Name:      "worker_slots_scrape_errors_total",
+            Help: "Total number of temporal worker slot scrape failures. " +
+                "Use reason label to distinguish pod-level errors from full-scrape failures.",
+        },
+        []string{"namespace", "task_queue", "reason"},
+    )
 )

+func init() {
+    ctrlmetrics.Registry.MustRegister(temporalSlotsScrapeErrors)
+}
+
+// slotsCache holds the last successful slots scrape result.
+type slotsCache struct {
+    value     int64
+    timestamp time.Time
+}
+
 type temporalScaler struct {
-    metricType v2.MetricTargetType
-    metadata   *temporalMetadata
-    tcl        sdk.Client
-    logger     logr.Logger
+    metricType   v2.MetricTargetType
+    metadata     *temporalMetadata
+    tcl          sdk.Client
+    kubeClient   client.Client
+    httpClient   *http.Client
+    logger       logr.Logger
+    podNamespace string
+    slotsMu      sync.Mutex
+    lastSlots    slotsCache
 }

 type temporalMetadata struct {
@@ -48,6 +101,7 @@ type temporalMetadata struct {
     Unversioned                 bool   `keda:"name=selectUnversioned, order=triggerMetadata, default=false"`
     IncludeRunningWorkflowCount bool   `keda:"name=includeRunningWorkflowCount, order=triggerMetadata, default=true"`
     WorkflowTaskQueueForCount   string `keda:"name=workflowTaskQueueForCount, order=triggerMetadata;resolvedEnv, optional"`
+    WorkerMetricsPort           int    `keda:"name=workerMetricsPort, order=triggerMetadata, default=9464"`
     APIKey                      string `keda:"name=apiKey, order=authParams;resolvedEnv, optional"`
     MinConnectTimeout           int    `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"`
@@ -77,10 +131,14 @@ func (a *temporalMetadata) Validate() error {
         return fmt.Errorf("minConnectTimeout must be a positive number")
     }

+    if a.WorkerMetricsPort < 1 || a.WorkerMetricsPort > 65535 {
+        return fmt.Errorf("workerMetricsPort must be between 1 and 65535")
+    }
+
     return nil
 }

-func NewTemporalScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
+func NewTemporalScaler(ctx context.Context, kubeClient client.Client, config *scalersconfig.ScalerConfig) (Scaler, error) {
     logger := InitializeLogger(config, "temporal_scaler")

     metricType, err := GetMetricTargetType(config)
@@ -99,10 +157,13 @@ func NewTemporalScaler(ctx context.Context, config *scalersconfig.ScalerConfig)
     }

     return &temporalScaler{
-        metricType: metricType,
-        metadata:   meta,
-        tcl:        c,
-        logger:     logger,
+        metricType:   metricType,
+        metadata:     meta,
+        tcl:          c,
+        kubeClient:   kubeClient,
+        httpClient:   kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false),
+        logger:       logger,
+        podNamespace: config.ScalableObjectNamespace,
     }, nil
 }
@@ -164,18 +225,25 @@ func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) {
     }

     backlog := getCombinedBacklogCount(resp)
+    metric := backlog

-    if !s.metadata.IncludeRunningWorkflowCount {
-        return backlog, nil
+    if s.metadata.IncludeRunningWorkflowCount {
+        runningCount, err := s.getRunningWorkflowCount(ctx)
+        if err != nil {
+            s.logger.V(1).Info("failed to get running workflow count, using backlog only", "error", err)
+        } else {
+            metric += runningCount
+        }
     }

-    runningCount, err := s.getRunningWorkflowCount(ctx)
+    usedSlots, err := s.getUsedWorkerSlots(ctx)
     if err != nil {
-        s.logger.V(1).Info("failed to get running workflow count, using backlog only", "error", err)
-        return backlog, nil
+        s.logger.Info("failed to get worker slots metric, excluding from metric", "error", err)
+    } else {
+        metric += usedSlots
     }

-    return backlog + runningCount, nil
+    return metric, nil
 }

 // getRunningWorkflowCount returns the approximate number of running workflow executions
@@ -201,6 +269,173 @@ func (s *temporalScaler) getRunningWorkflowCount(ctx context.Context) (int64, error) {
     return resp.GetCount(), nil
 }

+// getUsedWorkerSlots discovers worker pods in the ScaledObject's namespace and
+// scrapes their Prometheus metrics endpoint to sum temporal_worker_task_slots_used
+// for ActivityWorker slots on the configured task queue. This prevents premature
+// scale-down when workers are actively executing tasks but the task queue backlog
+// is empty.
+//
+// On transient failures (all pod scrapes fail), it returns the last known good
+// value if within the cache TTL. A total timeout budget bounds the scrape loop
+// so that slow/unreachable pods don't block the KEDA polling cycle.
+func (s *temporalScaler) getUsedWorkerSlots(ctx context.Context) (int64, error) {
+    if s.kubeClient == nil || s.httpClient == nil {
+        return 0, fmt.Errorf("kubernetes client or http client not configured")
+    }
+
+    podList := &corev1.PodList{}
+    labelSelector := client.MatchingLabels{"app.kubernetes.io/component": "worker"}
+    if err := s.kubeClient.List(ctx, podList, client.InNamespace(s.podNamespace), labelSelector); err != nil {
+        return 0, fmt.Errorf("failed to list worker pods in namespace %s: %w", s.podNamespace, err)
+    }
+
+    if len(podList.Items) == 0 {
+        return 0, nil
+    }
+
+    // Apply a timeout budget for the entire scrape loop.
+    scrapeCtx, cancel := context.WithTimeout(ctx, scrapeLoopTimeout)
+    defer cancel()
+
+    var totalUsedSlots int64
+    var scrapedCount int
+    for i := range podList.Items {
+        pod := &podList.Items[i]
+        if pod.Status.Phase != corev1.PodRunning || pod.Status.PodIP == "" || !isPodReady(pod) {
+            continue
+        }
+
+        // Stop scraping if we've exceeded the timeout budget.
+        if scrapeCtx.Err() != nil {
+            s.logger.Info("scrape loop timeout reached, using partial results",
+                "scraped", scrapedCount, "remaining", len(podList.Items)-i)
+            temporalSlotsScrapeErrors.WithLabelValues(s.podNamespace, s.metadata.TaskQueue, "scrape_loop_timeout").Inc()
+            break
+        }
+
+        slots, err := s.scrapeWorkerSlots(scrapeCtx, pod.Status.PodIP)
+        if err != nil {
+            s.logger.Info("failed to scrape worker pod metrics, skipping",
+                "pod", pod.Name, "ip", pod.Status.PodIP, "error", err)
+            temporalSlotsScrapeErrors.WithLabelValues(s.podNamespace, s.metadata.TaskQueue, "pod_scrape_error").Inc()
+            continue
+        }
+        totalUsedSlots += slots
+        scrapedCount++
+    }
+
+    s.logger.V(1).Info("worker slots metric",
+        "namespace", s.podNamespace, "totalUsedSlots", totalUsedSlots,
+        "podCount", len(podList.Items), "scrapedCount", scrapedCount)
+
+    // If no pods could be scraped, fall back to cached value within TTL.
+    if scrapedCount == 0 {
+        s.slotsMu.Lock()
+        cached := s.lastSlots
+        s.slotsMu.Unlock()
+        if time.Since(cached.timestamp) <= slotsCacheTTL {
+            s.logger.Info("all scrapes failed, using cached slots value",
+                "cachedValue", cached.value, "cacheAge", time.Since(cached.timestamp).String())
+            temporalSlotsScrapeErrors.WithLabelValues(s.podNamespace, s.metadata.TaskQueue, "all_pods_failed_cache_hit").Inc()
+            return cached.value, nil
+        }
+        s.logger.Info("all scrapes failed and cache expired, returning 0")
+        temporalSlotsScrapeErrors.WithLabelValues(s.podNamespace, s.metadata.TaskQueue, "all_pods_failed_cache_expired").Inc()
+        return 0, nil
+    }
+
+    // Update cache with the fresh value.
+    s.slotsMu.Lock()
+    s.lastSlots = slotsCache{value: totalUsedSlots, timestamp: time.Now()}
+    s.slotsMu.Unlock()
+
+    return totalUsedSlots, nil
+}
+
+// scrapeWorkerSlots fetches Prometheus metrics from a single worker pod and returns
+// the sum of temporal_worker_task_slots_used for ActivityWorker slots matching the
+// configured task queue.
+func (s *temporalScaler) scrapeWorkerSlots(ctx context.Context, podIP string) (int64, error) {
+    hostPort := net.JoinHostPort(podIP, strconv.Itoa(s.metadata.WorkerMetricsPort))
+    url := fmt.Sprintf("http://%s/metrics", hostPort)
+    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+    if err != nil {
+        return 0, fmt.Errorf("create request: %w", err)
+    }
+
+    resp, err := s.httpClient.Do(req)
+    if err != nil {
+        return 0, fmt.Errorf("scrape %s: %w", url, err)
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        return 0, fmt.Errorf("scrape %s returned status %d", url, resp.StatusCode)
+    }
+
+    limitedBody := io.LimitReader(resp.Body, maxMetricsResponseBytes)
+    // Only count ActivityWorker slots. WorkflowWorker slots always report >= 1
+    // due to Temporal SDK's sticky workflow cache and are not meaningful for
+    // scaling decisions. queueTypes still controls backlog counting via
+    // getQueueTypes/DescribeTaskQueueEnhanced.
+    activityOnly := map[string]bool{"ActivityWorker": true}
+    return parseUsedSlots(limitedBody, s.metadata.TaskQueue, activityOnly)
+}
+
+// parseUsedSlots parses Prometheus text format and extracts the sum of
+// temporal_worker_task_slots_used for the given worker types matching the task queue.
+func parseUsedSlots(r io.Reader, taskQueue string, workerTypes map[string]bool) (int64, error) {
+    var parser expfmt.TextParser
+    families, err := parser.TextToMetricFamilies(r)
+    if err != nil {
+        return 0, fmt.Errorf("parse prometheus metrics: %w", err)
+    }
+
+    family, ok := families["temporal_worker_task_slots_used"]
+    if !ok {
+        return 0, nil
+    }
+
+    var total int64
+    for _, m := range family.GetMetric() {
+        if matchesWorkerSlot(m, taskQueue, workerTypes) {
+            total += int64(m.GetGauge().GetValue())
+        }
+    }
+    return total, nil
+}
+
+// matchesWorkerSlot returns true if the metric's worker_type is in the allowed set
+// and (if taskQueue is non-empty) task_queue matches the configured queue.
+func matchesWorkerSlot(m *dto.Metric, taskQueue string, workerTypes map[string]bool) bool {
+    var typeMatches bool
+    // Empty taskQueue matches all queues (including metrics missing the label).
+    queueMatches := taskQueue == ""
+    for _, lp := range m.GetLabel() {
+        switch lp.GetName() {
+        case "worker_type":
+            typeMatches = workerTypes[lp.GetValue()]
+        case "task_queue":
+            if !queueMatches {
+                queueMatches = lp.GetValue() == taskQueue
+            }
+        }
+    }
+    return typeMatches && queueMatches
+}
+
+// isPodReady returns true if the pod has a Ready condition set to True.
+// Pods that aren't ready yet (e.g. during startup before the readiness probe
+// passes) likely don't have their metrics endpoint available.
+func isPodReady(pod *corev1.Pod) bool {
+    for _, c := range pod.Status.Conditions {
+        if c.Type == corev1.PodReady {
+            return c.Status == corev1.ConditionTrue
+        }
+    }
+    return false
+}
+
 func getQueueTypes(queueTypes []string) []sdk.TaskQueueType {
     var taskQueueTypes []sdk.TaskQueueType
     for _, t := range queueTypes {
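As an editorial aside, the cache fallback policy in getUsedWorkerSlots above is easier to follow with the Kubernetes and HTTP plumbing stripped away. In this sketch the TTL value and cache shape mirror the commit; resolveSlots is a hypothetical helper introduced only for illustration, not a function in the diff.

package main

import (
    "fmt"
    "time"
)

const slotsCacheTTL = 180 * time.Second

type slotsCache struct {
    value     int64
    timestamp time.Time
}

// resolveSlots returns the fresh sum when at least one pod was scraped,
// the cached value while it is still within the TTL, and 0 otherwise.
func resolveSlots(freshSum int64, scrapedCount int, cache *slotsCache, now time.Time) int64 {
    if scrapedCount > 0 {
        *cache = slotsCache{value: freshSum, timestamp: now}
        return freshSum
    }
    if now.Sub(cache.timestamp) <= slotsCacheTTL {
        return cache.value // all scrapes failed: reuse last known good value
    }
    return 0 // cache expired: stop propping up the metric
}

func main() {
    cache := slotsCache{}
    now := time.Now()
    fmt.Println(resolveSlots(12, 3, &cache, now))                     // 12: fresh scrape wins
    fmt.Println(resolveSlots(0, 0, &cache, now.Add(60*time.Second)))  // 12: within 180s TTL
    fmt.Println(resolveSlots(0, 0, &cache, now.Add(200*time.Second))) // 0: TTL expired
}

Returning 0 once the TTL lapses, rather than holding the last value indefinitely, means a persistently unreachable worker fleet eventually stops inflating the composite metric and the deployment can scale down.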
