Skip to content

Commit 59f51f0

Browse files
authored
feat: wire V2 saturation analyzer into engine, gated by analyzerName (llm-d#695)
* feat: wire V2 saturation analyzer into engine, gated by analyzerName

  Integrate the V2 token-based saturation analyzer into the optimization engine behind a config gate (analyzerName: "saturation"). When active, it replaces the V1 percentage-based analyzer inside RunSaturationAnalysis while keeping the rest of the pipeline (enforcer, limiter, decision converter) unchanged via an adapter pattern.

  Also introduces the CostAwareOptimizer — the first ScalingOptimizer implementation for the V2 pipeline — which handles unlimited-mode multi-variant scaling with cost-based replica allocation.

  Engine integration:
  - Add saturationV2Analyzer, capacityStore, and optimizer fields to Engine struct, initialized once in NewEngine()
  - Gate V2 path in optimize() via analyzerName == "saturation" from global config
  - optimizeV2() three-stage pipeline: collect ModelScalingRequests, call optimizer.Optimize(), apply enforcer per-model via bridge
  - Enforcer bridge: extractTargetsFromDecisions, buildVariantAnalysesFromDecisions, applyEnforcedTargetsToDecisions

  CostAwareOptimizer (unlimited mode):
  - Scale-up: allocate to most cost-efficient variant (lowest cost/perReplicaCapacity). Variants with pending replicas are NOT skipped — the analyzer already accounts for their capacity in the supply calculation, so RequiredCapacity > 0 means demand exceeds total supply including pending.
  - Scale-down: remove from most expensive variant (highest absolute cost). The cheapest variant is protected at min 1 replica only when it is the last variant with replicas — this prevents scale-down deadlocks where the expensive variant's per-replica capacity exceeds spare but cheaper replicas could be removed.
  - Skips variants with zero capacity

  Limiter infrastructure:
  - ResourcePool, ResourceConstraints, ConstraintProvider interface for future V2 limited-mode path (GreedyBySaturationOptimizer)
  - DefaultLimiter implements ConstraintProvider via ComputeConstraints()
  - TypeInventory.GetResourcePools() for per-type resource availability

* fix: use logging level constants instead of raw numeric values

  Replace V(1) calls with V(logging.DEBUG) in cost_aware_optimizer.go, engine.go, and engine_v2.go for better readability per review feedback.

* fix: address PR review feedback on V2 engine integration

  - Remove unused vcMap parameter from costAwareScaleUp and costAwareScaleDown
  - Rename V2 analyzer Name() to "saturation-token-based" for clarity (config value "saturation" unchanged)
  - Select optimizer at engine init time based on LimitedModeEnabled config
  - Pass optimizerName to applyEnforcedTargetsToDecisions instead of hardcoding "cost-aware"
  - Add comments explaining V1/V2 path separation rationale and shared metrics registration
  - Update V1 analyzer doc comment to clarify percentage-based role
1 parent 6563a7f commit 59f51f0

13 files changed

Lines changed: 1514 additions & 69 deletions

internal/engines/analyzers/saturation_v2/analyzer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ func NewSaturationAnalyzer(store *CapacityKnowledgeStore) *SaturationAnalyzer {
3232
}
3333
}
3434

35-
// Name returns the analyzer identifier.
35+
// Name returns the analyzer identifier for logging and result metadata.
36+
// Note: the config value "saturation" (in analyzerName YAML field) selects this analyzer,
37+
// but the descriptive name here is used in AnalyzerResult.AnalyzerName for observability.
3638
func (a *SaturationAnalyzer) Name() string {
37-
return "saturation"
39+
return "saturation-token-based"
3840
}
3941

4042
// EvictStaleHistory removes k2 history entries that have not been updated

internal/engines/analyzers/saturation_v2/analyzer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ var _ = Describe("SaturationAnalyzer", func() {
2323
})
2424

2525
Describe("Name", func() {
26-
It("should return 'saturation'", func() {
27-
Expect(analyzer.Name()).To(Equal("saturation"))
26+
It("should return 'saturation-token-based'", func() {
27+
Expect(analyzer.Name()).To(Equal("saturation-token-based"))
2828
})
2929
})
3030

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
package pipeline
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"sort"
8+
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
11+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/interfaces"
12+
"github.com/llm-d/llm-d-workload-variant-autoscaler/internal/logging"
13+
)
14+
15+
// CostAwareOptimizer is a per-model optimizer that minimizes total cost while
// meeting capacity requirements. It processes each model independently:
//
//   - Scale-up: adds replicas to the most cost-efficient variant
//     (lowest cost / perReplicaCapacity). Variants with pending replicas are
//     NOT skipped: the scale-up path treats RequiredCapacity > 0 as demand
//     that already exceeds total supply, pending replicas included.
//   - Scale-down: removes replicas from the most expensive variant
//     (highest absolute cost) first.
//   - The cheapest variant is held at >=1 replica only while no other variant
//     has replicas left; every other variant may scale to 0.
//   - Variants whose per-replica capacity is zero or negative are skipped in
//     both directions.
//
// This optimizer ignores ResourceConstraints (unlimited mode). For GPU-limited
// environments, use GreedyBySaturationOptimizer instead.
type CostAwareOptimizer struct{}

// NewCostAwareOptimizer creates a new CostAwareOptimizer. The type carries no
// state, so every instance behaves identically.
func NewCostAwareOptimizer() *CostAwareOptimizer {
	return &CostAwareOptimizer{}
}

// Name returns the optimizer identifier, "cost-aware".
func (o *CostAwareOptimizer) Name() string {
	return "cost-aware"
}
36+
37+
// Optimize produces VariantDecisions for all models.
38+
// Constraints are ignored in unlimited mode (CostAwareOptimizer).
39+
func (o *CostAwareOptimizer) Optimize(
40+
ctx context.Context,
41+
requests []ModelScalingRequest,
42+
constraints []*ResourceConstraints,
43+
) []interfaces.VariantDecision {
44+
logger := ctrl.LoggerFrom(ctx)
45+
var allDecisions []interfaces.VariantDecision
46+
47+
for _, req := range requests {
48+
if req.Result == nil {
49+
continue
50+
}
51+
52+
stateMap := buildStateMap(req.VariantStates)
53+
vcMap := buildCapacityMap(req.Result.VariantCapacities)
54+
targets := initTargets(req.VariantStates)
55+
56+
if req.Result.RequiredCapacity > 0 {
57+
costAwareScaleUp(ctx, req.Result, targets)
58+
} else if req.Result.SpareCapacity > 0 {
59+
costAwareScaleDown(ctx, req.Result, targets)
60+
}
61+
62+
decisions := buildDecisions(req, stateMap, vcMap, targets)
63+
logger.V(logging.DEBUG).Info("Cost-aware optimizer decisions",
64+
"modelID", req.ModelID,
65+
"decisions", len(decisions))
66+
allDecisions = append(allDecisions, decisions...)
67+
}
68+
69+
return allDecisions
70+
}
71+
72+
// costAwareScaleUp adds replicas to the most cost-efficient variant.
73+
// Sorts by cost-efficiency (cost/perReplicaCapacity) ascending, picks first eligible.
74+
// Pending replicas are not skipped because the analyzer already accounts for their
75+
// capacity in the supply calculation — if RequiredCapacity > 0, demand exceeds total
76+
// supply including pending.
77+
func costAwareScaleUp(
78+
ctx context.Context,
79+
result *interfaces.AnalyzerResult,
80+
targets map[string]int,
81+
) {
82+
logger := ctrl.LoggerFrom(ctx)
83+
84+
sorted := sortByCostEfficiencyAsc(result.VariantCapacities)
85+
remaining := result.RequiredCapacity
86+
87+
for _, vc := range sorted {
88+
if remaining <= 0 {
89+
break
90+
}
91+
if vc.PerReplicaCapacity <= 0 {
92+
continue
93+
}
94+
95+
replicasNeeded := int(math.Ceil(remaining / vc.PerReplicaCapacity))
96+
targets[vc.VariantName] = targets[vc.VariantName] + replicasNeeded
97+
remaining -= float64(replicasNeeded) * vc.PerReplicaCapacity
98+
99+
logger.V(logging.DEBUG).Info("Scale-up allocation",
100+
"variant", vc.VariantName,
101+
"added", replicasNeeded,
102+
"costEfficiency", costEfficiency(vc))
103+
}
104+
}
105+
106+
// costAwareScaleDown removes replicas from the most expensive variant.
107+
// Sorts by absolute cost descending, removes from most expensive first.
108+
// The cheapest variant is protected at min 1 replica only when no other variant
109+
// has replicas — this prevents scale-down deadlocks where the expensive variant's
110+
// per-replica capacity exceeds spare but cheaper replicas could be removed.
111+
func costAwareScaleDown(
112+
ctx context.Context,
113+
result *interfaces.AnalyzerResult,
114+
targets map[string]int,
115+
) {
116+
logger := ctrl.LoggerFrom(ctx)
117+
118+
sorted := sortByCostDesc(result.VariantCapacities)
119+
cheapest := findCheapestVariant(result.VariantCapacities)
120+
remaining := result.SpareCapacity
121+
122+
for _, vc := range sorted {
123+
if remaining <= 0 {
124+
break
125+
}
126+
if vc.PerReplicaCapacity <= 0 {
127+
continue
128+
}
129+
130+
current := targets[vc.VariantName]
131+
minReplicas := 0
132+
if vc.VariantName == cheapest {
133+
// Protect cheapest at 1 only if it's the last variant with replicas
134+
otherHasReplicas := false
135+
for name, t := range targets {
136+
if name != cheapest && t > 0 {
137+
otherHasReplicas = true
138+
break
139+
}
140+
}
141+
if !otherHasReplicas {
142+
minReplicas = 1
143+
}
144+
}
145+
146+
removable := current - minReplicas
147+
if removable <= 0 {
148+
continue
149+
}
150+
151+
replicasToRemove := int(math.Floor(remaining / vc.PerReplicaCapacity))
152+
if replicasToRemove > removable {
153+
replicasToRemove = removable
154+
}
155+
if replicasToRemove <= 0 {
156+
continue
157+
}
158+
159+
targets[vc.VariantName] = current - replicasToRemove
160+
remaining -= float64(replicasToRemove) * vc.PerReplicaCapacity
161+
162+
logger.V(logging.DEBUG).Info("Scale-down allocation",
163+
"variant", vc.VariantName,
164+
"removed", replicasToRemove,
165+
"cost", vc.Cost)
166+
}
167+
}
168+
169+
// buildStateMap creates a lookup map from variant name to VariantReplicaState.
170+
func buildStateMap(states []interfaces.VariantReplicaState) map[string]interfaces.VariantReplicaState {
171+
m := make(map[string]interfaces.VariantReplicaState, len(states))
172+
for _, s := range states {
173+
m[s.VariantName] = s
174+
}
175+
return m
176+
}
177+
178+
// buildCapacityMap creates a lookup map from variant name to VariantCapacity.
179+
func buildCapacityMap(capacities []interfaces.VariantCapacity) map[string]interfaces.VariantCapacity {
180+
m := make(map[string]interfaces.VariantCapacity, len(capacities))
181+
for _, vc := range capacities {
182+
m[vc.VariantName] = vc
183+
}
184+
return m
185+
}
186+
187+
// initTargets creates initial targets from current replica counts.
188+
func initTargets(states []interfaces.VariantReplicaState) map[string]int {
189+
targets := make(map[string]int, len(states))
190+
for _, s := range states {
191+
targets[s.VariantName] = s.CurrentReplicas
192+
}
193+
return targets
194+
}
195+
196+
// findCheapestVariant returns the variant name with the lowest cost.
197+
func findCheapestVariant(capacities []interfaces.VariantCapacity) string {
198+
cheapest := ""
199+
minCost := math.MaxFloat64
200+
for _, vc := range capacities {
201+
if vc.Cost < minCost {
202+
minCost = vc.Cost
203+
cheapest = vc.VariantName
204+
}
205+
}
206+
return cheapest
207+
}
208+
209+
// sortByCostEfficiencyAsc returns variants sorted by cost/perReplicaCapacity ascending.
210+
func sortByCostEfficiencyAsc(capacities []interfaces.VariantCapacity) []interfaces.VariantCapacity {
211+
sorted := make([]interfaces.VariantCapacity, len(capacities))
212+
copy(sorted, capacities)
213+
sort.Slice(sorted, func(i, j int) bool {
214+
return costEfficiency(sorted[i]) < costEfficiency(sorted[j])
215+
})
216+
return sorted
217+
}
218+
219+
// sortByCostDesc returns variants sorted by absolute cost descending.
220+
func sortByCostDesc(capacities []interfaces.VariantCapacity) []interfaces.VariantCapacity {
221+
sorted := make([]interfaces.VariantCapacity, len(capacities))
222+
copy(sorted, capacities)
223+
sort.Slice(sorted, func(i, j int) bool {
224+
return sorted[i].Cost > sorted[j].Cost
225+
})
226+
return sorted
227+
}
228+
229+
// costEfficiency returns the cost per unit of capacity.
230+
func costEfficiency(vc interfaces.VariantCapacity) float64 {
231+
if vc.PerReplicaCapacity <= 0 {
232+
return math.MaxFloat64
233+
}
234+
return vc.Cost / vc.PerReplicaCapacity
235+
}
236+
237+
// buildDecisions converts targets map into VariantDecision slice.
238+
func buildDecisions(
239+
req ModelScalingRequest,
240+
stateMap map[string]interfaces.VariantReplicaState,
241+
vcMap map[string]interfaces.VariantCapacity,
242+
targets map[string]int,
243+
) []interfaces.VariantDecision {
244+
decisions := make([]interfaces.VariantDecision, 0, len(targets))
245+
for name, target := range targets {
246+
state := stateMap[name]
247+
vc := vcMap[name]
248+
249+
var action interfaces.SaturationAction
250+
var reason string
251+
switch {
252+
case target > state.CurrentReplicas:
253+
action = interfaces.ActionScaleUp
254+
reason = fmt.Sprintf("V2 scale-up (optimizer: cost-aware, required: %.0f)", req.Result.RequiredCapacity)
255+
case target < state.CurrentReplicas:
256+
action = interfaces.ActionScaleDown
257+
reason = fmt.Sprintf("V2 scale-down (optimizer: cost-aware, spare: %.0f)", req.Result.SpareCapacity)
258+
default:
259+
action = interfaces.ActionNoChange
260+
reason = "V2 steady state"
261+
}
262+
263+
decisions = append(decisions, interfaces.VariantDecision{
264+
VariantName: name,
265+
ModelID: req.ModelID,
266+
Namespace: req.Namespace,
267+
AcceleratorName: vc.AcceleratorName,
268+
Cost: vc.Cost,
269+
CurrentReplicas: state.CurrentReplicas,
270+
TargetReplicas: target,
271+
Action: action,
272+
Reason: reason,
273+
})
274+
}
275+
return decisions
276+
}
277+
278+
// mergeConstraints combines constraints from multiple providers.
279+
// Currently unused in CostAwareOptimizer but available for limited mode.
280+
func mergeConstraints(constraints []*ResourceConstraints) map[string]int {
281+
merged := make(map[string]int)
282+
for _, c := range constraints {
283+
if c == nil {
284+
continue
285+
}
286+
for accType, pool := range c.Pools {
287+
if existing, ok := merged[accType]; !ok || pool.Available < existing {
288+
merged[accType] = pool.Available
289+
}
290+
}
291+
}
292+
return merged
293+
}
294+
295+
// Compile-time assertion that *CostAwareOptimizer satisfies the ScalingOptimizer interface.
296+
var _ ScalingOptimizer = (*CostAwareOptimizer)(nil)

0 commit comments

Comments
 (0)