-
Notifications
You must be signed in to change notification settings - Fork 154
Add program aware plugin #707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
265ffb2
0cedc28
f1737e5
f83bd15
bdc356f
3e4cfee
c58f125
ab4d182
ffabe70
3894397
5e8f3e1
42b042d
87f5232
dc11a5f
9ec35d0
6b202e2
4c80b9d
a3e1bd4
606a5b7
68bfa81
f7d2665
635d400
d04a68a
14e4586
ddefd82
7076818
3b44141
811b647
1993d34
2058888
63b36ab
9392bfa
01bf665
9b4ca5a
edb5acf
cd8a492
6445ddf
9a17d36
8e0fdf6
0996e01
64f15fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| apiVersion: inference.networking.x-k8s.io/v1alpha1 | ||
| kind: EndpointPickerConfig | ||
| plugins: | ||
| - type: program-aware-fairness | ||
| - type: queue-scorer | ||
| - type: max-score-picker | ||
| - type: single-profile-handler | ||
|
|
||
| featureGates: | ||
| - flowControl | ||
| - prepareDataPlugins | ||
|
|
||
| flowControl: | ||
| defaultPriorityBand: | ||
| fairnessPolicyRef: program-aware-fairness | ||
|
|
||
| schedulingProfiles: | ||
| - name: default | ||
| plugins: | ||
| - pluginRef: queue-scorer | ||
| - pluginRef: max-score-picker |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,196 @@ | ||
| // Package programaware implements a flow-control fairness policy that schedules | ||
| // programs using their accumulated metrics using scoring strategies (EWMA or DRR). | ||
| package programaware | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "math" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin" | ||
| requestcontrol "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol" | ||
| ) | ||
|
|
||
| const ( | ||
| // ProgramAwarePluginType is the registered type name for this plugin. | ||
| ProgramAwarePluginType = "program-aware-fairness" | ||
|
|
||
| // fairnessIDHeader is the standard header used to identify the program. | ||
| fairnessIDHeader = "x-gateway-inference-fairness-id" | ||
| ) | ||
|
|
||
| // Config holds the JSON-decoded configuration for the plugin. | ||
| type Config struct { | ||
| // Strategy selects the fairness scoring algorithm used by Pick(). | ||
| // Valid values: "ewma" (default), "drr". | ||
| // | ||
| // "ewma" — head-of-queue age + EWMA historical wait + dispatch-count penalty. | ||
| // Practical heuristic; strong starvation prevention. | ||
| // | ||
| // "drr" — Deficit Round Robin adapted for tokens [Shreedhar & Varghese 1995]. | ||
| // Each round every active queue earns a token quantum; actual token | ||
| // usage is deducted at response completion. Provides provably | ||
| // proportional fairness independent of request rate or size. | ||
| Strategy string `json:"strategy"` | ||
| } | ||
|
|
||
| // Compile-time interface assertions. | ||
| var ( | ||
| _ flowcontrol.FairnessPolicy = &ProgramAwarePlugin{} | ||
| _ requestcontrol.PrepareDataPlugin = &ProgramAwarePlugin{} | ||
| _ requestcontrol.PreRequest = &ProgramAwarePlugin{} | ||
| _ requestcontrol.ResponseComplete = &ProgramAwarePlugin{} | ||
| ) | ||
|
|
||
| // ProgramAwarePluginFactory creates a new ProgramAwarePlugin from JSON config. | ||
| // Example config: {"strategy": "drr"} | ||
| // | ||
| //nolint:revive | ||
| func ProgramAwarePluginFactory(name string, rawCfg json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) { | ||
| cfg := Config{Strategy: "ewma"} | ||
| if len(rawCfg) > 0 { | ||
| if err := json.Unmarshal(rawCfg, &cfg); err != nil { | ||
| return nil, fmt.Errorf("invalid config for %s plugin %q: %w", ProgramAwarePluginType, name, err) | ||
| } | ||
| } | ||
| strategy, err := newStrategy(cfg.Strategy) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("%s plugin %q: %w", ProgramAwarePluginType, name, err) | ||
| } | ||
| return &ProgramAwarePlugin{ | ||
| name: name, | ||
| strategy: strategy, | ||
| }, nil | ||
| } | ||
|
|
||
| // ProgramAwarePlugin implements a FairnessPolicy that selects which program's | ||
| // queue to service next, and request lifecycle hooks that track per-program metrics. | ||
| // | ||
| // Fairness behaviour is determined by the configured ScoringStrategy (default: EWMA). | ||
| // Program identity comes from the x-gateway-inference-fairness-id request header. | ||
| // | ||
| //nolint:revive | ||
| type ProgramAwarePlugin struct { | ||
| name string | ||
| strategy ScoringStrategy | ||
|
|
||
| // programMetrics stores aggregated metrics per program. | ||
| // Key: program ID (string), Value: *ProgramMetrics. | ||
| programMetrics sync.Map | ||
|
|
||
| // requestTimestamps tracks when Pick() dispatched each request, | ||
| // used to compute flow-control queue wait time in PreRequest. | ||
| // Key: request ID (string), Value: time.Time. | ||
| requestTimestamps sync.Map | ||
| } | ||
|
|
||
| // TypedName returns the plugin type and instance name. | ||
| func (p *ProgramAwarePlugin) TypedName() plugin.TypedName { | ||
| return plugin.TypedName{ | ||
| Type: ProgramAwarePluginType, | ||
| Name: p.name, | ||
| } | ||
| } | ||
|
|
||
| // getStrategy returns the configured strategy, falling back to EWMA for zero-value | ||
| // plugin instances constructed directly in tests. | ||
| func (p *ProgramAwarePlugin) getStrategy() ScoringStrategy { | ||
| if p.strategy == nil { | ||
| return &EWMAStrategy{} | ||
| } | ||
| return p.strategy | ||
| } | ||
|
|
||
| // --- FairnessPolicy interface --- | ||
|
|
||
| // NewState creates per-PriorityBand state. This plugin uses its own sync.Map | ||
| // for all state, so no per-band state is needed. | ||
| func (p *ProgramAwarePlugin) NewState(_ context.Context) any { | ||
| return nil | ||
| } | ||
|
|
||
| // Pick selects which program queue to service next. | ||
| // | ||
| // For each queue in the band, the configured ScoringStrategy is given a chance | ||
| // to update its per-program state (OnPickStart), then the queue with the highest | ||
| // score is selected for dispatch. | ||
| func (p *ProgramAwarePlugin) Pick(_ context.Context, band flowcontrol.PriorityBandAccessor) (flowcontrol.FlowQueueAccessor, error) { | ||
| start := time.Now() | ||
| defer func() { | ||
| pickLatencyUs.Observe(float64(time.Since(start).Microseconds())) | ||
| }() | ||
|
|
||
| if band == nil { | ||
| return nil, nil //nolint:nilnil | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: nil band is not an error? this means the caller even when the err == nil must nil check the return is nil. also won't this give inaccurate latency metrics for work that was not done? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We return (nil, nil) instead of (nil, err) because this is how the contract for fairness policy has been defined. The existing Fairness plugins also follow the same convention for example, round-robin. From the behaviour point of view, both cases have same behaviour, that is no item will be dispatched from that band. But in error case the error will be logged. |
||
|
|
||
| var bestQueue flowcontrol.FlowQueueAccessor | ||
| bestScore := -1.0 | ||
| strategy := p.getStrategy() | ||
|
|
||
| band.IterateQueues(func(queue flowcontrol.FlowQueueAccessor) (keepIterating bool) { | ||
| if queue == nil { | ||
| return true | ||
| } | ||
|
|
||
| queueLen := queue.Len() | ||
| metrics := p.getOrCreateMetrics(queue.FlowKey().ID) | ||
|
|
||
| // Strategy hook: runs for every queue, including empty ones. | ||
| // DRR: allocates quantum for active queues, resets deficit for idle queues. | ||
| // EWMA: no-op. | ||
| strategy.OnPickStart(queue.FlowKey().ID, queueLen, metrics) | ||
|
|
||
| if queueLen == 0 { | ||
| return true | ||
| } | ||
|
|
||
| score := p.scoreQueue(queue) | ||
| if score > bestScore { | ||
| bestScore = score | ||
| bestQueue = queue | ||
| } | ||
| return true | ||
| }) | ||
|
|
||
| // Record the selected item's enqueue time so PreRequest can compute | ||
| // the actual flow-control queue wait time (enqueue → dispatch). | ||
| if bestQueue != nil { | ||
| if head := bestQueue.PeekHead(); head != nil { | ||
| p.requestTimestamps.Store(head.OriginalRequest().ID(), head.EnqueueTime()) | ||
| } | ||
| } | ||
|
|
||
| return bestQueue, nil | ||
| } | ||
|
|
||
| // scoreQueue delegates to the configured ScoringStrategy. | ||
| func (p *ProgramAwarePlugin) scoreQueue(queue flowcontrol.FlowQueueAccessor) float64 { | ||
| var metrics *ProgramMetrics | ||
| if metricsRaw, ok := p.programMetrics.Load(queue.FlowKey().ID); ok { | ||
| metrics = metricsRaw.(*ProgramMetrics) | ||
| } | ||
| return p.getStrategy().ScoreQueue(queue, metrics) | ||
| } | ||
|
|
||
| // getOrCreateMetrics returns the ProgramMetrics for the given program ID, creating if needed. | ||
| func (p *ProgramAwarePlugin) getOrCreateMetrics(programID string) *ProgramMetrics { | ||
| if metricsRaw, ok := p.programMetrics.Load(programID); ok { | ||
| return metricsRaw.(*ProgramMetrics) | ||
| } | ||
| m := &ProgramMetrics{} | ||
| actual, _ := p.programMetrics.LoadOrStore(programID, m) | ||
| return actual.(*ProgramMetrics) | ||
| } | ||
|
|
||
| // normalize clamps v/cap to [0, 1]. | ||
| func normalize(v, cap float64) float64 { | ||
| if cap <= 0 { | ||
| return 0 | ||
| } | ||
| return math.Min(math.Max(v/cap, 0), 1) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: while it's true you have no consumers of the ctx in this method. should we perform this work if ctx.Err()?