Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/scheduling/pd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func NewScheduler(ctx context.Context, schedCfg *config.Config, ds Datastore) (*
scheduler.prefill = scheduling.NewSchedulerWithConfig(
ds,
scheduler.generateSchedulerConfig(ctx, schedCfg.PrefillSchedulerPlugins,
&filter.PrefillFilter{}),
filter.NewPrefillFilter()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems like we could have remove the NewPrefillFilter and NewDecodeFilter functions and just use here:

filter.NewRoleBasedFilter("prefill-filter", filter.RolePrefill)

(same for decode).

)

scheduler.decode = scheduling.NewSchedulerWithConfig(
ds,
scheduler.generateSchedulerConfig(ctx, schedCfg.DecodeSchedulerPlugins,
&filter.DecodeFilter{}),
filter.NewDecodeFilter()),
)

return scheduler, nil
Expand Down
57 changes: 29 additions & 28 deletions pkg/scheduling/plugins/filter/pd_role_filter.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this leverage by_label.go as the base?

Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package filter

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
// RoleLabel name
// RoleLabel name of the label that contains the pod's role
RoleLabel = "llm-d.ai/role"
// RolePrefill set for designated prefill workers
RolePrefill = "prefill"
Expand All @@ -16,46 +15,48 @@ const (
RoleBoth = "both"
)

// PrefillFilter - filters out pods that are not marked with role Prefill
type PrefillFilter struct{}
// RoleBasedFilter - filters out pods based on the role defiled by RoleLabel
type RoleBasedFilter struct {
validRoles map[string]struct{}
name string
}

var _ plugins.Filter = &PrefillFilter{} // validate interface conformance
// NewPrefillFilter creates and returns instance of RoleBasedFilter configured for prefill
func NewPrefillFilter() *RoleBasedFilter {
// TODO: doesn't RoleBoth also imply Prefill?
return NewRoleBasedFilter("prefill-filter", RolePrefill)
}

// Name returns the name of the filter
func (pf *PrefillFilter) Name() string {
return "prefill-filter"
// NewDecodeFilter creates and returns instance of RoleBasedFilter configured for decode
func NewDecodeFilter() *RoleBasedFilter {
return NewRoleBasedFilter("decode-filter", RoleDecode, RoleBoth)
}

// Filter filters out all pods that are not marked as "prefill"
func (pf *PrefillFilter) Filter(_ *types.SchedulingContext, pods []types.Pod) []types.Pod {
filteredPods := []types.Pod{}
// NewRoleBasedFilter creates and returns instance of RoleBasedFilter based on input parameters
// name - the filter name
// rolesArr - list of valid roles
func NewRoleBasedFilter(name string, rolesArr ...string) *RoleBasedFilter {
roles := map[string]struct{}{}

for _, pod := range pods {
role := pod.GetPod().Labels[RoleLabel]
if role == RolePrefill { // TODO: doesn't RoleBoth also imply Prefill?
filteredPods = append(filteredPods, pod)
}
for _, role := range rolesArr {
roles[role] = struct{}{}
}
return filteredPods
}

// DecodeFilter - filters out pods that are not marked with role Decode or Both
type DecodeFilter struct{}

var _ plugins.Filter = &DecodeFilter{} // validate interface conformance
return &RoleBasedFilter{name: name, validRoles: roles}
}

// Name returns the name of the filter
func (df *DecodeFilter) Name() string {
return "decode-filter"
func (f *RoleBasedFilter) Name() string {
return f.name
}

// Filter removes all pods that are not marked as "decode" or "both"
func (df *DecodeFilter) Filter(_ *types.SchedulingContext, pods []types.Pod) []types.Pod {
// Filter filters out all pods that are not marked with one of roles from the validRoles collection
func (f *RoleBasedFilter) Filter(_ *types.SchedulingContext, pods []types.Pod) []types.Pod {
filteredPods := []types.Pod{}

for _, pod := range pods {
role, defined := pod.GetPod().Labels[RoleLabel]
if !defined || role == RoleDecode || role == RoleBoth {
role := pod.GetPod().Labels[RoleLabel]
if _, exists := f.validRoles[role]; exists {
filteredPods = append(filteredPods, pod)
}
}
Expand Down