Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/actuator/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

llmdOptv1alpha1 "github.com/llm-d/llm-d-workload-variant-autoscaler/api/v1alpha1"

"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/metrics"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/utils/scaletarget"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -92,3 +93,28 @@ func (a *Actuator) EmitMetrics(ctx context.Context, variantAutoscaling *llmdOptv
"accelerator", variantAutoscaling.Status.DesiredOptimizedAlloc.Accelerator)
return nil
}

// EmitSaturationMetrics emits saturation analysis and KV cache capacity metrics from a decision.
func (a *Actuator) EmitSaturationMetrics(ctx context.Context, decision interfaces.VariantDecision) error {
return a.MetricsEmitter.EmitSaturationMetrics(
ctx,
decision.VariantName,
decision.Namespace,
Comment thread
ev-shindin marked this conversation as resolved.
decision.ModelID,
decision.AcceleratorName,
decision.RequiredCapacityUnit,
decision.Utilization,
decision.SpareCapacity,
decision.RequiredCapacity,
decision.KvCacheTokensUsed,
decision.KvCacheTokensCapacity,
)
}

// DeleteSaturationMetricsForVariant removes all saturation metric series for a
// variant. Call this when the current optimization cycle produced no fresh
// decision for the variant, or when the VA is being deleted — so dashboards
// don't show stale values. Deleting the series produces a visible gap rather
// than a flat line of outdated data; this delegates directly to the emitter.
func (a *Actuator) DeleteSaturationMetricsForVariant(variantName, namespace string) {
	a.MetricsEmitter.DeleteSaturationMetricsForVariant(variantName, namespace)
}
36 changes: 36 additions & 0 deletions internal/constants/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,30 @@ const (
// WVADesiredRatio is a gauge that tracks the ratio of desired to current replicas.
// Labels: variant_name, namespace, accelerator_type
WVADesiredRatio = "wva_desired_ratio"

// WVASaturationUtilization is a gauge that tracks per-variant utilization ratio (0.0-1.0).
// Labels: variant_name, namespace, model_name, accelerator_type
WVASaturationUtilization = "wva_saturation_utilization"

// WVASpareCapacity is a gauge that tracks per-variant spare capacity (0.0-1.0).
// Labels: variant_name, namespace, model_name, accelerator_type
WVASpareCapacity = "wva_spare_capacity"

// WVARequiredCapacity is a gauge that tracks model-level required capacity.
// >0 means scale-up needed.
Comment thread
shuynh2017 marked this conversation as resolved.
// Value semantics differ by analyzer (use the "unit" label to distinguish):
// - unit="binary" (V1): 0.0 = no scale-up, 1.0 = scale-up needed
// - unit="continuous" (V2): continuous token-based demand
// Labels: variant_name, namespace, model_name, unit
WVARequiredCapacity = "wva_required_capacity"

// WVAKvCacheTokensUsed is a gauge that tracks total KV cache tokens currently in use per variant.
// Labels: variant_name, namespace, model_name
WVAKvCacheTokensUsed = "wva_kv_cache_tokens_used"

// WVAKvCacheTokensCapacity is a gauge that tracks total KV cache token capacity per variant.
// Labels: variant_name, namespace, model_name
WVAKvCacheTokensCapacity = "wva_kv_cache_tokens_capacity"
)

// Metric Label Names
Expand All @@ -120,4 +144,16 @@ const (
LabelReason = "reason"
LabelAcceleratorType = "accelerator_type"
LabelControllerInstance = "controller_instance"
// LabelUnit distinguishes the unit of a metric value when a single metric name
// carries values with different semantic units. Currently applied to
// wva_required_capacity, whose value is either a binary scale-up signal (V1)
// or a continuous token-demand value (V2).
LabelUnit = "unit"
)

// Values for the LabelUnit Prometheus label, describing how to interpret the
// metric value ("binary" 0/1 vs. "continuous" absolute quantity).
const (
	// UnitBinary marks a value that is a 0/1 scale-up signal (V1 analyzer path).
	UnitBinary = "binary"
	// UnitContinuous marks a value that is a continuous token-demand magnitude
	// (V2 analyzer path).
	UnitContinuous = "continuous"
)
27 changes: 15 additions & 12 deletions internal/engines/pipeline/cost_aware_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,21 @@ func buildDecisionsWithOptimizer(
}

decisions = append(decisions, interfaces.VariantDecision{
VariantName: name,
ModelID: req.ModelID,
Namespace: req.Namespace,
AcceleratorName: vc.AcceleratorName,
Cost: vc.Cost,
Role: state.Role,
CurrentReplicas: state.CurrentReplicas,
TargetReplicas: target,
Action: action,
Reason: reason,
MinReplicas: state.MinReplicas,
MaxReplicas: state.MaxReplicas,
VariantName: name,
ModelID: req.ModelID,
Namespace: req.Namespace,
AcceleratorName: vc.AcceleratorName,
Cost: vc.Cost,
Role: state.Role,
CurrentReplicas: state.CurrentReplicas,
TargetReplicas: target,
Action: action,
Reason: reason,
MinReplicas: state.MinReplicas,
MaxReplicas: state.MaxReplicas,
Utilization: vc.Utilization,
SpareCapacity: 1.0 - vc.Utilization,
Comment thread
shuynh2017 marked this conversation as resolved.
RequiredCapacity: req.Result.RequiredCapacity,
})
}
return decisions
Expand Down
116 changes: 116 additions & 0 deletions internal/engines/saturation/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/collector/registration"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/collector/source"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/config"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/constants"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/discovery"
queueingmodel "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/engines/analyzers/queueingmodel"
saturation_v2 "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/engines/analyzers/saturation_v2"
Expand Down Expand Up @@ -360,6 +361,9 @@ func (e *Engine) optimizeV1(
// Convert saturation targets to decisions first, then apply enforcer
finalDecisions = e.convertSaturationTargetsToDecisions(ctx, saturationTargets, saturationAnalysis, data.variantStates)

// Enrich decisions with saturation metrics for observability (V1 path)
enrichDecisionsFromReplicaMetrics(finalDecisions, data.replicaMetrics, saturationAnalysis.ShouldScaleUp)

// Check if any variant has minReplicas > 0 — if so, skip scale-to-zero enforcement
if !hasMinReplicasAboveZero(data.variantStates) {
// Apply scale-to-zero enforcement on decisions
Expand Down Expand Up @@ -434,6 +438,8 @@ func (e *Engine) optimizeV2(

// Stage 1: Collect ModelScalingRequests for all models
var requests []pipeline.ModelScalingRequest
// modelReplicaMetrics collects per-model replica metrics for KV token enrichment
modelReplicaMetrics := make(map[string][]interfaces.ReplicaMetrics)

for groupKey, modelVAs := range modelGroups {
modelID := modelVAs[0].Spec.ModelID
Expand Down Expand Up @@ -474,6 +480,7 @@ func (e *Engine) optimizeV2(
}

requests = append(requests, *req)
modelReplicaMetrics[modelID] = data.replicaMetrics
}

if len(requests) == 0 {
Expand Down Expand Up @@ -521,6 +528,11 @@ func (e *Engine) optimizeV2(
}
}

// Stage 4: Enrich decisions with KV cache token data from replicaMetrics.
// Utilization, RequiredCapacity, and SpareCapacity are already set by
// buildDecisionsWithOptimizer from AnalyzerResult.
enrichDecisionsWithKvTokenData(allDecisions, modelReplicaMetrics)

return allDecisions
}

Expand Down Expand Up @@ -720,6 +732,93 @@ func (e *Engine) convertSaturationTargetsToDecisions(
return decisions
}

// enrichDecisionsFromReplicaMetrics populates saturation observability fields on
// decisions by aggregating per-pod ReplicaMetrics per variant. Used by the V1
// path, where Utilization and RequiredCapacity are not set by the optimizer.
func enrichDecisionsFromReplicaMetrics(decisions []interfaces.VariantDecision, replicaMetrics []interfaces.ReplicaMetrics, shouldScaleUp bool) {
	// Running totals per variant across all replica samples.
	type totals struct {
		tokensInUse    int64
		tokensCapacity int64
		usageFracSum   float64
		replicas       int
	}
	perVariant := make(map[string]*totals)
	for _, rm := range replicaMetrics {
		t := perVariant[rm.VariantName]
		if t == nil {
			t = &totals{}
			perVariant[rm.VariantName] = t
		}
		t.tokensInUse += rm.TokensInUse
		t.tokensCapacity += rm.TotalKvCapacityTokens
		t.usageFracSum += rm.KvCacheUsage
		t.replicas++
	}

	// V1's model-level scale-up signal is binary: 1.0 when the analyzer
	// requested a scale-up, 0.0 otherwise.
	var required float64
	if shouldScaleUp {
		required = 1.0
	}

	for i := range decisions {
		d := &decisions[i]
		d.RequiredCapacity = required
		d.RequiredCapacityUnit = constants.UnitBinary
		t := perVariant[d.VariantName]
		if t == nil || t.replicas == 0 {
			continue
		}
		d.KvCacheTokensUsed = t.tokensInUse
		d.KvCacheTokensCapacity = t.tokensCapacity
		// V1 reasons about saturation per-replica using KvCacheUsage fractions
		// (0.0-1.0), not tokens. Report the mean of those per-replica fractions
		// as the variant-level utilization — this matches what the V1 analyzer
		// actually evaluates against its thresholds. V2 uses a different
		// (token-demand / capacity) formula; see the field doc on
		// VariantDecision.Utilization.
		d.Utilization = t.usageFracSum / float64(t.replicas)
	}
}

// enrichDecisionsWithKvTokenData sets KvCacheTokensUsed, KvCacheTokensCapacity,
// and RequiredCapacityUnit on decisions from replica metrics aggregated per
// (model, variant). Used by the V2 path, where Utilization and RequiredCapacity
// are already set from AnalyzerResult.
//
// Aggregation is keyed by (modelID, variantName) — not just variantName —
// because variant names can collide across different models in the same
// reconcile cycle.
func enrichDecisionsWithKvTokenData(decisions []interfaces.VariantDecision, modelReplicaMetrics map[string][]interfaces.ReplicaMetrics) {
	type modelVariant struct {
		modelID string
		variant string
	}
	type tokenTotals struct {
		used     int64
		capacity int64
	}
	// Sum KV token usage and capacity per (model, variant) pair. The map holds
	// small value structs: read, update, and write back on each sample.
	sums := make(map[modelVariant]tokenTotals)
	for modelID, metrics := range modelReplicaMetrics {
		for _, rm := range metrics {
			key := modelVariant{modelID: modelID, variant: rm.VariantName}
			t := sums[key]
			t.used += rm.TokensInUse
			t.capacity += rm.TotalKvCapacityTokens
			sums[key] = t
		}
	}

	for i := range decisions {
		d := &decisions[i]
		d.RequiredCapacityUnit = constants.UnitContinuous
		if t, ok := sums[modelVariant{modelID: d.ModelID, variant: d.VariantName}]; ok {
			d.KvCacheTokensUsed = t.used
			d.KvCacheTokensCapacity = t.capacity
		}
	}
}

// hasMinReplicasAboveZero returns true if any variant in the states has MinReplicas > 0.
func hasMinReplicasAboveZero(states []interfaces.VariantReplicaState) bool {
for _, state := range states {
Expand Down Expand Up @@ -1063,6 +1162,23 @@ func (e *Engine) applySaturationDecisions(
updateVa.Status.Actuation.Applied = true
}

// Emit saturation and capacity metrics for observability.
// When this cycle produced no fresh decision for the variant, actively
// clear the existing series so dashboards show a gap ("no fresh data")
// rather than stale values that would otherwise persist until Prometheus'
// 5-minute staleness marker fires. For fully-deleted VAs, additional
// cleanup via the reconciler's delete handler / finalizer is still
// required (see DeleteSaturationMetricsForVariant).
if hasDecision {
if err := act.EmitSaturationMetrics(ctx, decision); err != nil {
logger.Error(err, "Failed to emit saturation metrics", "variant", updateVa.Name)
}
} else {
act.DeleteSaturationMetricsForVariant(updateVa.Name, updateVa.Namespace)
logger.V(logging.DEBUG).Info("Cleared stale saturation metrics (no fresh decision this cycle)",
"variant", updateVa.Name, "namespace", updateVa.Namespace)
}

// Update Shared State and Trigger Reconcile via Channel
// This avoids any API server interaction from the Engine.

Expand Down
28 changes: 28 additions & 0 deletions internal/interfaces/saturation_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,35 @@ type VariantDecision struct {
// SpareCapacity indicates how much spare capacity this variant has.
// 0.0 = fully saturated, 1.0 = completely idle.
// Used by allocation algorithms to prioritize saturated variants.
// V1: threshold-relative spare KV capacity (AvgSpareKvCapacity).
// V2: 1.0 - Utilization (absolute spare).
SpareCapacity float64
// Utilization is the variant-level utilization ratio (0.0-1.0) reported for
// observability. The exact formula differs by analyzer because V1 and V2
// reason about saturation differently:
// V1: mean of per-replica KvCacheUsage fractions (matches what V1's
// per-replica threshold check operates on).
// V2: TotalDemand / TotalCapacity from AnalyzerResult (token-demand-based).
// For uniform-capacity replicas the two are numerically equivalent; for
// mixed-capacity replicas V2's value is capacity-weighted.
Utilization float64
// KvCacheTokensUsed is the sum of TokensInUse across this variant's replicas.
KvCacheTokensUsed int64
// KvCacheTokensCapacity is the sum of TotalKvCapacityTokens across this variant's replicas.
KvCacheTokensCapacity int64
// RequiredCapacity is the model-level required capacity (>0 means scale-up needed).
// Same value for all variants of a model.
// V1: binary (1.0 if shouldScaleUp, else 0.0).
// V2: continuous token-based demand from AnalyzerResult.
// Use RequiredCapacityUnit to disambiguate the units when consuming this field
// (or its corresponding Prometheus metric).
RequiredCapacity float64
// RequiredCapacityUnit describes the unit of RequiredCapacity ("binary" or "continuous").
// Exposed as the `unit` Prometheus label on wva_required_capacity so dashboards
// can filter by semantics rather than by which analyzer produced the value.
// "binary": V1 path, value is 0.0 or 1.0
// "continuous": V2 path, value is a token-demand magnitude
RequiredCapacityUnit string
// ScaleTargetRef references the Deployment/StatefulSet for scheduling constraints
ScaleTargetRef *autoscalingv2.CrossVersionObjectReference

Expand Down
Loading
Loading