Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/sriov_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type SriovStaticPolicyOptions struct {
}

type SriovDynamicPolicyOptions struct {
PodRequiredAnnotations map[string]string
LargeSizeVFQueueCount int
LargeSizeVFCPUThreshold int
LargeSizeVFFailOnExhaustion bool
Expand All @@ -69,6 +70,7 @@ func NewSriovOptions() *SriovOptions {
MaxBondingVFQueueCount: math.MaxInt32,
},
SriovDynamicPolicyOptions: SriovDynamicPolicyOptions{
PodRequiredAnnotations: map[string]string{},
LargeSizeVFQueueCount: 32,
LargeSizeVFCPUThreshold: 24,
LargeSizeVFFailOnExhaustion: true,
Expand All @@ -90,6 +92,7 @@ func (o *SriovOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.StringToStringVar(&o.ExtraAnnotations, "sriov-vf-extra-annotations", o.ExtraAnnotations, "Extra annotations for sriov vf")
fs.IntVar(&o.MinBondingVFQueueCount, "static-min-bonding-vf-queue-count", o.MinBondingVFQueueCount, "Min queue count of bonding VF can be allocated in static policy")
fs.IntVar(&o.MaxBondingVFQueueCount, "static-max-bonding-vf-queue-count", o.MaxBondingVFQueueCount, "Max queue count of bonding VF can be allocated in static policy")
fs.StringToStringVar(&o.PodRequiredAnnotations, "dynamic-pod-required-annotations", o.PodRequiredAnnotations, "Required annotations for pod to dynamic allocate VF")
fs.IntVar(&o.LargeSizeVFQueueCount, "dynamic-large-size-vf-queue-count", o.LargeSizeVFQueueCount, "Queue count for VF to be identified as large size VF in dynamic policy")
fs.IntVar(&o.LargeSizeVFCPUThreshold, "dynamic-large-size-vf-cpu-threshold", o.LargeSizeVFCPUThreshold, "Threshold of cpu quantity to allocate large size VF in dynamic policy")
fs.BoolVar(&o.LargeSizeVFFailOnExhaustion, "dynamic-large-size-vf-fail-on-exhaustion", o.LargeSizeVFFailOnExhaustion, "Should fail or not when large size VF is exhausted in dynamic policy")
Expand All @@ -109,6 +112,7 @@ func (s *SriovOptions) ApplyTo(config *qrmconfig.SriovQRMPluginConfig) error {
}
config.MinBondingVFQueueCount = s.MinBondingVFQueueCount
config.MaxBondingVFQueueCount = s.MaxBondingVFQueueCount
config.PodRequiredAnnotations = s.PodRequiredAnnotations
config.LargeSizeVFQueueCount = s.LargeSizeVFQueueCount
config.LargeSizeVFCPUThreshold = s.LargeSizeVFCPUThreshold
config.LargeSizeVFFailOnExhaustion = s.LargeSizeVFFailOnExhaustion
Expand Down
66 changes: 53 additions & 13 deletions pkg/agent/qrm-plugins/sriov/policy/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,28 @@ import (
cpudynamicpolicy "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy"
)

type podFilter func(req *pluginapi.ResourceRequest) bool

func requiredAnnotationFilter(required map[string]string) podFilter {
return func(req *pluginapi.ResourceRequest) bool {
for requiredK, requiredV := range required {
if gotV, exists := req.Annotations[requiredK]; !exists || gotV != requiredV {
return false
}
}

return true
}
}

type DynamicPolicy struct {
sync.RWMutex
*basePolicy

name string
started bool
emitter metrics.MetricEmitter
name string
started bool
emitter metrics.MetricEmitter
podFilters []podFilter

policyConfig qrmconfig.SriovDynamicPolicyConfig
}
Expand All @@ -76,10 +91,16 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
return false, agent.ComponentStub{}, nil
}

var podFilters []podFilter
if conf.SriovDynamicPolicyConfig.PodRequiredAnnotations != nil {
podFilters = append(podFilters, requiredAnnotationFilter(conf.SriovDynamicPolicyConfig.PodRequiredAnnotations))
}

dynamicPolicy := &DynamicPolicy{
name: fmt.Sprintf("%s_%s", agentName, consts.SriovResourcePluginPolicyNameDynamic),
emitter: wrappedEmitter,
basePolicy: basePolicy,
podFilters: podFilters,
policyConfig: conf.SriovDynamicPolicyConfig,
}

Expand Down Expand Up @@ -112,8 +133,6 @@ func (p *DynamicPolicy) Run(ctx context.Context) {

<-ctx.Done()
general.Infof("stopped")

return
}

// GetAccompanyResourceTopologyHints get topology hints of accompany resources
Expand Down Expand Up @@ -149,6 +168,11 @@ func (p *DynamicPolicy) GetAccompanyResourceTopologyHints(req *pluginapi.Resourc
return nil
}

if !p.shouldDynamicAllocateForPod(req) {
general.Infof("pod %s/%s filtered out, skip get accompany resource topology hints", req.PodNamespace, req.PodName)
return nil
}

request, _, err := qrmutil.GetPodAggregatedRequestResource(req)
if err != nil {
return fmt.Errorf("GetPodAggregatedRequestResource failed with error: %v", err)
Expand Down Expand Up @@ -234,14 +258,6 @@ func (p *DynamicPolicy) AllocateAccompanyResource(req *pluginapi.ResourceRequest
}
}()

qosLevel, err := qrmutil.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys)
if err != nil {
err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
general.Errorf("%s", err.Error())
return err
}

// reuse allocation info allocated by same pod and container
allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName)
if allocationInfo != nil {
Expand All @@ -256,6 +272,20 @@ func (p *DynamicPolicy) AllocateAccompanyResource(req *pluginapi.ResourceRequest
return nil
}

// qrmutil.GetKatalystQoSLevelFromResourceReq would overwrite req.Annotations, so we should run before it
if !p.shouldDynamicAllocateForPod(req) {
general.Infof("pod %s/%s filtered out, skip allocate accompany resource", req.PodNamespace, req.PodName)
return nil
}

qosLevel, err := qrmutil.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys)
if err != nil {
err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
general.Errorf("%s", err.Error())
return err
}

// get request quantity of main resource: cpu
request, _, err := qrmutil.GetPodAggregatedRequestResource(req)
if err != nil {
Expand Down Expand Up @@ -346,6 +376,16 @@ func (p *DynamicPolicy) ReleaseAccompanyResource(req *pluginapi.RemovePodRequest
return nil
}

func (p *DynamicPolicy) shouldDynamicAllocateForPod(req *pluginapi.ResourceRequest) bool {
for _, filter := range p.podFilters {
if !filter(req) {
return false
}
}

return true
}

func (p *DynamicPolicy) addAllocationInfoToResponse(allocationInfo *state.AllocationInfo, resp *pluginapi.ResourceAllocationResponse) error {
resourceAllocationInfo, err := p.generateResourceAllocationInfo(allocationInfo)
if err != nil {
Expand Down
182 changes: 155 additions & 27 deletions pkg/agent/qrm-plugins/sriov/policy/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,86 @@ func TestDynamicPolicy_GetAccompanyResourceTopologyHints(t *testing.T) {
So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, "no available VFs")
})

Convey("pod without required annotations", t, func() {
vfState, podEntries := state.GenerateDummyState(2, 2, nil)
policy := generateDynamicPolicy(t, false, true, vfState, podEntries)
policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})}

req := &pluginapi.ResourceRequest{
PodUid: "pod",
PodName: "pod",
ContainerName: "container",
ResourceName: string(v1.ResourceCPU),
ResourceRequests: map[string]float64{
string(v1.ResourceCPU): 32,
},
}

hints := &pluginapi.ListOfTopologyHints{
Hints: []*pluginapi.TopologyHint{
{
Nodes: []uint64{0, 1},
Preferred: true,
},
},
}

err := policy.GetAccompanyResourceTopologyHints(req, hints)
So(err, ShouldBeNil)
So(hints, ShouldResemble, &pluginapi.ListOfTopologyHints{
Hints: []*pluginapi.TopologyHint{
{
Nodes: []uint64{0, 1},
Preferred: true,
},
},
})
})

Convey("pod with reqired annotations but no available VFs with large size", t, func() {
vfState, podEntries := state.GenerateDummyState(2, 2, map[int]sets.Int{
0: sets.NewInt(0, 1),
1: sets.NewInt(0, 1),
})
policy := generateDynamicPolicy(t, false, true, vfState, podEntries)
policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})}

req := &pluginapi.ResourceRequest{
PodUid: "pod",
PodName: "pod",
ContainerName: "container",
Annotations: map[string]string{
"foo": "bar",
},
ResourceName: string(v1.ResourceCPU),
ResourceRequests: map[string]float64{
string(v1.ResourceCPU): 32,
},
}

hints := &pluginapi.ListOfTopologyHints{
Hints: []*pluginapi.TopologyHint{
{
Nodes: []uint64{0, 1},
Preferred: true,
},
},
}

err := policy.GetAccompanyResourceTopologyHints(req, hints)

So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, "no available VFs")
})
}

func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) {
t.Parallel()

Convey("dryRun", t, func() {
vfState, podEntries := state.GenerateDummyState(2, 2, nil)
policy := generateDynamicPolicy(t, false, true, vfState, podEntries)
policy := generateDynamicPolicy(t, true, true, vfState, podEntries)

req := &pluginapi.ResourceRequest{
PodUid: "pod",
Expand Down Expand Up @@ -343,32 +415,6 @@ func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) {
AllocatedQuantity: 32,
AllocationResult: "1-32",
},
policy.ResourceName(): {
IsNodeResource: true,
IsScalarResource: true,
AllocatedQuantity: 1,
Annotations: map[string]string{
netNsAnnotationKey: "/var/run/netns/ns2",
pciAnnotationKey: `[{"address":"0000:40:00.1","repName":"eth0_1","vfName":"eth0_1"}]`,
},
Devices: []*pluginapi.DeviceSpec{
{
HostPath: filepath.Join(rdmaDevicePrefix, "umad1"),
ContainerPath: filepath.Join(rdmaDevicePrefix, "umad1"),
Permissions: "rwm",
},
{
HostPath: filepath.Join(rdmaDevicePrefix, "uverbs1"),
ContainerPath: filepath.Join(rdmaDevicePrefix, "uverbs1"),
Permissions: "rwm",
},
{
HostPath: rdmaCmPath,
ContainerPath: rdmaCmPath,
Permissions: "rw",
},
},
},
},
})
})
Expand Down Expand Up @@ -584,4 +630,86 @@ func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) {
So(err, ShouldNotBeNil)
So(err.Error(), ShouldContainSubstring, "no available VFs")
})

Convey("pod without required annotations", t, func() {
vfState, podEntries := state.GenerateDummyState(2, 2, nil)
policy := generateDynamicPolicy(t, false, true, vfState, podEntries)
policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})}

req := &pluginapi.ResourceRequest{
PodUid: "pod",
PodName: "pod",
ContainerName: "container",
ResourceName: string(v1.ResourceCPU),
Hint: &pluginapi.TopologyHint{
Nodes: []uint64{0, 1},
},
ResourceRequests: map[string]float64{
string(v1.ResourceCPU): 32,
},
}

resp := &pluginapi.ResourceAllocationResponse{
AllocationResult: &pluginapi.ResourceAllocation{
ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
string(v1.ResourceCPU): {
AllocatedQuantity: 32,
AllocationResult: "1-32",
},
},
},
}

err := policy.AllocateAccompanyResource(req, resp)
So(err, ShouldBeNil)
So(resp.AllocationResult, ShouldResemble, &pluginapi.ResourceAllocation{
ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
string(v1.ResourceCPU): {
AllocatedQuantity: 32,
AllocationResult: "1-32",
},
},
})
})

Convey("pod with required annotations but no available VFs with large size", t, func() {
vfState, podEntries := state.GenerateDummyState(2, 2, map[int]sets.Int{
0: sets.NewInt(0, 1),
1: sets.NewInt(0, 1),
})
policy := generateDynamicPolicy(t, false, true, vfState, podEntries)
policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})}

req := &pluginapi.ResourceRequest{
PodUid: "pod",
PodName: "pod",
ContainerName: "container",
Annotations: map[string]string{
"foo": "bar",
},
ResourceName: string(v1.ResourceCPU),
Hint: &pluginapi.TopologyHint{
Nodes: []uint64{0, 1},
},
ResourceRequests: map[string]float64{
string(v1.ResourceCPU): 32,
},
}

resp := &pluginapi.ResourceAllocationResponse{
AllocationResult: &pluginapi.ResourceAllocation{
ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
string(v1.ResourceCPU): {
AllocatedQuantity: 32,
AllocationResult: "1-32",
},
},
},
}

err := policy.AllocateAccompanyResource(req, resp)

So(err, ShouldNotBeNil)
So(err.Error(), ShouldContainSubstring, "no available VFs")
})
}
1 change: 1 addition & 0 deletions pkg/config/agent/qrm/sriov_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type SriovStaticPolicyConfig struct {
}

type SriovDynamicPolicyConfig struct {
PodRequiredAnnotations map[string]string
LargeSizeVFQueueCount int
LargeSizeVFCPUThreshold int
LargeSizeVFFailOnExhaustion bool
Expand Down
Loading