Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
DecisionTypeDecodeOnly = "decode-only"
// DecisionTypePrefillDecode is for requests that have gone through P/D.
DecisionTypePrefillDecode = "prefill-decode"
// DecisionTypeEncodeDecode is for requests that have gone through E/PD.
DecisionTypeEncodeDecode = "encode-decode"
)

var (
Expand Down
40 changes: 40 additions & 0 deletions pkg/plugins/filter/epd_role.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package filter

import (
"encoding/json"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

const (
// RoleEncode is the role value for designated encode workers, the first
// stage in the pipeline.
RoleEncode = "encode"

// RoleEncodePrefill is the role value for workers that can handle both the
// encode and the prefill stages (EP/D disaggregation).
RoleEncodePrefill = "encode-prefill"

// RoleEncodeDecode is the role value for workers that can handle both the
// encode and the decode stages (described as E/PD disaggregation, and rare).
// NOTE(review): confirm "E/PD" is the intended label for an encode+decode worker.
RoleEncodeDecode = "encode-decode"
// RoleAll is the role value for workers that can handle every stage
// (encode, prefill, and decode).
RoleAll = "all"

// EncodeRoleType is the plugin type name that NewEncodeRole passes to
// NewByLabel for the encode filter.
EncodeRoleType = "encode-filter"
)

// EncodeRoleFactory is the plugin factory for the encode role filter.
// The raw parameters and the handle are ignored: the result is the filter
// built by NewEncodeRole with its name set to name, and the error is always nil.
func EncodeRoleFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
	encodeFilter := NewEncodeRole()
	return encodeFilter.WithName(name), nil
}

// NewEncodeRole builds the ByLabel filter used for the encode stage, the first
// stage of the Encode → Prefill → Decode pipeline.
// The filter is created with type EncodeRoleType on label RoleLabel and lists
// the role values encode, encode-prefill, encode-decode and all.
func NewEncodeRole() *ByLabel {
	return NewByLabel(EncodeRoleType, RoleLabel, false, RoleEncode, RoleEncodePrefill, RoleEncodeDecode, RoleAll)
}
6 changes: 4 additions & 2 deletions pkg/plugins/filter/pd_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
DecodeRoleType = "decode-filter"
// PrefillRoleType is the type of the PrefillFilter
PrefillRoleType = "prefill-filter"
// RolePrefillDecode set for workers that can handle prefill and decode stages
RolePrefillDecode = "prefill-decode"
)

// PrefillRoleFactory defines the factory function for the Prefill filter.
Expand All @@ -29,7 +31,7 @@ func PrefillRoleFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin

// NewPrefillRole creates and returns an instance of the Filter configured for the prefill role.
// The filter is created with type PrefillRoleType on label RoleLabel and lists
// the role values RolePrefill, RolePrefillDecode and RoleBoth.
//
// Only the post-change return statement is kept: the earlier body carried the
// superseded single-role return ahead of it, which made the widened role list
// unreachable.
func NewPrefillRole() *ByLabel {
	return NewByLabel(PrefillRoleType, RoleLabel, false, RolePrefill, RolePrefillDecode, RoleBoth)
}

// DecodeRoleFactory defines the factory function for the Decode filter.
Expand All @@ -39,5 +41,5 @@ func DecodeRoleFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.

// NewDecodeRole creates and returns an instance of the Filter configured for the decode role.
// The filter is created with type DecodeRoleType on label RoleLabel and lists
// the role values RoleDecode, RolePrefillDecode and RoleBoth.
//
// Only the post-change return statement is kept: the earlier body carried the
// superseded return (without RolePrefillDecode) ahead of it, which made the
// widened role list unreachable.
func NewDecodeRole() *ByLabel {
	return NewByLabel(DecodeRoleType, RoleLabel, true, RoleDecode, RolePrefillDecode, RoleBoth)
}
92 changes: 92 additions & 0 deletions pkg/plugins/pre-request/ed_prerequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package prerequest

import (
"context"
"encoding/json"
"fmt"
"net"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"

"github.com/llm-d/llm-d-inference-scheduler/pkg/common"
)

const (
// EncodeHeaderHandlerType is the plugin type name recorded in the TypedName
// of every EncodeHeaderHandler.
// NOTE(review): the value says "encoder" while the identifiers say "encode" — confirm this is intended.
EncodeHeaderHandlerType = "encoder-header-handler"

// defaultEncodeProfile is the scheduling profile name used when the plugin
// parameters do not set encodeProfile.
defaultEncodeProfile = "encode"
)

// encodeHeaderHandlerParameters is the JSON-decoded configuration accepted by
// EncodeHeaderHandlerFactory.
type encodeHeaderHandlerParameters struct {
// EncodeProfile is the name of the scheduling profile whose result is read
// by the handler (JSON key "encodeProfile").
EncodeProfile string `json:"encodeProfile"`
}

// Compile-time check that *EncodeHeaderHandler satisfies requestcontrol.PreRequest.
var _ requestcontrol.PreRequest = (*EncodeHeaderHandler)(nil)

// EncodeHeaderHandlerFactory defines the factory function for the EncodeHeaderHandler.
//
// rawParameters, when present, is decoded as encodeHeaderHandlerParameters;
// a missing "encodeProfile" key leaves the profile at defaultEncodeProfile.
// The handle argument is ignored. An error is returned only when
// rawParameters is not valid JSON for the parameters struct.
func EncodeHeaderHandlerFactory(name string, rawParameters json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
	parameters := encodeHeaderHandlerParameters{
		EncodeProfile: defaultEncodeProfile,
	}
	// len covers both a nil and an empty-but-non-nil RawMessage. The latter
	// would otherwise reach json.Unmarshal and fail with "unexpected end of
	// JSON input" even though no parameters were actually supplied.
	if len(rawParameters) > 0 {
		if err := json.Unmarshal(rawParameters, &parameters); err != nil {
			return nil, fmt.Errorf("failed to parse the parameters of the '%s' pre-request plugin - %w", EncodeHeaderHandlerType, err)
		}
	}
	return NewEncodeHeaderHandler(parameters.EncodeProfile).WithName(name), nil
}

// NewEncodeHeaderHandler allocates an EncodeHeaderHandler whose plugin type is
// EncodeHeaderHandlerType and which reads the scheduling profile named
// encodeProfile. The plugin name is left empty; set it with WithName.
func NewEncodeHeaderHandler(encodeProfile string) *EncodeHeaderHandler {
	handler := new(EncodeHeaderHandler)
	handler.typedName.Type = EncodeHeaderHandlerType
	handler.encodeProfile = encodeProfile
	return handler
}

// EncodeHeaderHandler is a PreRequest plugin that writes the endpoints chosen
// by the encode scheduling profile into a request header.
type EncodeHeaderHandler struct {
// typedName holds the plugin type and the instance name set by WithName.
typedName plugin.TypedName
// encodeProfile is the key looked up in the scheduling result's ProfileResults.
encodeProfile string
}

// TypedName reports the plugin's type together with the instance name that
// was assigned through WithName.
func (p *EncodeHeaderHandler) TypedName() plugin.TypedName {
	return p.typedName
}

// WithName assigns the instance name of the plugin and returns the same
// handler so the call can be chained after the constructor.
func (p *EncodeHeaderHandler) WithName(name string) *EncodeHeaderHandler {
	p.typedName.Name = name
	return p
}

// PreRequest wires the encode SchedulerProfile result into a header to indicate
// the encode workers.
//
// Any value already stored under common.EncoderHostsPortsHeader is first reset
// to the empty string. The header is then set to a comma-separated list of
// host:port pairs, one per target endpoint of the encode profile result.
// Nothing further is written when the encode profile has no entry in
// schedulingResult.ProfileResults, when that entry is nil, or when it lists no
// target endpoints.
func (p *EncodeHeaderHandler) PreRequest(ctx context.Context, request *scheduling.LLMRequest, schedulingResult *scheduling.SchedulingResult) {
	if _, found := request.Headers[common.EncoderHostsPortsHeader]; found {
		request.Headers[common.EncoderHostsPortsHeader] = "" // clear header, if already set
	}

	encodeProfileRunResult, exists := schedulingResult.ProfileResults[p.encodeProfile]
	// A nil entry is treated like a missing one: dereferencing it below would
	// panic. Presumably a profile that failed to run can be recorded this way —
	// TODO confirm against the scheduler.
	if !exists || encodeProfileRunResult == nil {
		return // encode profile failed to run or we chose not to run it, no-op in this case
	}

	endpoints := encodeProfileRunResult.TargetEndpoints
	if len(endpoints) == 0 {
		return // nothing selected, leave the header as it is
	}

	// Collect all target endpoints as host:port pairs.
	encodeHostPorts := make([]string, 0, len(endpoints))
	for _, endpoint := range endpoints {
		targetPod := endpoint.GetMetadata()
		encodeHostPorts = append(encodeHostPorts, net.JoinHostPort(targetPod.Address, targetPod.Port))
	}

	// Writing to a nil map panics, so make sure the header map exists first.
	if request.Headers == nil {
		request.Headers = make(map[string]string)
	}
	request.Headers[common.EncoderHostsPortsHeader] = strings.Join(encodeHostPorts, ",")
}
1 change: 1 addition & 0 deletions pkg/plugins/profile/always_disagg_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (d *AlwaysDisaggPDDecider) TypedName() plugin.TypedName {
// WithName stamps the plugin type AlwaysDisaggDeciderPluginType and the given
// instance name onto the decider, then returns the same decider for chaining.
func (d *AlwaysDisaggPDDecider) WithName(name string) *AlwaysDisaggPDDecider {
	d.typedName.Type = AlwaysDisaggDeciderPluginType
	d.typedName.Name = name
	return d
}

Expand Down
48 changes: 48 additions & 0 deletions pkg/plugins/profile/always_encode_decider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package profile

import (
"context"
"encoding/json"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
)

const (
// AlwaysEncodeDeciderPluginType is the type-name of the AlwaysEncodeDecider
// plugin; WithName stores it in the decider's TypedName.
AlwaysEncodeDeciderPluginType = "always-encode-decider"
)

// Compile-time check that *AlwaysEncodeDecider satisfies epdDeciderPlugin.
var _ epdDeciderPlugin = (*AlwaysEncodeDecider)(nil)

// AlwaysEncodeDecider is a decider plugin (it is asserted to implement
// epdDeciderPlugin) which always decides to encode.
type AlwaysEncodeDecider struct {
// typedName holds the plugin type and the instance name set by WithName.
typedName plugin.TypedName
}

// AlwaysEncodeDeciderPluginFactory is the plugin factory for AlwaysEncodeDecider.
// The raw parameters and the handle are ignored: the result is a fresh decider
// named name, and the error is always nil.
func AlwaysEncodeDeciderPluginFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
	decider := newAlwaysEncodeDecider()
	return decider.WithName(name), nil
}

// newAlwaysEncodeDecider returns an AlwaysEncodeDecider with its plugin type
// already set to AlwaysEncodeDeciderPluginType, so TypedName reports the
// correct type even when WithName is never called. The instance name is left
// empty.
func newAlwaysEncodeDecider() *AlwaysEncodeDecider {
	return &AlwaysEncodeDecider{
		typedName: plugin.TypedName{Type: AlwaysEncodeDeciderPluginType},
	}
}

// TypedName reports the plugin's type together with the instance name that
// was assigned through WithName.
func (d *AlwaysEncodeDecider) TypedName() plugin.TypedName {
	return d.typedName
}

// WithName stamps the plugin type AlwaysEncodeDeciderPluginType and the given
// instance name onto the decider, then returns the same decider for chaining.
func (d *AlwaysEncodeDecider) WithName(name string) *AlwaysEncodeDecider {
	d.typedName.Type = AlwaysEncodeDeciderPluginType
	d.typedName.Name = name
	return d
}

// disaggregateEncode is this decider's answer to the encode decision: it
// ignores the context, the request and the endpoint and always returns true.
func (d *AlwaysEncodeDecider) disaggregateEncode(_ context.Context, _ *scheduling.LLMRequest, _ scheduling.Endpoint) bool {
	return true
}
Loading