Skip to content

Commit b63fd32

Browse files
committed
Feat: Add saturation and capacity Prometheus metrics (#912)
Add 5 new Prometheus gauge metrics exposing saturation analysis outputs that drive scaling decisions, giving operators visibility into why scaling happens: - wva_saturation_utilization: per-variant utilization ratio (0.0-1.0) - wva_spare_capacity: per-variant spare capacity (0.0-1.0) - wva_required_capacity: model-level required capacity (>0 = scale-up) - wva_kv_cache_tokens_used: KV cache tokens in use per variant - wva_kv_cache_tokens_total: KV cache token capacity per variant Metrics are populated in both V1 (percentage-based) and V2 (token-based) engine paths and emitted during applySaturationDecisions.
1 parent cef2858 commit b63fd32

8 files changed

Lines changed: 492 additions & 12 deletions

File tree

internal/actuator/actuator.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
llmdOptv1alpha1 "github.com/llm-d/llm-d-workload-variant-autoscaler/api/v1alpha1"
88
appsv1 "k8s.io/api/apps/v1"
99

10+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
1011
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/metrics"
1112
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/utils"
1213
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -95,3 +96,18 @@ func (a *Actuator) EmitMetrics(ctx context.Context, VariantAutoscaling *llmdOptv
9596
"accelerator", VariantAutoscaling.Status.DesiredOptimizedAlloc.Accelerator)
9697
return nil
9798
}
99+
100+
// EmitSaturationMetrics emits saturation analysis and KV cache capacity metrics from a decision.
101+
func (a *Actuator) EmitSaturationMetrics(ctx context.Context, decision interfaces.VariantDecision) error {
102+
return a.MetricsEmitter.EmitSaturationMetrics(
103+
ctx,
104+
decision.VariantName,
105+
decision.Namespace,
106+
decision.AcceleratorName,
107+
decision.Utilization,
108+
decision.SpareCapacity,
109+
decision.RequiredCapacity,
110+
decision.KvCacheTokensUsed,
111+
decision.KvCacheTokensTotal,
112+
)
113+
}

internal/constants/metrics.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,26 @@ const (
108108
// WVADesiredRatio is a gauge that tracks the ratio of desired to current replicas.
109109
// Labels: variant_name, namespace, accelerator_type
110110
WVADesiredRatio = "wva_desired_ratio"
111+
112+
// WVASaturationUtilization is a gauge that tracks per-variant utilization ratio (0.0-1.0).
113+
// Labels: variant_name, namespace, accelerator_type
114+
WVASaturationUtilization = "wva_saturation_utilization"
115+
116+
// WVASpareCapacity is a gauge that tracks per-variant spare capacity (0.0-1.0).
117+
// Labels: variant_name, namespace, accelerator_type
118+
WVASpareCapacity = "wva_spare_capacity"
119+
120+
// WVARequiredCapacity is a gauge that tracks model-level required capacity.
121+
// >0 means scale-up needed. Labels: variant_name, namespace
122+
WVARequiredCapacity = "wva_required_capacity"
123+
124+
// WVAKvCacheTokensUsed is a gauge that tracks total KV cache tokens currently in use per variant.
125+
// Labels: variant_name, namespace
126+
WVAKvCacheTokensUsed = "wva_kv_cache_tokens_used"
127+
128+
// WVAKvCacheTokensTotal is a gauge that tracks total KV cache token capacity per variant.
129+
// Labels: variant_name, namespace
130+
WVAKvCacheTokensTotal = "wva_kv_cache_tokens_total"
111131
)
112132

113133
// Metric Label Names

internal/engines/pipeline/cost_aware_optimizer.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -292,18 +292,21 @@ func buildDecisionsWithOptimizer(
292292
}
293293

294294
decisions = append(decisions, interfaces.VariantDecision{
295-
VariantName: name,
296-
ModelID: req.ModelID,
297-
Namespace: req.Namespace,
298-
AcceleratorName: vc.AcceleratorName,
299-
Cost: vc.Cost,
300-
Role: state.Role,
301-
CurrentReplicas: state.CurrentReplicas,
302-
TargetReplicas: target,
303-
Action: action,
304-
Reason: reason,
305-
MinReplicas: state.MinReplicas,
306-
MaxReplicas: state.MaxReplicas,
295+
VariantName: name,
296+
ModelID: req.ModelID,
297+
Namespace: req.Namespace,
298+
AcceleratorName: vc.AcceleratorName,
299+
Cost: vc.Cost,
300+
Role: state.Role,
301+
CurrentReplicas: state.CurrentReplicas,
302+
TargetReplicas: target,
303+
Action: action,
304+
Reason: reason,
305+
MinReplicas: state.MinReplicas,
306+
MaxReplicas: state.MaxReplicas,
307+
Utilization: vc.Utilization,
308+
SpareCapacity: 1.0 - vc.Utilization,
309+
RequiredCapacity: req.Result.RequiredCapacity,
307310
})
308311
}
309312
return decisions

internal/engines/saturation/engine.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,9 @@ func (e *Engine) optimizeV1(
361361
// Convert saturation targets to decisions first, then apply enforcer
362362
finalDecisions = e.convertSaturationTargetsToDecisions(ctx, saturationTargets, saturationAnalysis, data.variantStates)
363363

364+
// Enrich decisions with saturation metrics for observability (V1 path)
365+
enrichDecisionsFromReplicaMetrics(finalDecisions, data.replicaMetrics, saturationAnalysis.ShouldScaleUp)
366+
364367
// Check if any variant has minReplicas > 0 — if so, skip scale-to-zero enforcement
365368
if !hasMinReplicasAboveZero(data.variantStates) {
366369
// Apply scale-to-zero enforcement on decisions
@@ -435,6 +438,8 @@ func (e *Engine) optimizeV2(
435438

436439
// Stage 1: Collect ModelScalingRequests for all models
437440
var requests []pipeline.ModelScalingRequest
441+
// modelReplicaMetrics collects per-model replica metrics for KV token enrichment
442+
modelReplicaMetrics := make(map[string][]interfaces.ReplicaMetrics)
438443

439444
for groupKey, modelVAs := range modelGroups {
440445
modelID := modelVAs[0].Spec.ModelID
@@ -475,6 +480,7 @@ func (e *Engine) optimizeV2(
475480
}
476481

477482
requests = append(requests, *req)
483+
modelReplicaMetrics[modelID] = data.replicaMetrics
478484
}
479485

480486
if len(requests) == 0 {
@@ -522,6 +528,11 @@ func (e *Engine) optimizeV2(
522528
}
523529
}
524530

531+
// Stage 4: Enrich decisions with KV cache token data from replicaMetrics.
532+
// Utilization, RequiredCapacity, and SpareCapacity are already set by
533+
// buildDecisionsWithOptimizer from AnalyzerResult.
534+
enrichDecisionsWithKvTokenData(allDecisions, modelReplicaMetrics)
535+
525536
return allDecisions
526537
}
527538

@@ -746,6 +757,77 @@ func (e *Engine) convertSaturationTargetsToDecisions(
746757
return decisions
747758
}
748759

760+
// enrichDecisionsFromReplicaMetrics populates saturation observability fields on decisions
761+
// by aggregating per-pod ReplicaMetrics per variant. Used by the V1 path where
762+
// Utilization and RequiredCapacity are not set by the optimizer.
763+
func enrichDecisionsFromReplicaMetrics(decisions []interfaces.VariantDecision, replicaMetrics []interfaces.ReplicaMetrics, shouldScaleUp bool) {
764+
// Aggregate per variant
765+
type variantAgg struct {
766+
kvUsed int64
767+
kvTotal int64
768+
kvUsageSum float64
769+
count int
770+
}
771+
agg := make(map[string]*variantAgg)
772+
for _, rm := range replicaMetrics {
773+
a, ok := agg[rm.VariantName]
774+
if !ok {
775+
a = &variantAgg{}
776+
agg[rm.VariantName] = a
777+
}
778+
a.kvUsed += rm.TokensInUse
779+
a.kvTotal += rm.TotalKvCapacityTokens
780+
a.kvUsageSum += rm.KvCacheUsage
781+
a.count++
782+
}
783+
784+
requiredCapacity := float64(0)
785+
if shouldScaleUp {
786+
requiredCapacity = 1.0
787+
}
788+
789+
for i := range decisions {
790+
d := &decisions[i]
791+
d.RequiredCapacity = requiredCapacity
792+
if a, ok := agg[d.VariantName]; ok && a.count > 0 {
793+
d.KvCacheTokensUsed = a.kvUsed
794+
d.KvCacheTokensTotal = a.kvTotal
795+
d.Utilization = a.kvUsageSum / float64(a.count)
796+
}
797+
}
798+
}
799+
800+
// enrichDecisionsWithKvTokenData sets KvCacheTokensUsed and KvCacheTokensTotal on decisions
801+
// from replica metrics aggregated per variant. Used by V2 path where Utilization and
802+
// RequiredCapacity are already set from AnalyzerResult.
803+
func enrichDecisionsWithKvTokenData(decisions []interfaces.VariantDecision, modelReplicaMetrics map[string][]interfaces.ReplicaMetrics) {
804+
// Build per-variant KV token aggregation across all models
805+
type kvAgg struct {
806+
kvUsed int64
807+
kvTotal int64
808+
}
809+
agg := make(map[string]*kvAgg)
810+
for _, metrics := range modelReplicaMetrics {
811+
for _, rm := range metrics {
812+
a, ok := agg[rm.VariantName]
813+
if !ok {
814+
a = &kvAgg{}
815+
agg[rm.VariantName] = a
816+
}
817+
a.kvUsed += rm.TokensInUse
818+
a.kvTotal += rm.TotalKvCapacityTokens
819+
}
820+
}
821+
822+
for i := range decisions {
823+
d := &decisions[i]
824+
if a, ok := agg[d.VariantName]; ok {
825+
d.KvCacheTokensUsed = a.kvUsed
826+
d.KvCacheTokensTotal = a.kvTotal
827+
}
828+
}
829+
}
830+
749831
// hasMinReplicasAboveZero returns true if any variant in the states has MinReplicas > 0.
750832
func hasMinReplicasAboveZero(states []interfaces.VariantReplicaState) bool {
751833
for _, state := range states {
@@ -1089,6 +1171,13 @@ func (e *Engine) applySaturationDecisions(
10891171
updateVa.Status.Actuation.Applied = true
10901172
}
10911173

1174+
// Emit saturation and capacity metrics for observability
1175+
if hasDecision {
1176+
if err := act.EmitSaturationMetrics(ctx, decision); err != nil {
1177+
logger.Error(err, "Failed to emit saturation metrics", "variant", updateVa.Name)
1178+
}
1179+
}
1180+
10921181
// Update Shared State and Trigger Reconcile via Channel
10931182
// This avoids any API server interaction from the Engine.
10941183

internal/interfaces/saturation_analyzer.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,22 @@ type VariantDecision struct {
186186
// SpareCapacity indicates how much spare capacity this variant has.
187187
// 0.0 = fully saturated, 1.0 = completely idle.
188188
// Used by allocation algorithms to prioritize saturated variants.
189+
// V1: threshold-relative spare KV capacity (AvgSpareKvCapacity).
190+
// V2: 1.0 - Utilization (absolute spare).
189191
SpareCapacity float64
192+
// Utilization is the variant-level utilization ratio (0.0-1.0).
193+
// V2: from AnalyzerResult.VariantCapacities[].Utilization.
194+
// V1: average KvCacheUsage across this variant's replicas.
195+
Utilization float64
196+
// KvCacheTokensUsed is the sum of TokensInUse across this variant's replicas.
197+
KvCacheTokensUsed int64
198+
// KvCacheTokensTotal is the sum of TotalKvCapacityTokens across this variant's replicas.
199+
KvCacheTokensTotal int64
200+
// RequiredCapacity is the model-level required capacity (>0 means scale-up needed).
201+
// Same value for all variants of a model.
202+
// V1: binary (1.0 if shouldScaleUp, else 0.0).
203+
// V2: continuous token-based demand from AnalyzerResult.
204+
RequiredCapacity float64
190205
// ScaleTargetRef references the Deployment/StatefulSet for scheduling constraints
191206
ScaleTargetRef *autoscalingv2.CrossVersionObjectReference
192207

internal/metrics/metrics.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ var (
1919
currentReplicas *prometheus.GaugeVec
2020
desiredRatio *prometheus.GaugeVec
2121

22+
// Saturation and capacity metrics
23+
saturationUtilization *prometheus.GaugeVec
24+
spareCapacity *prometheus.GaugeVec
25+
requiredCapacity *prometheus.GaugeVec
26+
kvCacheTokensUsed *prometheus.GaugeVec
27+
kvCacheTokensTotal *prometheus.GaugeVec
28+
2229
// controllerInstance stores the optional controller instance identifier.
2330
// When set, it's added as a label to all emitted metrics.
2431
controllerInstance string
@@ -41,10 +48,13 @@ func InitMetrics(registry prometheus.Registerer) error {
4148
// Build label sets based on whether controller_instance is configured
4249
baseLabels := []string{constants.LabelVariantName, constants.LabelNamespace, constants.LabelAcceleratorType}
4350
scalingLabels := []string{constants.LabelVariantName, constants.LabelNamespace, constants.LabelDirection, constants.LabelReason}
51+
// modelLabels: variant_name + namespace only (no accelerator_type) for model-level and token metrics
52+
modelLabels := []string{constants.LabelVariantName, constants.LabelNamespace}
4453

4554
if controllerInstance != "" {
4655
baseLabels = append(baseLabels, constants.LabelControllerInstance)
4756
scalingLabels = append(scalingLabels, constants.LabelControllerInstance)
57+
modelLabels = append(modelLabels, constants.LabelControllerInstance)
4858
}
4959

5060
replicaScalingTotal = prometheus.NewCounterVec(
@@ -75,6 +85,41 @@ func InitMetrics(registry prometheus.Registerer) error {
7585
},
7686
baseLabels,
7787
)
88+
saturationUtilization = prometheus.NewGaugeVec(
89+
prometheus.GaugeOpts{
90+
Name: constants.WVASaturationUtilization,
91+
Help: "Per-variant utilization ratio (0.0-1.0) from saturation analysis",
92+
},
93+
baseLabels,
94+
)
95+
spareCapacity = prometheus.NewGaugeVec(
96+
prometheus.GaugeOpts{
97+
Name: constants.WVASpareCapacity,
98+
Help: "Per-variant spare capacity (0.0-1.0) from saturation analysis",
99+
},
100+
baseLabels,
101+
)
102+
requiredCapacity = prometheus.NewGaugeVec(
103+
prometheus.GaugeOpts{
104+
Name: constants.WVARequiredCapacity,
105+
Help: "Model-level required capacity; >0 indicates scale-up needed (V1: binary 0/1, V2: continuous token demand)",
106+
},
107+
modelLabels,
108+
)
109+
kvCacheTokensUsed = prometheus.NewGaugeVec(
110+
prometheus.GaugeOpts{
111+
Name: constants.WVAKvCacheTokensUsed,
112+
Help: "Total KV cache tokens currently in use across all replicas of a variant",
113+
},
114+
modelLabels,
115+
)
116+
kvCacheTokensTotal = prometheus.NewGaugeVec(
117+
prometheus.GaugeOpts{
118+
Name: constants.WVAKvCacheTokensTotal,
119+
Help: "Total KV cache token capacity across all replicas of a variant",
120+
},
121+
modelLabels,
122+
)
78123

79124
// Register metrics with the registry
80125
if err := registry.Register(replicaScalingTotal); err != nil {
@@ -89,6 +134,21 @@ func InitMetrics(registry prometheus.Registerer) error {
89134
if err := registry.Register(desiredRatio); err != nil {
90135
return fmt.Errorf("failed to register desiredRatio metric: %w", err)
91136
}
137+
if err := registry.Register(saturationUtilization); err != nil {
138+
return fmt.Errorf("failed to register saturationUtilization metric: %w", err)
139+
}
140+
if err := registry.Register(spareCapacity); err != nil {
141+
return fmt.Errorf("failed to register spareCapacity metric: %w", err)
142+
}
143+
if err := registry.Register(requiredCapacity); err != nil {
144+
return fmt.Errorf("failed to register requiredCapacity metric: %w", err)
145+
}
146+
if err := registry.Register(kvCacheTokensUsed); err != nil {
147+
return fmt.Errorf("failed to register kvCacheTokensUsed metric: %w", err)
148+
}
149+
if err := registry.Register(kvCacheTokensTotal); err != nil {
150+
return fmt.Errorf("failed to register kvCacheTokensTotal metric: %w", err)
151+
}
92152

93153
return nil
94154
}
@@ -163,3 +223,39 @@ func (m *MetricsEmitter) EmitReplicaMetrics(ctx context.Context, va *llmdOptv1al
163223
desiredRatio.With(baseLabels).Set(float64(desired) / float64(current))
164224
return nil
165225
}
226+
227+
// EmitSaturationMetrics emits saturation analysis and KV cache capacity metrics
228+
func (m *MetricsEmitter) EmitSaturationMetrics(
229+
ctx context.Context,
230+
variantName, namespace, acceleratorType string,
231+
utilization, spare, required float64,
232+
kvTokensUsed, kvTokensTotal int64,
233+
) error {
234+
if saturationUtilization == nil || spareCapacity == nil || requiredCapacity == nil ||
235+
kvCacheTokensUsed == nil || kvCacheTokensTotal == nil {
236+
return fmt.Errorf("saturation metrics not initialized")
237+
}
238+
239+
accelLabels := prometheus.Labels{
240+
constants.LabelVariantName: variantName,
241+
constants.LabelNamespace: namespace,
242+
constants.LabelAcceleratorType: acceleratorType,
243+
}
244+
modelLabels := prometheus.Labels{
245+
constants.LabelVariantName: variantName,
246+
constants.LabelNamespace: namespace,
247+
}
248+
249+
if controllerInstance != "" {
250+
accelLabels[constants.LabelControllerInstance] = controllerInstance
251+
modelLabels[constants.LabelControllerInstance] = controllerInstance
252+
}
253+
254+
saturationUtilization.With(accelLabels).Set(utilization)
255+
spareCapacity.With(accelLabels).Set(spare)
256+
requiredCapacity.With(modelLabels).Set(required)
257+
kvCacheTokensUsed.With(modelLabels).Set(float64(kvTokensUsed))
258+
kvCacheTokensTotal.With(modelLabels).Set(float64(kvTokensTotal))
259+
260+
return nil
261+
}

0 commit comments

Comments
 (0)