Skip to content

Commit 40b8062

Browse files
committed
Improvements: analyzer_version label, model-keyed V2 aggregation, delete hook
- Add analyzer_version label to wva_required_capacity to disambiguate V1 (binary 0/1) from V2 (continuous token demand) units. Add AnalyzerVersion field to VariantDecision; set "v1" in enrichDecisionsFromReplicaMetrics and "v2" in enrichDecisionsWithKvTokenData.
- Add AnalyzerVersionV1/V2 constants and LabelAnalyzerVersion constant.
- Key V2 KV-token aggregation by (modelID, variantName) instead of just variantName; variant names can collide across models in the same cycle.
- Add MetricsEmitter.DeleteSaturationMetrics() so the controller delete handler can remove stale time series when a VariantAutoscaling is deleted.
- Update tests: cover V1/V2 label distinction, Delete behavior, and analyzer version on controller_instance test.
1 parent b63fd32 commit 40b8062

File tree

6 files changed

+197
-24
lines changed

6 files changed

+197
-24
lines changed

internal/actuator/actuator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (a *Actuator) EmitSaturationMetrics(ctx context.Context, decision interface
104104
decision.VariantName,
105105
decision.Namespace,
106106
decision.AcceleratorName,
107+
decision.AnalyzerVersion,
107108
decision.Utilization,
108109
decision.SpareCapacity,
109110
decision.RequiredCapacity,

internal/constants/metrics.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,11 @@ const (
118118
WVASpareCapacity = "wva_spare_capacity"
119119

120120
// WVARequiredCapacity is a gauge that tracks model-level required capacity.
121-
// >0 means scale-up needed. Labels: variant_name, namespace
121+
// >0 means scale-up needed.
122+
// Units differ by analyzer (use the analyzer_version label to distinguish):
123+
// - V1: binary signal (0.0 = no scale-up, 1.0 = scale-up needed)
124+
// - V2: continuous token-based demand
125+
// Labels: variant_name, namespace, analyzer_version
122126
WVARequiredCapacity = "wva_required_capacity"
123127

124128
// WVAKvCacheTokensUsed is a gauge that tracks total KV cache tokens currently in use per variant.
@@ -140,4 +144,11 @@ const (
140144
LabelReason = "reason"
141145
LabelAcceleratorType = "accelerator_type"
142146
LabelControllerInstance = "controller_instance"
147+
LabelAnalyzerVersion = "analyzer_version"
148+
)
149+
150+
// Analyzer version label values used in saturation metrics.
151+
const (
152+
AnalyzerVersionV1 = "v1"
153+
AnalyzerVersionV2 = "v2"
143154
)

internal/engines/saturation/engine.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/collector/registration"
3838
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/collector/source"
3939
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/config"
40+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/constants"
4041
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/discovery"
4142
queueingmodel "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/engines/analyzers/queueingmodel"
4243
saturation_v2 "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/engines/analyzers/saturation_v2"
@@ -789,6 +790,7 @@ func enrichDecisionsFromReplicaMetrics(decisions []interfaces.VariantDecision, r
789790
for i := range decisions {
790791
d := &decisions[i]
791792
d.RequiredCapacity = requiredCapacity
793+
d.AnalyzerVersion = constants.AnalyzerVersionV1
792794
if a, ok := agg[d.VariantName]; ok && a.count > 0 {
793795
d.KvCacheTokensUsed = a.kvUsed
794796
d.KvCacheTokensTotal = a.kvTotal
@@ -797,22 +799,30 @@ func enrichDecisionsFromReplicaMetrics(decisions []interfaces.VariantDecision, r
797799
}
798800
}
799801

800-
// enrichDecisionsWithKvTokenData sets KvCacheTokensUsed and KvCacheTokensTotal on decisions
801-
// from replica metrics aggregated per variant. Used by V2 path where Utilization and
802-
// RequiredCapacity are already set from AnalyzerResult.
802+
// enrichDecisionsWithKvTokenData sets KvCacheTokensUsed, KvCacheTokensTotal, and
803+
// AnalyzerVersion on decisions from replica metrics aggregated per (model, variant).
804+
// Used by V2 path where Utilization and RequiredCapacity are already set from
805+
// AnalyzerResult.
806+
//
807+
// Aggregation is keyed by (modelID, variantName) — not just variantName — because
808+
// variant names can collide across different models in the same reconcile cycle.
803809
func enrichDecisionsWithKvTokenData(decisions []interfaces.VariantDecision, modelReplicaMetrics map[string][]interfaces.ReplicaMetrics) {
804-
// Build per-variant KV token aggregation across all models
805810
type kvAgg struct {
806811
kvUsed int64
807812
kvTotal int64
808813
}
809-
agg := make(map[string]*kvAgg)
810-
for _, metrics := range modelReplicaMetrics {
814+
type variantKey struct {
815+
modelID string
816+
variant string
817+
}
818+
agg := make(map[variantKey]*kvAgg)
819+
for modelID, metrics := range modelReplicaMetrics {
811820
for _, rm := range metrics {
812-
a, ok := agg[rm.VariantName]
821+
k := variantKey{modelID: modelID, variant: rm.VariantName}
822+
a, ok := agg[k]
813823
if !ok {
814824
a = &kvAgg{}
815-
agg[rm.VariantName] = a
825+
agg[k] = a
816826
}
817827
a.kvUsed += rm.TokensInUse
818828
a.kvTotal += rm.TotalKvCapacityTokens
@@ -821,7 +831,8 @@ func enrichDecisionsWithKvTokenData(decisions []interfaces.VariantDecision, mode
821831

822832
for i := range decisions {
823833
d := &decisions[i]
824-
if a, ok := agg[d.VariantName]; ok {
834+
d.AnalyzerVersion = constants.AnalyzerVersionV2
835+
if a, ok := agg[variantKey{modelID: d.ModelID, variant: d.VariantName}]; ok {
825836
d.KvCacheTokensUsed = a.kvUsed
826837
d.KvCacheTokensTotal = a.kvTotal
827838
}
@@ -1171,7 +1182,11 @@ func (e *Engine) applySaturationDecisions(
11711182
updateVa.Status.Actuation.Applied = true
11721183
}
11731184

1174-
// Emit saturation and capacity metrics for observability
1185+
// Emit saturation and capacity metrics for observability.
1186+
// Note: stale time series for deleted VAs are not cleaned up automatically here.
1187+
// The metrics package exposes DeleteSaturationMetrics for callers (e.g., the
1188+
// VariantAutoscaling reconciler's delete handler / finalizer) to remove series
1189+
// when a VA is removed.
11751190
if hasDecision {
11761191
if err := act.EmitSaturationMetrics(ctx, decision); err != nil {
11771192
logger.Error(err, "Failed to emit saturation metrics", "variant", updateVa.Name)

internal/interfaces/saturation_analyzer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,13 @@ type VariantDecision struct {
201201
// Same value for all variants of a model.
202202
// V1: binary (1.0 if shouldScaleUp, else 0.0).
203203
// V2: continuous token-based demand from AnalyzerResult.
204+
// Use AnalyzerVersion to disambiguate the units when consuming this field
205+
// (or its corresponding Prometheus metric).
204206
RequiredCapacity float64
207+
// AnalyzerVersion identifies which analyzer produced this decision ("v1" or "v2").
208+
// Exposed as a Prometheus label on saturation metrics so dashboards can filter
209+
// by analyzer to handle the V1/V2 unit difference in RequiredCapacity.
210+
AnalyzerVersion string
205211
// ScaleTargetRef references the Deployment/StatefulSet for scheduling constraints
206212
ScaleTargetRef *autoscalingv2.CrossVersionObjectReference
207213

internal/metrics/metrics.go

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@ func InitMetrics(registry prometheus.Registerer) error {
5050
scalingLabels := []string{constants.LabelVariantName, constants.LabelNamespace, constants.LabelDirection, constants.LabelReason}
5151
// modelLabels: variant_name + namespace only (no accelerator_type) for model-level and token metrics
5252
modelLabels := []string{constants.LabelVariantName, constants.LabelNamespace}
53+
// requiredCapacityLabels: model labels + analyzer_version to disambiguate V1 (binary)
54+
// vs V2 (continuous tokens) units of the wva_required_capacity gauge
55+
requiredCapacityLabels := []string{constants.LabelVariantName, constants.LabelNamespace, constants.LabelAnalyzerVersion}
5356

5457
if controllerInstance != "" {
5558
baseLabels = append(baseLabels, constants.LabelControllerInstance)
5659
scalingLabels = append(scalingLabels, constants.LabelControllerInstance)
5760
modelLabels = append(modelLabels, constants.LabelControllerInstance)
61+
requiredCapacityLabels = append(requiredCapacityLabels, constants.LabelControllerInstance)
5862
}
5963

6064
replicaScalingTotal = prometheus.NewCounterVec(
@@ -102,9 +106,9 @@ func InitMetrics(registry prometheus.Registerer) error {
102106
requiredCapacity = prometheus.NewGaugeVec(
103107
prometheus.GaugeOpts{
104108
Name: constants.WVARequiredCapacity,
105-
Help: "Model-level required capacity; >0 indicates scale-up needed (V1: binary 0/1, V2: continuous token demand)",
109+
Help: "Model-level required capacity; >0 indicates scale-up needed. Use the analyzer_version label to distinguish units (V1: binary 0/1, V2: continuous token demand).",
106110
},
107-
modelLabels,
111+
requiredCapacityLabels,
108112
)
109113
kvCacheTokensUsed = prometheus.NewGaugeVec(
110114
prometheus.GaugeOpts{
@@ -224,10 +228,12 @@ func (m *MetricsEmitter) EmitReplicaMetrics(ctx context.Context, va *llmdOptv1al
224228
return nil
225229
}
226230

227-
// EmitSaturationMetrics emits saturation analysis and KV cache capacity metrics
231+
// EmitSaturationMetrics emits saturation analysis and KV cache capacity metrics.
232+
// analyzerVersion ("v1" or "v2") is used as a label on wva_required_capacity to
233+
// disambiguate the units of the required value (V1: binary, V2: continuous tokens).
228234
func (m *MetricsEmitter) EmitSaturationMetrics(
229235
ctx context.Context,
230-
variantName, namespace, acceleratorType string,
236+
variantName, namespace, acceleratorType, analyzerVersion string,
231237
utilization, spare, required float64,
232238
kvTokensUsed, kvTokensTotal int64,
233239
) error {
@@ -245,17 +251,60 @@ func (m *MetricsEmitter) EmitSaturationMetrics(
245251
constants.LabelVariantName: variantName,
246252
constants.LabelNamespace: namespace,
247253
}
254+
requiredLabels := prometheus.Labels{
255+
constants.LabelVariantName: variantName,
256+
constants.LabelNamespace: namespace,
257+
constants.LabelAnalyzerVersion: analyzerVersion,
258+
}
248259

249260
if controllerInstance != "" {
250261
accelLabels[constants.LabelControllerInstance] = controllerInstance
251262
modelLabels[constants.LabelControllerInstance] = controllerInstance
263+
requiredLabels[constants.LabelControllerInstance] = controllerInstance
252264
}
253265

254266
saturationUtilization.With(accelLabels).Set(utilization)
255267
spareCapacity.With(accelLabels).Set(spare)
256-
requiredCapacity.With(modelLabels).Set(required)
268+
requiredCapacity.With(requiredLabels).Set(required)
257269
kvCacheTokensUsed.With(modelLabels).Set(float64(kvTokensUsed))
258270
kvCacheTokensTotal.With(modelLabels).Set(float64(kvTokensTotal))
259271

260272
return nil
261273
}
274+
275+
// DeleteSaturationMetrics removes saturation metric series for the given variant.
276+
// Should be called when a VariantAutoscaling resource is deleted to prevent stale
277+
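// time series from accumulating in Prometheus. The label values passed here must
// match those used when the series was emitted (including acceleratorType and
// analyzerVersion); a series whose labels do not match is left in place.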
278+
//
279+
// TODO: wire this from the controller's VariantAutoscaling delete handler / finalizer.
280+
// Until that wiring exists, deleted VAs leave their last-emitted metric values in the
281+
// registry indefinitely.
282+
func (m *MetricsEmitter) DeleteSaturationMetrics(variantName, namespace, acceleratorType, analyzerVersion string) {
283+
if saturationUtilization == nil {
284+
return
285+
}
286+
accelLabels := prometheus.Labels{
287+
constants.LabelVariantName: variantName,
288+
constants.LabelNamespace: namespace,
289+
constants.LabelAcceleratorType: acceleratorType,
290+
}
291+
modelLabels := prometheus.Labels{
292+
constants.LabelVariantName: variantName,
293+
constants.LabelNamespace: namespace,
294+
}
295+
requiredLabels := prometheus.Labels{
296+
constants.LabelVariantName: variantName,
297+
constants.LabelNamespace: namespace,
298+
constants.LabelAnalyzerVersion: analyzerVersion,
299+
}
300+
if controllerInstance != "" {
301+
accelLabels[constants.LabelControllerInstance] = controllerInstance
302+
modelLabels[constants.LabelControllerInstance] = controllerInstance
303+
requiredLabels[constants.LabelControllerInstance] = controllerInstance
304+
}
305+
saturationUtilization.Delete(accelLabels)
306+
spareCapacity.Delete(accelLabels)
307+
requiredCapacity.Delete(requiredLabels)
308+
kvCacheTokensUsed.Delete(modelLabels)
309+
kvCacheTokensTotal.Delete(modelLabels)
310+
}

0 commit comments

Comments
 (0)