-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathactuator.go
More file actions
120 lines (104 loc) · 4.53 KB
/
actuator.go
File metadata and controls
120 lines (104 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package actuator
import (
"context"
"fmt"
llmdOptv1alpha1 "github.com/llm-d/llm-d-workload-variant-autoscaler/api/v1alpha1"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/metrics"
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/utils/scaletarget"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// Actuator reads the current replica count of a VariantAutoscaling's scale
// target and publishes replica and saturation metrics through a MetricsEmitter.
type Actuator struct {
// Client is the controller-runtime client used to fetch the scale target.
Client client.Client
// MetricsEmitter emits replica and saturation metrics and deletes saturation
// metric series for a variant.
MetricsEmitter *metrics.MetricsEmitter
}
// NewActuator returns an Actuator that uses k8sClient for scale target lookups
// and a MetricsEmitter obtained from metrics.NewMetricsEmitter.
func NewActuator(k8sClient client.Client) *Actuator {
	act := new(Actuator)
	act.Client = k8sClient
	act.MetricsEmitter = metrics.NewMetricsEmitter()
	return act
}
// GetCurrentScaleTargetReplicasFromVA fetches the scale target referenced by va
// and reports its current replica count.
//
// The target is looked up with scaletarget.FetchScaleTarget using the VA's
// name, its ScaleTargetRef kind, its scale target name and its namespace. If
// the lookup fails, the error is wrapped and returned with a count of 0.
// Otherwise the count is resolved by
// GetCurrentScaleTargetReplicasFromScaleTarget.
func (a *Actuator) GetCurrentScaleTargetReplicasFromVA(ctx context.Context, va *llmdOptv1alpha1.VariantAutoscaling) (int32, error) {
	target, fetchErr := scaletarget.FetchScaleTarget(
		ctx,
		a.Client,
		va.Name,
		va.Spec.ScaleTargetRef.Kind,
		va.GetScaleTargetName(),
		va.Namespace,
	)
	if fetchErr != nil {
		return 0, fmt.Errorf("failed to get scale target %s/%s: %w", va.Namespace, va.GetScaleTargetName(), fetchErr)
	}

	return a.GetCurrentScaleTargetReplicasFromScaleTarget(va, target)
}
// GetCurrentScaleTargetReplicasFromScaleTarget reports the current replica
// count of an already-fetched scale target.
//
// Resolution order:
//  1. the status replica count, when it is non-negative;
//  2. the spec replica count, when it is set;
//  3. 1, when neither of the above applies.
//
// A nil scaleTarget yields an error that names the VA's namespace and scale
// target name.
//
// NOTE(review): steps 2 and 3 are only reached when GetStatusReplicas returns a
// negative value — confirm that the accessor can actually do so.
func (a *Actuator) GetCurrentScaleTargetReplicasFromScaleTarget(va *llmdOptv1alpha1.VariantAutoscaling, scaleTarget scaletarget.ScaleTargetAccessor) (int32, error) {
	if scaleTarget == nil {
		return 0, fmt.Errorf("scale target cannot be nil for %s/%s", va.Namespace, va.GetScaleTargetName())
	}

	switch {
	case scaleTarget.GetStatusReplicas() >= 0:
		// Status reflects the observed state, so it wins over the spec.
		return scaleTarget.GetStatusReplicas(), nil
	case scaleTarget.GetReplicas() != nil:
		// Status not usable; report what the spec asks for.
		return *scaleTarget.GetReplicas(), nil
	default:
		// Neither status nor spec is available.
		return 1, nil
	}
}
// EmitMetrics publishes the current and desired replica counts of a
// VariantAutoscaling through the actuator's MetricsEmitter.
//
// Nothing is emitted while Status.DesiredOptimizedAlloc.NumReplicas is nil,
// i.e. before an optimization decision has been recorded. The current replica
// count is read from the live scale target; if that lookup fails the error is
// logged and 0 is reported instead. Emission failures are logged but not
// propagated, so this method always returns nil.
func (a *Actuator) EmitMetrics(ctx context.Context, variantAutoscaling *llmdOptv1alpha1.VariantAutoscaling) error {
	logger := log.FromContext(ctx)

	if variantAutoscaling.Status.DesiredOptimizedAlloc.NumReplicas == nil {
		logger.Info("Skipping EmitReplicaMetrics - no optimization decision yet",
			"variantName", variantAutoscaling.Name)
		return nil
	}
	desiredReplicas := *variantAutoscaling.Status.DesiredOptimizedAlloc.NumReplicas

	// Read the real replica count from the scale target rather than from the
	// VariantAutoscaling status.
	currentReplicas, err := a.GetCurrentScaleTargetReplicasFromVA(ctx, variantAutoscaling)
	if err != nil {
		// The log message states the fallback that is actually applied below.
		logger.Error(err, "Could not get current scale target replicas, falling back to 0",
			"variantName", variantAutoscaling.Name)
		currentReplicas = 0 // Fallback to 0 since CurrentAlloc is removed
	}

	if err := a.MetricsEmitter.EmitReplicaMetrics(
		ctx,
		variantAutoscaling,
		currentReplicas,
		desiredReplicas, // replica target from the optimization decision
		variantAutoscaling.Status.DesiredOptimizedAlloc.Accelerator,
	); err != nil {
		logger.Error(err, "Failed to emit optimization signals for variantAutoscaling",
			"variantName", variantAutoscaling.Name)
		// Don't fail the reconciliation for metric emission errors.
		// Metrics are critical for HPA, but emission failures shouldn't break core functionality.
		return nil
	}

	logger.Info("EmitReplicaMetrics completed",
		"variantName", variantAutoscaling.Name,
		"currentReplicas", currentReplicas,
		"desiredReplicas", desiredReplicas,
		"accelerator", variantAutoscaling.Status.DesiredOptimizedAlloc.Accelerator)
	return nil
}
// EmitSaturationMetrics forwards the saturation and KV cache fields of decision
// to the actuator's MetricsEmitter.
//
// The variant name, namespace, model ID and accelerator name identify the
// series; the remaining fields (required capacity unit, utilization, spare
// capacity, required capacity, KV cache tokens used and capacity) are passed
// through unchanged. The emitter's error, if any, is returned as-is.
func (a *Actuator) EmitSaturationMetrics(ctx context.Context, decision interfaces.VariantDecision) error {
	emitter := a.MetricsEmitter
	emitErr := emitter.EmitSaturationMetrics(
		ctx,
		decision.VariantName,
		decision.Namespace,
		decision.ModelID,
		decision.AcceleratorName,
		decision.RequiredCapacityUnit,
		decision.Utilization,
		decision.SpareCapacity,
		decision.RequiredCapacity,
		decision.KvCacheTokensUsed,
		decision.KvCacheTokensCapacity,
	)
	return emitErr
}
// DeleteSaturationMetricsForVariant drops every saturation metric series that
// belongs to the given variant by delegating to the actuator's MetricsEmitter.
//
// Intended to be called when an optimization cycle yields no fresh decision for
// the variant, or when its VA is being deleted, so that dashboards do not keep
// displaying stale values.
func (a *Actuator) DeleteSaturationMetricsForVariant(variantName, namespace string) {
	emitter := a.MetricsEmitter
	emitter.DeleteSaturationMetricsForVariant(variantName, namespace)
}