Skip to content

Commit 072ec8b

Browse files
authored
Finalizing v2 saturation engine (#836)
* feat: add priority, roles, multi-analyzer scoring, and GreedyByScore optimizer Implement five interconnected enhancements to the V2 saturation engine: 1. Per-model priority config with "{modelID}#{namespace}" lookup and default fallback via resolveSaturationConfig() 2. P/D role types (prefill/decode/both) on VariantReplicaState, VariantDecision, and VariantCapacity, injected from deployment labels (llm-d.ai/role) 3. Per-role capacity aggregation in the V2 analyzer via aggregateByRole(), with scheduler queue demand split by role: prefill gets inputTokens, decode gets inputTokens+outputTokens 4. Multi-analyzer scoring infrastructure with AnalyzerScoreConfig and composite score = priority * sum(requiredCapacity_i * score_i) 5. Rename GreedyBySaturationOptimizer to GreedyByScoreOptimizer with score-based priority ordering, per-role work units for P/D disaggregated models, and proportional P/D balancing * refactor: demand-proportional P/D distribution in GreedyByScoreOptimizer Replace post-hoc proportionalPDBalance() with demand-proportional allocation integrated into the fair-share algorithm itself. Previously, disaggregated models created separate per-role work units that competed independently in fair-share, then a post-processing step patched up the P:D ratio based on initial replica counts. Now each model enters fair-share as a single entity. When allocating replicas, allocateByRole() distributes them between prefill/decode roles proportional to their per-role RequiredCapacity. If one role can't use GPUs (e.g. accelerator exhausted), subsequent iterations let the other role absorb its share naturally. Removed: proportionalPDBalance, initTargetsForRole, mergeWorkTargets Added: allocateByRole, allocateToVariants, roleDemands on modelWork * fix: prevent role absorption in demand-proportional P/D allocation When one role cannot fully allocate (e.g., accelerator exhausted), consume its unallocated share from remaining so it does not overflow to other roles in subsequent fair-share iterations. * refactor: use Analyzers list for V2 selection with per-analyzer threshold overrides Replace analyzerName-based V2 detection with IsV2() that checks for Analyzers list (new-style) or analyzerName (backward compat). Add per-analyzer ScaleUpThreshold/ScaleDownBoundary overrides on AnalyzerScoreConfig with global fallback via EffectiveScaleUpThreshold/ EffectiveScaleDownBoundary methods. * refactor: convert interfaces tests to Ginkgo Convert saturation_scaling_test.go from standard testing to Ginkgo Describe/It/Expect style with DescribeTable for validation cases. Add suite_test.go for Ginkgo test runner bootstrap. * refactor: unify enforcer to single EnforcePolicyOnDecisions entry point Remove V1-specific EnforcePolicy (map-based) and its helpers (applyScaleToZero, ensureMinimumReplicas). The V1 path now converts targets to decisions first, then uses EnforcePolicyOnDecisions — the same path as V2. This eliminates the dependency on VariantSaturationAnalysis for cost data in the enforcer. * refactor: move SaturationScalingConfig types from interfaces to config package Move SaturationScalingConfig, AnalyzerScoreConfig, and related constants (DefaultPriority, DefaultScaleUpThreshold, DefaultScaleDownBoundary) from internal/interfaces/ to internal/config/ where they belong alongside the rest of the configuration types. Update all imports across 15 files.
1 parent 7f838c4 commit 072ec8b

25 files changed

Lines changed: 1899 additions & 1192 deletions

internal/config/config.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
ctrl "sigs.k8s.io/controller-runtime"
99

10-
interfaces "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
10+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
1111
)
1212

1313
// Config is the unified configuration structure for the WVA controller.
@@ -75,7 +75,7 @@ type featureFlagsConfig struct {
7575

7676
// SaturationScalingConfigPerModel represents saturation scaling configuration
7777
// for all models. Maps model ID (or "default" key) to its configuration.
78-
type SaturationScalingConfigPerModel map[string]interfaces.SaturationScalingConfig
78+
type SaturationScalingConfigPerModel map[string]SaturationScalingConfig
7979

8080
// QMAnalyzerConfigPerModel represents queueing model scaling configuration
8181
// for all models. Maps model ID (or "default" key) to its configuration.
@@ -310,13 +310,13 @@ func (c *Config) ScaleFromZeroMaxConcurrency() int {
310310
// SaturationConfig returns the current global saturation scaling configuration.
311311
// Thread-safe. Returns a copy to prevent external modifications.
312312
// For namespace-aware lookups, use SaturationConfigForNamespace instead.
313-
func (c *Config) SaturationConfig() map[string]interfaces.SaturationScalingConfig {
313+
func (c *Config) SaturationConfig() map[string]SaturationScalingConfig {
314314
return c.SaturationConfigForNamespace("")
315315
}
316316

317317
// resolveSaturationConfig resolves saturation config for a namespace (namespace-local > global).
318318
// Must be called while holding at least a read lock.
319-
func (c *Config) resolveSaturationConfig(namespace string) map[string]interfaces.SaturationScalingConfig {
319+
func (c *Config) resolveSaturationConfig(namespace string) map[string]SaturationScalingConfig {
320320
// Check namespace-local first (if namespace is provided)
321321
if namespace != "" {
322322
if nsConfig, exists := c.saturation.namespaceConfigs[namespace]; exists {
@@ -358,19 +358,19 @@ func (c *Config) resolveScaleToZeroConfig(namespace string) ScaleToZeroConfigDat
358358
// Resolution order: namespace-local > global
359359
// Thread-safe. Returns a copy to prevent external modifications.
360360
// If namespace is empty, returns global config.
361-
func (c *Config) SaturationConfigForNamespace(namespace string) map[string]interfaces.SaturationScalingConfig {
361+
func (c *Config) SaturationConfigForNamespace(namespace string) map[string]SaturationScalingConfig {
362362
c.mu.RLock()
363363
defer c.mu.RUnlock()
364364
sourceConfig := c.resolveSaturationConfig(namespace)
365365
return copySaturationConfig(sourceConfig)
366366
}
367367

368368
// copySaturationConfig creates a deep copy of the saturation config map.
369-
func copySaturationConfig(src map[string]interfaces.SaturationScalingConfig) map[string]interfaces.SaturationScalingConfig {
369+
func copySaturationConfig(src map[string]SaturationScalingConfig) map[string]SaturationScalingConfig {
370370
if src == nil {
371-
return make(map[string]interfaces.SaturationScalingConfig)
371+
return make(map[string]SaturationScalingConfig)
372372
}
373-
result := make(map[string]interfaces.SaturationScalingConfig, len(src))
373+
result := make(map[string]SaturationScalingConfig, len(src))
374374
for k, v := range src {
375375
result[k] = v
376376
}
@@ -410,19 +410,19 @@ func copyScaleToZeroConfig(src ScaleToZeroConfigData) ScaleToZeroConfigData {
410410
// UpdateSaturationConfig updates the global saturation scaling configuration.
411411
// Thread-safe. Takes a copy of the provided map to prevent external modifications.
412412
// For namespace-local updates, use UpdateSaturationConfigForNamespace instead.
413-
func (c *Config) UpdateSaturationConfig(config map[string]interfaces.SaturationScalingConfig) {
413+
func (c *Config) UpdateSaturationConfig(config map[string]SaturationScalingConfig) {
414414
c.UpdateSaturationConfigForNamespace("", config)
415415
}
416416

417417
// UpdateSaturationConfigForNamespace updates the saturation scaling configuration for the given namespace.
418418
// If namespace is empty, updates global config.
419419
// Thread-safe. Takes a copy of the provided map to prevent external modifications.
420-
func (c *Config) UpdateSaturationConfigForNamespace(namespace string, config map[string]interfaces.SaturationScalingConfig) {
420+
func (c *Config) UpdateSaturationConfigForNamespace(namespace string, config map[string]SaturationScalingConfig) {
421421
c.mu.Lock()
422422
defer c.mu.Unlock()
423423

424424
// Make a copy to prevent external modifications
425-
newConfig := make(map[string]interfaces.SaturationScalingConfig, len(config))
425+
newConfig := make(map[string]SaturationScalingConfig, len(config))
426426
maps.Copy(newConfig, config)
427427

428428
var oldCount int

internal/config/config_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/stretchr/testify/assert"
1010
"github.com/stretchr/testify/require"
1111

12-
interfaces "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
12+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
1313
)
1414

1515
// TestConfig_ThreadSafeUpdates tests that concurrent reads and writes to DynamicConfig
@@ -71,8 +71,8 @@ func TestConfig_ThreadSafeUpdates(t *testing.T) {
7171
for j := 0; j < iterations; j++ {
7272

7373
// Update saturation config
74-
newSatConfig := make(map[string]interfaces.SaturationScalingConfig)
75-
newSatConfig["test-accelerator"] = interfaces.SaturationScalingConfig{
74+
newSatConfig := make(map[string]SaturationScalingConfig)
75+
newSatConfig["test-accelerator"] = SaturationScalingConfig{
7676
KvCacheThreshold: 0.8,
7777
QueueLengthThreshold: 5,
7878
KvSpareTrigger: 0.1,
@@ -303,7 +303,7 @@ func TestConfig_NamespaceAwareResolutionPrecedence(t *testing.T) {
303303
cfg := NewTestConfig()
304304

305305
// Set up global saturation config
306-
globalSatConfig := map[string]interfaces.SaturationScalingConfig{
306+
globalSatConfig := map[string]SaturationScalingConfig{
307307
"default": {
308308
KvCacheThreshold: 0.80,
309309
QueueLengthThreshold: 5,
@@ -339,7 +339,7 @@ func TestConfig_NamespaceAwareResolutionPrecedence(t *testing.T) {
339339
// Test 2: Namespace-local config takes precedence
340340
t.Run("Namespace-local config takes precedence", func(t *testing.T) {
341341
// Set namespace-local saturation config
342-
nsSatConfig := map[string]interfaces.SaturationScalingConfig{
342+
nsSatConfig := map[string]SaturationScalingConfig{
343343
"default": {
344344
KvCacheThreshold: 0.70, // Different from global (0.80)
345345
QueueLengthThreshold: 3, // Different from global (5)
@@ -388,7 +388,7 @@ func TestConfig_NamespaceConfigDeletion(t *testing.T) {
388388
cfg := NewTestConfig()
389389

390390
// Set up global saturation config
391-
globalSatConfig := map[string]interfaces.SaturationScalingConfig{
391+
globalSatConfig := map[string]SaturationScalingConfig{
392392
"default": {
393393
KvCacheThreshold: 0.80,
394394
QueueLengthThreshold: 5,
@@ -411,7 +411,7 @@ func TestConfig_NamespaceConfigDeletion(t *testing.T) {
411411
namespace := "test-namespace"
412412

413413
// Set namespace-local config
414-
nsSatConfig := map[string]interfaces.SaturationScalingConfig{
414+
nsSatConfig := map[string]SaturationScalingConfig{
415415
"default": {
416416
KvCacheThreshold: 0.70,
417417
QueueLengthThreshold: 3,
@@ -451,7 +451,7 @@ func TestConfig_MultipleNamespaces(t *testing.T) {
451451
cfg := NewTestConfig()
452452

453453
// Set up global config
454-
globalSatConfig := map[string]interfaces.SaturationScalingConfig{
454+
globalSatConfig := map[string]SaturationScalingConfig{
455455
"default": {
456456
KvCacheThreshold: 0.80,
457457
QueueLengthThreshold: 5,
@@ -463,7 +463,7 @@ func TestConfig_MultipleNamespaces(t *testing.T) {
463463
namespace2 := "namespace2"
464464

465465
// Set namespace1 config
466-
ns1SatConfig := map[string]interfaces.SaturationScalingConfig{
466+
ns1SatConfig := map[string]SaturationScalingConfig{
467467
"default": {
468468
KvCacheThreshold: 0.70,
469469
QueueLengthThreshold: 3,
@@ -472,7 +472,7 @@ func TestConfig_MultipleNamespaces(t *testing.T) {
472472
cfg.UpdateSaturationConfigForNamespace(namespace1, ns1SatConfig)
473473

474474
// Set namespace2 config
475-
ns2SatConfig := map[string]interfaces.SaturationScalingConfig{
475+
ns2SatConfig := map[string]SaturationScalingConfig{
476476
"default": {
477477
KvCacheThreshold: 0.90,
478478
QueueLengthThreshold: 7,

internal/config/loader_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
flag "github.com/spf13/pflag"
1010

11-
interfaces "github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
11+
1212
)
1313

1414
// writeTestConfigFile writes a YAML config file to a temp directory and returns its path.
@@ -256,7 +256,7 @@ func TestConfig_UpdateDynamicConfig(t *testing.T) {
256256
}
257257

258258
// Update saturation config
259-
satConfig := map[string]interfaces.SaturationScalingConfig{
259+
satConfig := map[string]SaturationScalingConfig{
260260
"test": {
261261
KvCacheThreshold: 0.9,
262262
QueueLengthThreshold: 10,
Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
package interfaces
1+
package config
22

33
import "fmt"
44

5+
// DefaultPriority is the default model priority multiplier.
6+
// Higher priority → preferential GPU allocation in fair-share.
7+
const DefaultPriority = 1.0
8+
59
// SaturationScalingConfig holds saturation-based scaling thresholds for a model variant.
610
// Saturation scaling is enabled by default and uses these thresholds to determine when
711
// replicas are saturated and when to scale up.
@@ -44,13 +48,64 @@ type SaturationScalingConfig struct {
4448
// Used by V2 analyzer: spareCapacity = currentSupply - totalDemand / ScaleDownBoundary
4549
// Default: 0.70 (70% utilization allows scale-down)
4650
ScaleDownBoundary float64 `yaml:"scaleDownBoundary,omitempty"`
51+
52+
// Priority is a multiplier for this model's scaling urgency.
53+
// Higher priority → preferential GPU allocation in fair-share.
54+
// Default: 1.0 (neutral).
55+
Priority float64 `yaml:"priority,omitempty"`
56+
57+
// Analyzers configures the set of analyzers and their weights.
58+
// When empty and AnalyzerName is "saturation", defaults to
59+
// [{Name: "saturation", Score: 1.0, Enabled: true}].
60+
Analyzers []AnalyzerScoreConfig `yaml:"analyzers,omitempty"`
61+
}
62+
63+
// AnalyzerScoreConfig configures an individual analyzer's weight in the
64+
// composite scoring function. Per-analyzer threshold overrides are optional;
65+
// when nil, the global top-level thresholds are used.
66+
type AnalyzerScoreConfig struct {
67+
Name string `yaml:"name"`
68+
Enabled *bool `yaml:"enabled,omitempty"` // default true
69+
Score float64 `yaml:"score,omitempty"` // default 1.0
70+
ScaleUpThreshold *float64 `yaml:"scaleUpThreshold,omitempty"` // overrides global
71+
ScaleDownBoundary *float64 `yaml:"scaleDownBoundary,omitempty"` // overrides global
72+
}
73+
74+
// EffectiveScaleUpThreshold returns the per-analyzer threshold if set,
75+
// otherwise falls back to the global value.
76+
func (a *AnalyzerScoreConfig) EffectiveScaleUpThreshold(global float64) float64 {
77+
if a.ScaleUpThreshold != nil {
78+
return *a.ScaleUpThreshold
79+
}
80+
return global
81+
}
82+
83+
// EffectiveScaleDownBoundary returns the per-analyzer boundary if set,
84+
// otherwise falls back to the global value.
85+
func (a *AnalyzerScoreConfig) EffectiveScaleDownBoundary(global float64) float64 {
86+
if a.ScaleDownBoundary != nil {
87+
return *a.ScaleDownBoundary
88+
}
89+
return global
4790
}
4891

4992
// GetAnalyzerName implements the AnalyzerConfig interface.
93+
// Returns "saturation" if Analyzers list is populated (new-style config),
94+
// otherwise returns the raw AnalyzerName field (backward compat).
5095
func (c *SaturationScalingConfig) GetAnalyzerName() string {
96+
if len(c.Analyzers) > 0 {
97+
return "saturation"
98+
}
5199
return c.AnalyzerName
52100
}
53101

102+
// IsV2 returns true if this config selects the V2 token-based analyzer path.
103+
// V2 is active when either the Analyzers list is populated (new-style) or
104+
// AnalyzerName is "saturation" (old-style, backward compat).
105+
func (c *SaturationScalingConfig) IsV2() bool {
106+
return len(c.Analyzers) > 0 || c.AnalyzerName == "saturation"
107+
}
108+
54109
// V2 analyzer default thresholds, applied when fields are omitted from YAML config.
55110
const (
56111
DefaultScaleUpThreshold = 0.85
@@ -60,13 +115,33 @@ const (
60115
// ApplyDefaults fills in zero-valued V2 fields with their defaults.
61116
// Must be called before Validate() to handle omitempty zero-values correctly.
62117
func (c *SaturationScalingConfig) ApplyDefaults() {
63-
if c.AnalyzerName == "saturation" {
118+
if c.Priority == 0 {
119+
c.Priority = DefaultPriority
120+
}
121+
if c.IsV2() {
64122
if c.ScaleUpThreshold == 0 {
65123
c.ScaleUpThreshold = DefaultScaleUpThreshold
66124
}
67125
if c.ScaleDownBoundary == 0 {
68126
c.ScaleDownBoundary = DefaultScaleDownBoundary
69127
}
128+
// Default analyzers list when empty (backward compat for analyzerName: "saturation")
129+
if len(c.Analyzers) == 0 {
130+
enabled := true
131+
c.Analyzers = []AnalyzerScoreConfig{
132+
{Name: "saturation", Score: 1.0, Enabled: &enabled},
133+
}
134+
}
135+
// Apply per-entry defaults
136+
for i := range c.Analyzers {
137+
if c.Analyzers[i].Score == 0 {
138+
c.Analyzers[i].Score = 1.0
139+
}
140+
if c.Analyzers[i].Enabled == nil {
141+
enabled := true
142+
c.Analyzers[i].Enabled = &enabled
143+
}
144+
}
70145
}
71146
}
72147

@@ -86,14 +161,18 @@ func (c *SaturationScalingConfig) Validate() error {
86161
if c.QueueSpareTrigger < 0 {
87162
return fmt.Errorf("queueSpareTrigger must be >= 0, got %.1f", c.QueueSpareTrigger)
88163
}
164+
if c.Priority < 0 {
165+
return fmt.Errorf("priority must be >= 0, got %.2f", c.Priority)
166+
}
167+
89168
// KV cache threshold should be greater than spare trigger (otherwise contradictory)
90169
if c.KvCacheThreshold < c.KvSpareTrigger {
91170
return fmt.Errorf("kvCacheThreshold (%.2f) should be >= kvSpareTrigger (%.2f)",
92171
c.KvCacheThreshold, c.KvSpareTrigger)
93172
}
94173

95-
// V2 analyzer threshold validation
96-
if c.AnalyzerName == "saturation" {
174+
// V2 analyzer threshold validation (global defaults)
175+
if c.IsV2() {
97176
if c.ScaleUpThreshold <= 0 || c.ScaleUpThreshold > 1 {
98177
return fmt.Errorf("scaleUpThreshold must be in (0, 1], got %.2f", c.ScaleUpThreshold)
99178
}
@@ -103,6 +182,24 @@ func (c *SaturationScalingConfig) Validate() error {
103182
if c.ScaleUpThreshold <= c.ScaleDownBoundary {
104183
return fmt.Errorf("scaleUpThreshold (%.2f) must be > scaleDownBoundary (%.2f)", c.ScaleUpThreshold, c.ScaleDownBoundary)
105184
}
185+
// Per-analyzer threshold overrides
186+
for _, a := range c.Analyzers {
187+
if a.ScaleUpThreshold != nil {
188+
if *a.ScaleUpThreshold <= 0 || *a.ScaleUpThreshold > 1 {
189+
return fmt.Errorf("analyzer %q: scaleUpThreshold must be in (0, 1], got %.2f", a.Name, *a.ScaleUpThreshold)
190+
}
191+
}
192+
if a.ScaleDownBoundary != nil {
193+
if *a.ScaleDownBoundary <= 0 || *a.ScaleDownBoundary > 1 {
194+
return fmt.Errorf("analyzer %q: scaleDownBoundary must be in (0, 1], got %.2f", a.Name, *a.ScaleDownBoundary)
195+
}
196+
}
197+
up := a.EffectiveScaleUpThreshold(c.ScaleUpThreshold)
198+
down := a.EffectiveScaleDownBoundary(c.ScaleDownBoundary)
199+
if up <= down {
200+
return fmt.Errorf("analyzer %q: scaleUpThreshold (%.2f) must be > scaleDownBoundary (%.2f)", a.Name, up, down)
201+
}
202+
}
106203
}
107204

108205
return nil

0 commit comments

Comments
 (0)