Skip to content

Commit ff1ff87

Browse files
committed
breakdown scorer to modular files
1 parent ab56d4a commit ff1ff87

File tree

9 files changed

+1242
-943
lines changed

9 files changed

+1242
-943
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
© 2025 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License.
11+
*/
12+
13+
// Package requestcontrol contains helpers to decouple latency-predictor logic.
14+
package slo_aware_router
15+
16+
import (
17+
"os"
18+
"strconv"
19+
"strings"
20+
)
21+
22+
var SLOBufferFactor = func() float64 {
23+
if value, exists := os.LookupEnv("SLO_BUFFER_FACTOR"); exists {
24+
if parsedValue, err := strconv.ParseFloat(value, 64); err == nil {
25+
return parsedValue
26+
}
27+
}
28+
return 1.0 // default value
29+
}()
30+
31+
var NegHeadroomTTFTWeight = func() float64 {
32+
if value, exists := os.LookupEnv("NEG_HEADROOM_TTFT_WEIGHT"); exists {
33+
if parsedValue, err := strconv.ParseFloat(value, 64); err == nil && parsedValue >= 0 {
34+
return parsedValue
35+
}
36+
}
37+
return 0.8 // default: TTFT dominates when violating SLOs
38+
}()
39+
40+
var NegHeadroomTPOTWeight = func() float64 {
41+
if value, exists := os.LookupEnv("NEG_HEADROOM_TPOT_WEIGHT"); exists {
42+
if parsedValue, err := strconv.ParseFloat(value, 64); err == nil && parsedValue >= 0 {
43+
return parsedValue
44+
}
45+
}
46+
return 0.2 // default: TPOT less important in your tiny-output scenario
47+
}()
48+
49+
var HeadroomTTFTWeight = func() float64 {
50+
if value, exists := os.LookupEnv("HEADROOM_TTFT_WEIGHT"); exists {
51+
if parsedValue, err := strconv.ParseFloat(value, 64); err == nil && parsedValue >= 0 {
52+
return parsedValue
53+
}
54+
}
55+
return 0.8 // default
56+
}()
57+
58+
var HeadroomTPOTWeight = func() float64 {
59+
if value, exists := os.LookupEnv("HEADROOM_TPOT_WEIGHT"); exists {
60+
if parsedValue, err := strconv.ParseFloat(value, 64); err == nil && parsedValue >= 0 {
61+
return parsedValue
62+
}
63+
}
64+
return 0.2 // default
65+
}()
66+
67+
var HeadroomSelectionStrategy = func() HeadroomStrategy {
68+
if value, exists := os.LookupEnv("HEADROOM_SELECTION_STRATEGY"); exists {
69+
switch strings.ToLower(value) {
70+
case "least":
71+
return HeadroomStrategyLeast
72+
case "most":
73+
return HeadroomStrategyMost
74+
case "composite-least":
75+
return HeadroomStrategyCompositeLeast
76+
case "composite-most":
77+
return HeadroomStrategyCompositeMost
78+
case "composite-only":
79+
return HeadroomStrategyCompositeOnly
80+
}
81+
}
82+
return HeadroomStrategyLeast // default to least (better packing)
83+
}()
84+
85+
// If using composite headroom, weights for each component. Not used by default
86+
var CompositeKVWeight = func() float64 {
87+
if v, ok := os.LookupEnv("COMPOSITE_KV_WEIGHT"); ok {
88+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 {
89+
return f
90+
}
91+
}
92+
return 1
93+
}()
94+
95+
var CompositeQueueWeight = func() float64 {
96+
if v, ok := os.LookupEnv("COMPOSITE_QUEUE_WEIGHT"); ok {
97+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 {
98+
return f
99+
}
100+
}
101+
return 1
102+
}()
103+
104+
var CompositePrefixWeight = func() float64 {
105+
if v, ok := os.LookupEnv("COMPOSITE_PREFIX_WEIGHT"); ok {
106+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 {
107+
return f
108+
}
109+
}
110+
return 1
111+
}()
112+
113+
// With probability ε, explore (ignore affinity gate); otherwise exploit.
114+
var EpsilonExploreSticky = func() float64 {
115+
// Prefer new env; fall back to old for compatibility.
116+
if v, ok := os.LookupEnv("STICKY_EPSILON"); ok {
117+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 {
118+
return f
119+
}
120+
}
121+
return 0.01 // default 1% exploration
122+
}()
123+
124+
var EpsilonExploreNeg = func() float64 {
125+
// Prefer new env; fall back to old for compatibility.
126+
if v, ok := os.LookupEnv("NEG_HEADROOM_EPSILON"); ok {
127+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 {
128+
return f
129+
}
130+
}
131+
return 0.01 // default 1% exploration
132+
}()
133+
134+
// τ for per-path affinity gate (aka "stickiness" threshold).
135+
var AffinityGateTau = func() float64 {
136+
// Prefer new env; fall back to old for compatibility.
137+
if v, ok := os.LookupEnv("AFFINITY_GATE_TAU"); ok {
138+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 {
139+
return f
140+
}
141+
}
142+
return 0.80
143+
}()
144+
145+
// Global τ for the overall candidate set (previously "overall stickiness").
146+
var AffinityGateTauGlobal = func() float64 {
147+
// Prefer new env; fall back to old for compatibility.
148+
if v, ok := os.LookupEnv("AFFINITY_GATE_TAU_GLOBAL"); ok {
149+
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 {
150+
return f
151+
}
152+
}
153+
return 0.99
154+
}()
155+
156+
// Read once at init. Values: "linear" (default) or "max".
157+
var SelectionMode = func() PodSelectionMode {
158+
if v, ok := os.LookupEnv("POD_SELECTION_MODE"); ok {
159+
switch strings.ToLower(v) {
160+
case "max":
161+
return PodSelectionMax
162+
case "linear":
163+
fallthrough
164+
default:
165+
return PodSelectionLinear
166+
}
167+
}
168+
return PodSelectionLinear
169+
}()
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
© 2025 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License.
11+
*/
12+
13+
// Package requestcontrol contains helpers to decouple latency-predictor logic.
14+
package slo_aware_router
15+
16+
import (
17+
"fmt"
18+
"strconv"
19+
20+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
21+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
22+
)
23+
24+
// parseFloatHeader retrieves a header by name, parses it as a float64,
25+
// and returns the value or an error if the header is missing or invalid.
26+
func parseFloatHeader(request schedulingtypes.LLMRequest, headerName string) (float64, bool, error) {
27+
// 1. Get header value from the map
28+
headerValue, ok := request.Headers[headerName]
29+
if !ok {
30+
return 0, false, nil // Header not found, return 0 and false
31+
}
32+
33+
// 2. Parse the header value to a float64
34+
parsedFloat, err := strconv.ParseFloat(headerValue, 64)
35+
if err != nil {
36+
return 0, false, errutil.Error{
37+
Code: errutil.BadRequest,
38+
Msg: fmt.Sprintf("%s must be a float", headerName),
39+
}
40+
}
41+
42+
// 3. Return the successfully parsed value
43+
return parsedFloat, true, nil
44+
}
45+
46+
// parseFloatHeader retrieves a header by name, parses it as a bool,
47+
// and returns the value or an error if the header is missing or invalid.
48+
func parseBoolHeader(request schedulingtypes.LLMRequest, headerName string) (bool, error) {
49+
// 1. Get header value from the map
50+
headerValue, ok := request.Headers[headerName]
51+
if !ok {
52+
return false, nil // Header not found, return 0 and false
53+
}
54+
55+
// 2. Parse the header value to a bool
56+
parsedBool, err := strconv.ParseBool(headerValue)
57+
if err != nil {
58+
return false, errutil.Error{
59+
Code: errutil.BadRequest,
60+
Msg: fmt.Sprintf("%s must be a bool", headerName),
61+
}
62+
}
63+
64+
// 3. Return the successfully parsed value
65+
return parsedBool, nil
66+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package slo_aware_router
18+
19+
import (
20+
"context"
21+
"math"
22+
"math/rand"
23+
24+
"sigs.k8s.io/controller-runtime/pkg/log"
25+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
26+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
27+
)
28+
29+
func (s *SLOAwareRouter) selectFromCompositeScores(ctx context.Context, allPreds []PodPredictionResult, r *rand.Rand, strategy HeadroomStrategy) schedulingtypes.Pod {
30+
total := 0
31+
choices := s.buildCompositeChoices(
32+
ctx, allPreds, CompositeKVWeight, CompositeQueueWeight, CompositePrefixWeight, &total,
33+
)
34+
if strategy == HeadroomStrategyCompositeLeast {
35+
// Invert weights for "least" strategy
36+
for i := range choices {
37+
choices[i].Weight = minWeight + Wmax - choices[i].Weight
38+
}
39+
}
40+
selectedPod := s.performWeightedRandomSelection(choices, total, allPreds, r)
41+
return selectedPod
42+
}
43+
func (s *SLOAwareRouter) performWeightedRandomSelection(weightedChoices []Choice, total int, candidates []PodPredictionResult, r *rand.Rand) schedulingtypes.Pod {
44+
if total == 0 {
45+
return nil
46+
}
47+
logger := log.FromContext(context.Background())
48+
// Check if MAX_SCORE_SELECTION env variable is set
49+
if SelectionMode == PodSelectionMax {
50+
51+
logger.V(logutil.DEBUG).Info("Pod selection mode: MAX - selecting pod with highest weight")
52+
maxWeight := 0
53+
var selectedPod schedulingtypes.Pod
54+
for _, c := range weightedChoices {
55+
if c.Weight > maxWeight {
56+
maxWeight = c.Weight
57+
selectedPod = c.PodName
58+
}
59+
}
60+
if selectedPod != nil {
61+
return selectedPod
62+
}
63+
// Fallback to first pod if no selection made
64+
return candidates[0].Pod
65+
}
66+
67+
// Original weighted random selection logic
68+
logger.V(logutil.DEBUG).Info("Pod selection mode: LINEAR - performing weighted random selection")
69+
idx := r.Intn(total)
70+
var selectedPod schedulingtypes.Pod
71+
72+
for _, c := range weightedChoices {
73+
if idx < c.Weight {
74+
selectedPod = c.PodName
75+
break
76+
}
77+
idx -= c.Weight
78+
}
79+
80+
// If no pod was selected (shouldn't happen), fallback to first pod
81+
if selectedPod == nil {
82+
selectedPod = candidates[0].Pod
83+
}
84+
85+
return selectedPod
86+
}
87+
func (s *SLOAwareRouter) buildCompositeChoices(
88+
ctx context.Context,
89+
candidates []PodPredictionResult,
90+
wkv, wq, wpref float64,
91+
total *int,
92+
) []Choice {
93+
94+
// Normalize weights
95+
sumw := wkv + wq + wpref
96+
if sumw <= 0 {
97+
wkv, wq, wpref = 1, 0, 0
98+
} else {
99+
wkv /= sumw
100+
wq /= sumw
101+
wpref /= sumw
102+
}
103+
104+
// Precompute queue stats
105+
minQ, maxQ := math.MaxInt32, -1
106+
queueCounts := make(map[string]int, len(candidates))
107+
for _, p := range candidates {
108+
q := p.Pod.GetMetrics().WaitingQueueSize
109+
queueCounts[p.Pod.GetPod().String()] = q
110+
if q < minQ {
111+
minQ = q
112+
}
113+
if q > maxQ {
114+
maxQ = q
115+
}
116+
}
117+
den := float64(maxQ - minQ)
118+
119+
choices := make([]Choice, 0, len(candidates))
120+
for _, p := range candidates {
121+
q := queueCounts[p.Pod.GetPod().String()]
122+
relQueue := 1.0
123+
if den > 0 {
124+
relQueue = (float64(maxQ-q) / den)
125+
}
126+
127+
kvUsage := p.Pod.GetMetrics().KVCacheUsagePercent
128+
kvFree := (1.0 - kvUsage)
129+
prefix := (p.PrefixCacheScore)
130+
131+
composite := wkv*kvFree + wq*relQueue + wpref*prefix
132+
w := int(math.Round(float64(minWeight) + (float64(Wmax-minWeight) * composite)))
133+
*total += w
134+
choices = append(choices, Choice{PodName: p.Pod, Weight: w})
135+
136+
log.FromContext(ctx).V(logutil.DEBUG).Info("Composite (neg/pos) score",
137+
"pod", p.Pod.GetPod().String(),
138+
"kvUsage", kvUsage, "kvFree", kvFree,
139+
"queue", q, "relQueue", relQueue,
140+
"prefix", prefix,
141+
"wkv", wkv, "wq", wq, "wprefix", wpref,
142+
"composite", composite, "weight", w)
143+
}
144+
return choices
145+
}

0 commit comments

Comments
 (0)