diff --git a/cmd/katalyst-agent/app/options/qrm/sriov_plugin.go b/cmd/katalyst-agent/app/options/qrm/sriov_plugin.go index c38ca3c557..84e16e59ea 100644 --- a/cmd/katalyst-agent/app/options/qrm/sriov_plugin.go +++ b/cmd/katalyst-agent/app/options/qrm/sriov_plugin.go @@ -46,6 +46,7 @@ type SriovStaticPolicyOptions struct { } type SriovDynamicPolicyOptions struct { + PodRequiredAnnotations map[string]string LargeSizeVFQueueCount int LargeSizeVFCPUThreshold int LargeSizeVFFailOnExhaustion bool @@ -69,6 +70,7 @@ func NewSriovOptions() *SriovOptions { MaxBondingVFQueueCount: math.MaxInt32, }, SriovDynamicPolicyOptions: SriovDynamicPolicyOptions{ + PodRequiredAnnotations: map[string]string{}, LargeSizeVFQueueCount: 32, LargeSizeVFCPUThreshold: 24, LargeSizeVFFailOnExhaustion: true, @@ -90,6 +92,7 @@ func (o *SriovOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringToStringVar(&o.ExtraAnnotations, "sriov-vf-extra-annotations", o.ExtraAnnotations, "Extra annotations for sriov vf") fs.IntVar(&o.MinBondingVFQueueCount, "static-min-bonding-vf-queue-count", o.MinBondingVFQueueCount, "Min queue count of bonding VF can be allocated in static policy") fs.IntVar(&o.MaxBondingVFQueueCount, "static-max-bonding-vf-queue-count", o.MaxBondingVFQueueCount, "Max queue count of bonding VF can be allocated in static policy") + fs.StringToStringVar(&o.PodRequiredAnnotations, "dynamic-pod-required-annotations", o.PodRequiredAnnotations, "Required annotations for pod to dynamic allocate VF") fs.IntVar(&o.LargeSizeVFQueueCount, "dynamic-large-size-vf-queue-count", o.LargeSizeVFQueueCount, "Queue count for VF to be identified as large size VF in dynamic policy") fs.IntVar(&o.LargeSizeVFCPUThreshold, "dynamic-large-size-vf-cpu-threshold", o.LargeSizeVFCPUThreshold, "Threshold of cpu quantity to allocate large size VF in dynamic policy") fs.BoolVar(&o.LargeSizeVFFailOnExhaustion, "dynamic-large-size-vf-fail-on-exhaustion", o.LargeSizeVFFailOnExhaustion, "Should fail or not when large size VF is exhausted in dynamic policy") @@ -109,6 +112,7 @@ func (s *SriovOptions) ApplyTo(config *qrmconfig.SriovQRMPluginConfig) error { } config.MinBondingVFQueueCount = s.MinBondingVFQueueCount config.MaxBondingVFQueueCount = s.MaxBondingVFQueueCount + config.PodRequiredAnnotations = s.PodRequiredAnnotations config.LargeSizeVFQueueCount = s.LargeSizeVFQueueCount config.LargeSizeVFCPUThreshold = s.LargeSizeVFCPUThreshold config.LargeSizeVFFailOnExhaustion = s.LargeSizeVFFailOnExhaustion diff --git a/pkg/agent/qrm-plugins/sriov/policy/dynamic.go b/pkg/agent/qrm-plugins/sriov/policy/dynamic.go index 64f6b11230..fa522e6b14 100644 --- a/pkg/agent/qrm-plugins/sriov/policy/dynamic.go +++ b/pkg/agent/qrm-plugins/sriov/policy/dynamic.go @@ -44,13 +44,28 @@ import ( cpudynamicpolicy "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy" ) +type podFilter func(req *pluginapi.ResourceRequest) bool + +func requiredAnnotationFilter(required map[string]string) podFilter { + return func(req *pluginapi.ResourceRequest) bool { + for requiredK, requiredV := range required { + if gotV, exists := req.Annotations[requiredK]; !exists || gotV != requiredV { + return false + } + } + + return true + } +} + type DynamicPolicy struct { sync.RWMutex *basePolicy - name string - started bool - emitter metrics.MetricEmitter + name string + started bool + emitter metrics.MetricEmitter + podFilters []podFilter policyConfig qrmconfig.SriovDynamicPolicyConfig } @@ -76,10 +91,16 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration return false, agent.ComponentStub{}, nil } + var podFilters []podFilter + if conf.SriovDynamicPolicyConfig.PodRequiredAnnotations != nil { + podFilters = append(podFilters, requiredAnnotationFilter(conf.SriovDynamicPolicyConfig.PodRequiredAnnotations)) + } + dynamicPolicy := &DynamicPolicy{ name: fmt.Sprintf("%s_%s", agentName, consts.SriovResourcePluginPolicyNameDynamic), emitter: wrappedEmitter, basePolicy: basePolicy, + podFilters: podFilters, policyConfig: conf.SriovDynamicPolicyConfig, } @@ -112,8 +133,6 @@ func (p *DynamicPolicy) Run(ctx context.Context) { <-ctx.Done() general.Infof("stopped") - - return } // GetAccompanyResourceTopologyHints get topology hints of accompany resources @@ -149,6 +168,11 @@ func (p *DynamicPolicy) GetAccompanyResourceTopologyHints(req *pluginapi.Resourc return nil } + if !p.shouldDynamicAllocateForPod(req) { + general.Infof("pod %s/%s filtered out, skip get accompany resource topology hints", req.PodNamespace, req.PodName) + return nil + } + request, _, err := qrmutil.GetPodAggregatedRequestResource(req) if err != nil { return fmt.Errorf("GetPodAggregatedRequestResource failed with error: %v", err) @@ -234,14 +258,6 @@ func (p *DynamicPolicy) AllocateAccompanyResource(req *pluginapi.ResourceRequest } }() - qosLevel, err := qrmutil.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) - if err != nil { - err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", - req.PodNamespace, req.PodName, req.ContainerName, err) - general.Errorf("%s", err.Error()) - return err - } - // reuse allocation info allocated by same pod and container allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName) if allocationInfo != nil { @@ -256,6 +272,20 @@ func (p *DynamicPolicy) AllocateAccompanyResource(req *pluginapi.ResourceRequest return nil } + // qrmutil.GetKatalystQoSLevelFromResourceReq would overwrite req.Annotations, so we should run before it + if !p.shouldDynamicAllocateForPod(req) { + general.Infof("pod %s/%s filtered out, skip allocate accompany resource", req.PodNamespace, req.PodName) + return nil + } + + qosLevel, err := qrmutil.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req, p.podAnnotationKeptKeys, p.podLabelKeptKeys) + if err != nil { + err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, err) + general.Errorf("%s", err.Error()) + return err + } + // get request quantity of main resource: cpu request, _, err := qrmutil.GetPodAggregatedRequestResource(req) if err != nil { @@ -346,6 +376,16 @@ func (p *DynamicPolicy) ReleaseAccompanyResource(req *pluginapi.RemovePodRequest return nil } +func (p *DynamicPolicy) shouldDynamicAllocateForPod(req *pluginapi.ResourceRequest) bool { + for _, filter := range p.podFilters { + if !filter(req) { + return false + } + } + + return true +} + func (p *DynamicPolicy) addAllocationInfoToResponse(allocationInfo *state.AllocationInfo, resp *pluginapi.ResourceAllocationResponse) error { resourceAllocationInfo, err := p.generateResourceAllocationInfo(allocationInfo) if err != nil { diff --git a/pkg/agent/qrm-plugins/sriov/policy/dynamic_test.go b/pkg/agent/qrm-plugins/sriov/policy/dynamic_test.go index c05d15b130..8be962c150 100644 --- a/pkg/agent/qrm-plugins/sriov/policy/dynamic_test.go +++ b/pkg/agent/qrm-plugins/sriov/policy/dynamic_test.go @@ -302,6 +302,78 @@ func TestDynamicPolicy_GetAccompanyResourceTopologyHints(t *testing.T) { So(err, ShouldNotBeNil) So(err.Error(), ShouldEqual, "no available VFs") }) + + Convey("pod without required annotations", t, func() { + vfState, podEntries := state.GenerateDummyState(2, 2, nil) + policy := generateDynamicPolicy(t, false, true, vfState, podEntries) + policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})} + + req := &pluginapi.ResourceRequest{ + PodUid: "pod", + PodName: "pod", + ContainerName: "container", + ResourceName: string(v1.ResourceCPU), + ResourceRequests: map[string]float64{ + string(v1.ResourceCPU): 32, + }, + } + + hints := &pluginapi.ListOfTopologyHints{ + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{0, 1}, + Preferred: true, + }, + }, + } + + err := policy.GetAccompanyResourceTopologyHints(req, hints) + So(err, ShouldBeNil) + So(hints, ShouldResemble, &pluginapi.ListOfTopologyHints{ + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{0, 1}, + Preferred: true, + }, + }, + }) + }) + + Convey("pod with reqired annotations but no available VFs with large size", t, func() { + vfState, podEntries := state.GenerateDummyState(2, 2, map[int]sets.Int{ + 0: sets.NewInt(0, 1), + 1: sets.NewInt(0, 1), + }) + policy := generateDynamicPolicy(t, false, true, vfState, podEntries) + policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})} + + req := &pluginapi.ResourceRequest{ + PodUid: "pod", + PodName: "pod", + ContainerName: "container", + Annotations: map[string]string{ + "foo": "bar", + }, + ResourceName: string(v1.ResourceCPU), + ResourceRequests: map[string]float64{ + string(v1.ResourceCPU): 32, + }, + } + + hints := &pluginapi.ListOfTopologyHints{ + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{0, 1}, + Preferred: true, + }, + }, + } + + err := policy.GetAccompanyResourceTopologyHints(req, hints) + + So(err, ShouldNotBeNil) + So(err.Error(), ShouldEqual, "no available VFs") + }) } func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) { @@ -309,7 +381,7 @@ func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) { Convey("dryRun", t, func() { vfState, podEntries := state.GenerateDummyState(2, 2, nil) - policy := generateDynamicPolicy(t, false, true, vfState, podEntries) + policy := generateDynamicPolicy(t, true, true, vfState, podEntries) req := &pluginapi.ResourceRequest{ PodUid: "pod", @@ -343,32 +415,6 @@ func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) { AllocatedQuantity: 32, AllocationResult: "1-32", }, - policy.ResourceName(): { - IsNodeResource: true, - IsScalarResource: true, - AllocatedQuantity: 1, - Annotations: map[string]string{ - netNsAnnotationKey: "/var/run/netns/ns2", - pciAnnotationKey: `[{"address":"0000:40:00.1","repName":"eth0_1","vfName":"eth0_1"}]`, - }, - Devices: []*pluginapi.DeviceSpec{ - { - HostPath: filepath.Join(rdmaDevicePrefix, "umad1"), - ContainerPath: filepath.Join(rdmaDevicePrefix, "umad1"), - Permissions: "rwm", - }, - { - HostPath: filepath.Join(rdmaDevicePrefix, "uverbs1"), - ContainerPath: filepath.Join(rdmaDevicePrefix, "uverbs1"), - Permissions: "rwm", - }, - { - HostPath: rdmaCmPath, - ContainerPath: rdmaCmPath, - Permissions: "rw", - }, - }, - }, }, }) }) @@ -584,4 +630,86 @@ func TestDynamicPolicy_AllocateAccompanyResource(t *testing.T) { So(err, ShouldNotBeNil) So(err.Error(), ShouldContainSubstring, "no available VFs") }) + + Convey("pod without required annotations", t, func() { + vfState, podEntries := state.GenerateDummyState(2, 2, nil) + policy := generateDynamicPolicy(t, false, true, vfState, podEntries) + policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})} + + req := &pluginapi.ResourceRequest{ + PodUid: "pod", + PodName: "pod", + ContainerName: "container", + ResourceName: string(v1.ResourceCPU), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + }, + ResourceRequests: map[string]float64{ + string(v1.ResourceCPU): 32, + }, + } + + resp := &pluginapi.ResourceAllocationResponse{ + AllocationResult: &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + string(v1.ResourceCPU): { + AllocatedQuantity: 32, + AllocationResult: "1-32", + }, + }, + }, + } + + err := policy.AllocateAccompanyResource(req, resp) + So(err, ShouldBeNil) + So(resp.AllocationResult, ShouldResemble, &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + string(v1.ResourceCPU): { + AllocatedQuantity: 32, + AllocationResult: "1-32", + }, + }, + }) + }) + + Convey("pod with required annotations but no available VFs with large size", t, func() { + vfState, podEntries := state.GenerateDummyState(2, 2, map[int]sets.Int{ + 0: sets.NewInt(0, 1), + 1: sets.NewInt(0, 1), + }) + policy := generateDynamicPolicy(t, false, true, vfState, podEntries) + policy.podFilters = []podFilter{requiredAnnotationFilter(map[string]string{"foo": "bar"})} + + req := &pluginapi.ResourceRequest{ + PodUid: "pod", + PodName: "pod", + ContainerName: "container", + Annotations: map[string]string{ + "foo": "bar", + }, + ResourceName: string(v1.ResourceCPU), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + }, + ResourceRequests: map[string]float64{ + string(v1.ResourceCPU): 32, + }, + } + + resp := &pluginapi.ResourceAllocationResponse{ + AllocationResult: &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + string(v1.ResourceCPU): { + AllocatedQuantity: 32, + AllocationResult: "1-32", + }, + }, + }, + } + + err := policy.AllocateAccompanyResource(req, resp) + + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "no available VFs") + }) } diff --git a/pkg/config/agent/qrm/sriov_plugin.go b/pkg/config/agent/qrm/sriov_plugin.go index f413769e7b..65c6431162 100644 --- a/pkg/config/agent/qrm/sriov_plugin.go +++ b/pkg/config/agent/qrm/sriov_plugin.go @@ -41,6 +41,7 @@ type SriovStaticPolicyConfig struct { } type SriovDynamicPolicyConfig struct { + PodRequiredAnnotations map[string]string LargeSizeVFQueueCount int LargeSizeVFCPUThreshold int LargeSizeVFFailOnExhaustion bool