Skip to content

Commit f5dcf0c

Browse files
committed
removing DefaultPredictionRequestBuilder
1 parent 90830b6 commit f5dcf0c

File tree

5 files changed

+68
-100
lines changed

5 files changed

+68
-100
lines changed

pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/latencypredictor_helper.go

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -174,18 +174,27 @@ func recordTTFTTrainingData(
174174
prefixCacheScore float64,
175175
) {
176176
logger := log.FromContext(ctx)
177-
// Build training entry using the builder
178-
entry := requestBuilder.BuildTrainingEntry(
179-
ctx,
180-
pod,
181-
m,
182-
predictedLatencyCtx.schedulingRequest.Body.Completions.Prompt,
183-
predictedLatencyCtx.ttft,
184-
0, // TTFT training
185-
now,
186-
0,
187-
prefixCacheScore,
188-
)
177+
178+
// Build training entry inline (default monolithic behavior)
179+
prompt := predictedLatencyCtx.schedulingRequest.Body.Completions.Prompt
180+
entry := latencypredictor.TrainingEntry{
181+
KVCachePercentage: m.KVCacheUsagePercent,
182+
InputTokenLength: len(strings.Fields(prompt)),
183+
ActualTTFT: predictedLatencyCtx.ttft,
184+
ActualTPOT: 0, // TTFT training
185+
Timestamp: now,
186+
NumRequestWaiting: m.WaitingQueueSize,
187+
NumRequestRunning: m.RunningRequestsSize,
188+
NumTokensGenerated: 0,
189+
PrefixCacheScore: prefixCacheScore,
190+
PodType: "", // Empty for monolithic deployments
191+
}
192+
193+
// Allow customization if builder is provided (for disaggregated deployments)
194+
if requestBuilder != nil {
195+
entry = requestBuilder.BuildTrainingEntry(ctx, pod, m, prompt, predictedLatencyCtx.ttft, 0, now, 0, prefixCacheScore)
196+
}
197+
189198
if err := predictor.AddTrainingDataBulk([]latencypredictor.TrainingEntry{entry}); err != nil {
190199
logger.V(logutil.DEBUG).Error(err, "record TTFT training failed")
191200
}
@@ -264,18 +273,27 @@ func processTokenForLatencyPrediction(
264273
"error", err)
265274
return
266275
}
267-
// Record actual TPOT using builder
268-
entry := requestBuilder.BuildTrainingEntry(
269-
ctx,
270-
pod,
271-
m,
272-
predictedLatencyCtx.schedulingRequest.Body.Completions.Prompt,
273-
0, // TTFT not recorded for TPOT
274-
latencyMs,
275-
now,
276-
predictedLatencyCtx.generatedTokenCount-1,
277-
0, // TPOT does not use prefix cache score
278-
)
276+
277+
// Build training entry inline (default monolithic behavior)
278+
prompt := predictedLatencyCtx.schedulingRequest.Body.Completions.Prompt
279+
entry := latencypredictor.TrainingEntry{
280+
KVCachePercentage: m.KVCacheUsagePercent,
281+
InputTokenLength: len(strings.Fields(prompt)),
282+
ActualTTFT: 0, // TTFT not recorded for TPOT
283+
ActualTPOT: latencyMs,
284+
Timestamp: now,
285+
NumRequestWaiting: m.WaitingQueueSize,
286+
NumRequestRunning: m.RunningRequestsSize,
287+
NumTokensGenerated: predictedLatencyCtx.generatedTokenCount - 1,
288+
PrefixCacheScore: 0, // TPOT does not use prefix cache score
289+
PodType: "", // Empty for monolithic deployments
290+
}
291+
292+
// Allow customization if builder is provided (for disaggregated deployments)
293+
if requestBuilder != nil {
294+
entry = requestBuilder.BuildTrainingEntry(ctx, pod, m, prompt, 0, latencyMs, now, predictedLatencyCtx.generatedTokenCount-1, 0)
295+
}
296+
279297
if err := predictor.AddTrainingDataBulk([]latencypredictor.TrainingEntry{entry}); err != nil {
280298
logger.V(logutil.DEBUG).Error(err, "record TPOT training failed")
281299
}
@@ -344,17 +362,25 @@ func bulkPredictWithMetrics(
344362
}
345363
}
346364

347-
// Build bulk prediction requests using the builder
365+
// Build bulk prediction requests inline (default monolithic behavior)
348366
bulkRequests := make([]latencypredictor.PredictionRequest, len(metricsStates))
349367
for i := range metricsStates {
350-
bulkRequests[i] = requestBuilder.BuildPredictionRequest(
351-
ctx,
352-
pods[i],
353-
metricsStates[i],
354-
prompts[i],
355-
generatedTokenCounts[i],
356-
prefixCacheScores[i],
357-
)
368+
req := latencypredictor.PredictionRequest{
369+
KVCachePercentage: metricsStates[i].KVCacheUsagePercent,
370+
InputTokenLength: len(strings.Fields(prompts[i])),
371+
NumRequestWaiting: metricsStates[i].WaitingQueueSize,
372+
NumRequestRunning: metricsStates[i].RunningRequestsSize,
373+
NumTokensGenerated: generatedTokenCounts[i],
374+
PrefixCacheScore: prefixCacheScores[i],
375+
PodType: "", // Empty for monolithic deployments
376+
}
377+
378+
// Allow customization if builder is provided (for disaggregated deployments)
379+
if requestBuilder != nil {
380+
req = requestBuilder.BuildPredictionRequest(ctx, pods[i], metricsStates[i], prompts[i], generatedTokenCounts[i], prefixCacheScores[i])
381+
}
382+
383+
bulkRequests[i] = req
358384
}
359385

360386
// Perform bulk prediction

pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/latencypredictor_helper_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestBulkPredictWithMetrics(t *testing.T) {
4141
{KVCacheUsagePercent: 0.5},
4242
{KVCacheUsagePercent: 0.6},
4343
}
44-
requestBuilder := &DefaultPredictionRequestBuilder{}
44+
requestBuilder := PredictionRequestBuilder(nil) // nil = default monolithic behavior
4545
pods := []schedulingtypes.Endpoint{
4646
fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
4747
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
@@ -72,7 +72,7 @@ func TestBulkPredictWithMetrics_Error(t *testing.T) {
7272
metricsStates := []*fwkdl.Metrics{
7373
{KVCacheUsagePercent: 0.5},
7474
}
75-
requestBuilder := &DefaultPredictionRequestBuilder{}
75+
requestBuilder := PredictionRequestBuilder(nil) // nil = default monolithic behavior
7676
pods := []schedulingtypes.Endpoint{
7777
fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
7878
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
@@ -91,7 +91,7 @@ func TestBulkPredictWithMetrics_Error(t *testing.T) {
9191
func TestBulkPredictWithMetrics_InputMismatch(t *testing.T) {
9292
mockPredictor := &mockPredictor{}
9393
metricsStates := []*fwkdl.Metrics{{}}
94-
requestBuilder := &DefaultPredictionRequestBuilder{}
94+
requestBuilder := PredictionRequestBuilder(nil) // nil = default monolithic behavior
9595
pods := []schedulingtypes.Endpoint{
9696
fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
9797
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
@@ -111,7 +111,7 @@ func TestBulkPredictWithMetrics_InputMismatch(t *testing.T) {
111111
func TestBulkPredictWithMetrics_NilMetricsState(t *testing.T) {
112112
mockPredictor := &mockPredictor{}
113113
metricsStates := []*fwkdl.Metrics{nil} // Nil metrics state
114-
requestBuilder := &DefaultPredictionRequestBuilder{}
114+
requestBuilder := PredictionRequestBuilder(nil) // nil = default monolithic behavior
115115
pods := []schedulingtypes.Endpoint{
116116
fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
117117
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},

pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/requestcontrol_hooks_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func createTestRouter() *PredictedLatency {
6262
sloContextStore: sync.Map{},
6363
runningRequestLists: make(map[types.NamespacedName]*requestPriorityQueue),
6464
latencypredictor: nil,
65-
requestBuilder: &DefaultPredictionRequestBuilder{},
65+
requestBuilder: nil, // nil = default monolithic behavior
6666
config: DefaultConfig,
6767
}
6868
}

pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/scorer.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type Config struct {
6969

7070
// RequestBuilder allows customization of prediction and training request construction.
7171
// This field is not serialized and must be set programmatically.
72-
// If nil, defaults to DefaultPredictionRequestBuilder.
72+
// If nil, uses default monolithic behavior (PodType="").
7373
RequestBuilder PredictionRequestBuilder `json:"-"`
7474
}
7575

@@ -101,10 +101,8 @@ func PredictedLatencyFactory(name string, rawParameters json.RawMessage, handle
101101
}
102102
}
103103

104-
// Use provided builder or default to DefaultPredictionRequestBuilder
105-
if parameters.RequestBuilder == nil {
106-
parameters.RequestBuilder = &DefaultPredictionRequestBuilder{}
107-
}
104+
// RequestBuilder is optional - nil means use default monolithic behavior
105+
// Downstream projects can provide a custom builder for disaggregated serving
108106

109107
if err := parameters.validate(); err != nil {
110108
return nil, fmt.Errorf("invalid PredictedLatency config: %w", err)
@@ -168,16 +166,10 @@ func NewPredictedLatency(config Config, predictor latencypredictor.PredictorInte
168166
strategy = headroomStrategyLeast
169167
}
170168

171-
// Ensure requestBuilder is set
172-
requestBuilder := config.RequestBuilder
173-
if requestBuilder == nil {
174-
requestBuilder = &DefaultPredictionRequestBuilder{}
175-
}
176-
177169
return &PredictedLatency{
178170
typedName: plugin.TypedName{Type: PredictedLatencyPluginType, Name: PredictedLatencyPluginType},
179171
latencypredictor: predictor,
180-
requestBuilder: requestBuilder,
172+
requestBuilder: config.RequestBuilder, // nil = default monolithic behavior
181173
runningRequestLists: make(map[types.NamespacedName]*requestPriorityQueue),
182174
sloContextStore: sync.Map{},
183175
headroomStrategy: strategy,

pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/types.go

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package predictedlatency
1919

2020
import (
2121
"context"
22-
"strings"
2322
"time"
2423

2524
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
@@ -92,52 +91,3 @@ type PredictionRequestBuilder interface {
9291
) latencypredictor.TrainingEntry
9392
}
9493

95-
// DefaultPredictionRequestBuilder provides the default monolithic behavior for building prediction requests.
96-
// This implementation leaves PodType empty, suitable for monolithic (non-disaggregated) deployments.
97-
type DefaultPredictionRequestBuilder struct{}
98-
99-
// BuildPredictionRequest constructs a standard prediction request without pod type information
100-
func (b *DefaultPredictionRequestBuilder) BuildPredictionRequest(
101-
ctx context.Context,
102-
pod schedulingtypes.Endpoint,
103-
metrics *datalayer.Metrics,
104-
prompt string,
105-
generatedTokens int,
106-
prefixCacheScore float64,
107-
) latencypredictor.PredictionRequest {
108-
return latencypredictor.PredictionRequest{
109-
KVCachePercentage: metrics.KVCacheUsagePercent,
110-
InputTokenLength: len(strings.Fields(prompt)), // Simple word-based tokenization
111-
NumRequestWaiting: metrics.WaitingQueueSize,
112-
NumRequestRunning: metrics.RunningRequestsSize,
113-
NumTokensGenerated: generatedTokens,
114-
PrefixCacheScore: prefixCacheScore,
115-
PodType: "", // Empty for monolithic deployments
116-
}
117-
}
118-
119-
// BuildTrainingEntry constructs a standard training entry without pod type information
120-
func (b *DefaultPredictionRequestBuilder) BuildTrainingEntry(
121-
ctx context.Context,
122-
pod schedulingtypes.Endpoint,
123-
metrics *datalayer.Metrics,
124-
prompt string,
125-
actualTTFT float64,
126-
actualTPOT float64,
127-
timestamp time.Time,
128-
generatedTokens int,
129-
prefixCacheScore float64,
130-
) latencypredictor.TrainingEntry {
131-
return latencypredictor.TrainingEntry{
132-
KVCachePercentage: metrics.KVCacheUsagePercent,
133-
InputTokenLength: len(strings.Fields(prompt)), // Simple word-based tokenization
134-
ActualTTFT: actualTTFT,
135-
ActualTPOT: actualTPOT,
136-
Timestamp: timestamp,
137-
NumRequestWaiting: metrics.WaitingQueueSize,
138-
NumRequestRunning: metrics.RunningRequestsSize,
139-
NumTokensGenerated: generatedTokens,
140-
PrefixCacheScore: prefixCacheScore,
141-
PodType: "", // Empty for monolithic deployments
142-
}
143-
}

0 commit comments

Comments
 (0)