-
Notifications
You must be signed in to change notification settings - Fork 45
Add optimization loop performance metrics #981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ import ( | |
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/engines/pipeline" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/logging" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/metrics" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/saturation" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/utils" | ||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/utils/scaletarget" | ||
|
|
@@ -102,6 +103,9 @@ type Engine struct { | |
| // AnalyzerResults. Selected per-cycle based on enableLimiter config: | ||
| // CostAwareOptimizer (unlimited) or GreedyByScoreOptimizer (limited). | ||
| optimizer pipeline.ScalingOptimizer | ||
|
|
||
| // metricsEmitter emits optimization loop performance metrics (duration, models processed). | ||
| metricsEmitter *metrics.MetricsEmitter | ||
|
||
| } | ||
|
|
||
| // NewEngine creates a new instance of the saturation engine. | ||
|
|
@@ -144,6 +148,7 @@ func NewEngine(client client.Client, scheme *runtime.Scheme, recorder record.Eve | |
| queueingModelAnalyzer: queueingmodel.NewQueueingModelAnalyzer(), | ||
| capacityStore: capacityStore, | ||
| optimizer: scalingOptimizer, | ||
| metricsEmitter: metrics.NewMetricsEmitter(), | ||
| } | ||
|
|
||
| engine.executor = executor.NewPollingExecutor(executor.PollingConfig{ | ||
|
|
@@ -181,6 +186,21 @@ func (e *Engine) StartOptimizeLoop(ctx context.Context) { | |
|
|
||
| // optimize performs the optimization logic. | ||
| func (e *Engine) optimize(ctx context.Context) error { | ||
| start := time.Now() | ||
| var optimizeErr error | ||
|
||
| var modelsProcessed int | ||
| defer func() { | ||
| duration := time.Since(start).Seconds() | ||
| status := "success" | ||
| if optimizeErr != nil { | ||
| status = "error" | ||
| } | ||
| e.metricsEmitter.ObserveOptimizationDuration(duration, status) | ||
| if modelsProcessed > 0 { | ||
| e.metricsEmitter.IncrModelsProcessed(modelsProcessed) | ||
| } | ||
| }() | ||
|
|
||
| logger := ctrl.LoggerFrom(ctx) | ||
|
|
||
| // Get optimization interval from Config (already a time.Duration) | ||
|
|
@@ -203,6 +223,7 @@ func (e *Engine) optimize(ctx context.Context) error { | |
| activeVAs, _, err := utils.ActiveVariantAutoscaling(ctx, e.client) | ||
| if err != nil { | ||
| logger.Error(err, "Unable to get active variant autoscalings") | ||
| optimizeErr = err | ||
| return err | ||
| } | ||
|
|
||
|
|
@@ -217,6 +238,7 @@ func (e *Engine) optimize(ctx context.Context) error { | |
| if err != nil { | ||
| logger.Error(err, "Failed to collect cluster inventory") | ||
| // do not proceed to optimization if inventory collection fails in limited mode | ||
| optimizeErr = err | ||
| return err | ||
| } | ||
| // always print inventory until optimizer consumes it | ||
|
|
@@ -304,9 +326,12 @@ func (e *Engine) optimize(ctx context.Context) error { | |
| } | ||
| if err := e.applySaturationDecisions(ctx, allDecisions, vaMap, currentAllocations); err != nil { | ||
| logger.Error(err, "Failed to apply saturation decisions") | ||
| optimizeErr = err | ||
| return err | ||
| } | ||
|
|
||
| modelsProcessed = len(modelGroups) | ||
|
||
|
|
||
| logger.Info("Optimization completed successfully", | ||
| "mode", "saturation-only", | ||
| "modelsProcessed", len(modelGroups), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,9 @@ var ( | |
| currentReplicas *prometheus.GaugeVec | ||
| desiredRatio *prometheus.GaugeVec | ||
|
|
||
| optimizationDuration *prometheus.HistogramVec | ||
| modelsProcessedTotal prometheus.Counter | ||
|
|
||
| // controllerInstance stores the optional controller instance identifier. | ||
| // When set, it's added as a label to all emitted metrics. | ||
| controllerInstance string | ||
|
|
@@ -76,6 +79,21 @@ func InitMetrics(registry prometheus.Registerer) error { | |
| baseLabels, | ||
| ) | ||
|
|
||
| optimizationDuration = prometheus.NewHistogramVec( | ||
| prometheus.HistogramOpts{ | ||
| Name: constants.WVAOptimizationDurationSeconds, | ||
| Help: "Duration of optimization loop cycles in seconds", | ||
| Buckets: []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}, | ||
| }, | ||
| []string{constants.LabelStatus}, | ||
|
||
| ) | ||
| modelsProcessedTotal = prometheus.NewCounter( | ||
| prometheus.CounterOpts{ | ||
| Name: constants.WVAModelsProcessedTotal, | ||
| Help: "Total number of models processed across optimization cycles", | ||
| }, | ||
| ) | ||
|
|
||
| // Register metrics with the registry | ||
| if err := registry.Register(replicaScalingTotal); err != nil { | ||
| return fmt.Errorf("failed to register replicaScalingTotal metric: %w", err) | ||
|
|
@@ -89,6 +107,12 @@ func InitMetrics(registry prometheus.Registerer) error { | |
| if err := registry.Register(desiredRatio); err != nil { | ||
| return fmt.Errorf("failed to register desiredRatio metric: %w", err) | ||
| } | ||
| if err := registry.Register(optimizationDuration); err != nil { | ||
| return fmt.Errorf("failed to register optimizationDuration metric: %w", err) | ||
| } | ||
| if err := registry.Register(modelsProcessedTotal); err != nil { | ||
| return fmt.Errorf("failed to register modelsProcessedTotal metric: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
@@ -133,6 +157,23 @@ func (m *MetricsEmitter) EmitReplicaScalingMetrics(ctx context.Context, va *llmd | |
| return nil | ||
| } | ||
|
|
||
| // ObserveOptimizationDuration records the duration of an optimization cycle with the given status. | ||
| // Status should be one of: "success", "error", "partial". | ||
|
||
| func (m *MetricsEmitter) ObserveOptimizationDuration(durationSeconds float64, status string) { | ||
| if optimizationDuration == nil { | ||
| return | ||
| } | ||
| optimizationDuration.With(prometheus.Labels{constants.LabelStatus: status}).Observe(durationSeconds) | ||
| } | ||
|
|
||
| // IncrModelsProcessed increments the models-processed counter by the given count. | ||
| func (m *MetricsEmitter) IncrModelsProcessed(count int) { | ||
| if modelsProcessedTotal == nil { | ||
| return | ||
| } | ||
| modelsProcessedTotal.Add(float64(count)) | ||
| } | ||
|
|
||
| // EmitReplicaMetrics emits current and desired replica metrics | ||
| func (m *MetricsEmitter) EmitReplicaMetrics(ctx context.Context, va *llmdOptv1alpha1.VariantAutoscaling, current, desired int32, acceleratorType string) error { | ||
| baseLabels := prometheus.Labels{ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| /* | ||
| Copyright 2025 The llm-d Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package metrics | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/constants" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| dto "github.com/prometheus/client_model/go" | ||
| ) | ||
|
|
||
| func TestObserveOptimizationDuration(t *testing.T) { | ||
| registry := prometheus.NewRegistry() | ||
| if err := InitMetrics(registry); err != nil { | ||
| t.Fatalf("InitMetrics failed: %v", err) | ||
| } | ||
| emitter := NewMetricsEmitter() | ||
|
|
||
| // Observe a successful optimization | ||
| emitter.ObserveOptimizationDuration(0.15, "success") | ||
|
|
||
| // Observe a failed optimization | ||
| emitter.ObserveOptimizationDuration(2.5, "error") | ||
|
|
||
| // Verify the histogram was recorded | ||
| metrics, err := registry.Gather() | ||
| if err != nil { | ||
| t.Fatalf("Failed to gather metrics: %v", err) | ||
| } | ||
|
|
||
| var found bool | ||
| for _, mf := range metrics { | ||
| if mf.GetName() == constants.WVAOptimizationDurationSeconds { | ||
| found = true | ||
| // Should have 2 metrics (one per status label) | ||
| if len(mf.GetMetric()) != 2 { | ||
| t.Errorf("Expected 2 metric series, got %d", len(mf.GetMetric())) | ||
| } | ||
| for _, m := range mf.GetMetric() { | ||
| h := m.GetHistogram() | ||
| if h == nil { | ||
| t.Error("Expected histogram metric") | ||
| continue | ||
| } | ||
| if h.GetSampleCount() != 1 { | ||
| t.Errorf("Expected 1 sample per status, got %d", h.GetSampleCount()) | ||
| } | ||
| // Check status label | ||
| status := getLabelValue(m, constants.LabelStatus) | ||
| switch status { | ||
| case "success": | ||
| if h.GetSampleSum() < 0.1 || h.GetSampleSum() > 0.2 { | ||
| t.Errorf("Expected success duration ~0.15, got %f", h.GetSampleSum()) | ||
| } | ||
| case "error": | ||
| if h.GetSampleSum() < 2.0 || h.GetSampleSum() > 3.0 { | ||
| t.Errorf("Expected error duration ~2.5, got %f", h.GetSampleSum()) | ||
| } | ||
| default: | ||
| t.Errorf("Unexpected status label: %s", status) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if !found { | ||
| t.Errorf("Metric %s not found in gathered metrics", constants.WVAOptimizationDurationSeconds) | ||
| } | ||
| } | ||
|
|
||
| func TestIncrModelsProcessed(t *testing.T) { | ||
| registry := prometheus.NewRegistry() | ||
| if err := InitMetrics(registry); err != nil { | ||
| t.Fatalf("InitMetrics failed: %v", err) | ||
| } | ||
| emitter := NewMetricsEmitter() | ||
|
|
||
| // Increment models processed | ||
| emitter.IncrModelsProcessed(3) | ||
| emitter.IncrModelsProcessed(5) | ||
|
|
||
| // Verify the counter | ||
| metrics, err := registry.Gather() | ||
| if err != nil { | ||
| t.Fatalf("Failed to gather metrics: %v", err) | ||
| } | ||
|
|
||
| var found bool | ||
| for _, mf := range metrics { | ||
| if mf.GetName() == constants.WVAModelsProcessedTotal { | ||
| found = true | ||
| if len(mf.GetMetric()) != 1 { | ||
| t.Errorf("Expected 1 metric series, got %d", len(mf.GetMetric())) | ||
| } | ||
| c := mf.GetMetric()[0].GetCounter() | ||
| if c == nil { | ||
| t.Error("Expected counter metric") | ||
| } else if c.GetValue() != 8 { | ||
| t.Errorf("Expected counter value 8 (3+5), got %f", c.GetValue()) | ||
| } | ||
| } | ||
| } | ||
| if !found { | ||
| t.Errorf("Metric %s not found in gathered metrics", constants.WVAModelsProcessedTotal) | ||
| } | ||
| } | ||
|
|
||
| func TestObserveOptimizationDuration_NilSafety(t *testing.T) { | ||
| // Reset the package-level vars to nil to simulate uninitialized state | ||
| savedDuration := optimizationDuration | ||
| savedCounter := modelsProcessedTotal | ||
| optimizationDuration = nil | ||
| modelsProcessedTotal = nil | ||
| defer func() { | ||
| optimizationDuration = savedDuration | ||
| modelsProcessedTotal = savedCounter | ||
| }() | ||
|
|
||
| emitter := NewMetricsEmitter() | ||
|
|
||
| // Should not panic when metrics are not initialized | ||
| emitter.ObserveOptimizationDuration(1.0, "success") | ||
| emitter.IncrModelsProcessed(5) | ||
| } | ||
|
|
||
| // getLabelValue returns the value of a label by name from a metric. | ||
| func getLabelValue(m *dto.Metric, name string) string { | ||
| for _, l := range m.GetLabel() { | ||
| if l.GetName() == name { | ||
| return l.GetValue() | ||
| } | ||
| } | ||
| return "" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A monotonic counter of "total models processed" requires
`rate()` to be useful, which gives models/sec — not a very meaningful signal for this use case. A gauge ("models in last cycle") would be more directly dashboardable and useful for alerting (e.g., "model count dropped to 0"). Worth considering whether this counter will actually drive alerts or dashboards as-is.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense — changed to a gauge. SetModelsProcessed(n) now reflects the last cycle directly, no rate() needed.