Skip to content

Commit acfe811

Browse files
committed
feat(cpu/hintoptimizer): add resource package hint optimizer
Implement a new hint optimizer that considers resource package allocations when providing topology hints. The optimizer filters NUMA nodes based on available resources in the specified package, ensuring pods only get hints for nodes with sufficient capacity.
1 parent 846e4d4 commit acfe811

5 files changed

Lines changed: 845 additions & 0 deletions

File tree

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/apimachinery/pkg/util/sets"
24+
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
25+
26+
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer"
27+
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy"
28+
hintoptimizerutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/util"
29+
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
30+
"github.com/kubewharf/katalyst-core/pkg/config"
31+
"github.com/kubewharf/katalyst-core/pkg/metaserver"
32+
"github.com/kubewharf/katalyst-core/pkg/metrics"
33+
"github.com/kubewharf/katalyst-core/pkg/util/general"
34+
"github.com/kubewharf/katalyst-core/pkg/util/native"
35+
qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
36+
resourcepackage "github.com/kubewharf/katalyst-core/pkg/util/resource-package"
37+
)
38+
39+
const HintOptimizerNameResourcePackage = "resource_package"
40+
41+
// resourcePackageHintOptimizer implements the HintOptimizer interface based on resource package information.
42+
type resourcePackageHintOptimizer struct {
43+
conf *config.Configuration
44+
metaServer *metaserver.MetaServer
45+
emitter metrics.MetricEmitter
46+
state state.State
47+
}
48+
49+
// NewResourcePackageHintOptimizer creates a new resourcePackageHintOptimizer.
50+
func NewResourcePackageHintOptimizer(
51+
options policy.HintOptimizerFactoryOptions,
52+
) (hintoptimizer.HintOptimizer, error) {
53+
return &resourcePackageHintOptimizer{
54+
conf: options.Conf,
55+
metaServer: options.MetaServer,
56+
emitter: options.Emitter,
57+
state: options.State,
58+
}, nil
59+
}
60+
61+
// OptimizeHints optimizes the topology hints based on resource package information.
62+
func (o *resourcePackageHintOptimizer) OptimizeHints(
63+
request hintoptimizer.Request,
64+
hints *pluginapi.ListOfTopologyHints,
65+
) error {
66+
err := hintoptimizerutil.GenericOptimizeHintsCheck(request, hints)
67+
if err != nil {
68+
general.Errorf("GenericOptimizeHintsCheck failed with error: %v", err)
69+
return err
70+
}
71+
72+
if qosutil.AnnotationsIndicateNUMAExclusive(request.Annotations) {
73+
general.Infof("skip resourcePackageHintOptimizer for exclusive numa pod: %s/%s, container: %s",
74+
request.PodNamespace, request.PodName, request.ContainerName)
75+
return hintoptimizerutil.ErrHintOptimizerSkip
76+
}
77+
78+
resourcePackage := resourcepackage.GetResourcePackageName(request.ResourceRequest.Annotations)
79+
if resourcePackage == "" {
80+
general.Errorf("skip resourcePackageHintOptimizer for pod resource package not found in annotation")
81+
return hintoptimizerutil.ErrHintOptimizerSkip
82+
}
83+
84+
resourcePackageAllocatableMap, err := o.getResourcePackageAllocatable(resourcePackage)
85+
if err != nil {
86+
general.Errorf("getResourcePackageAllocatable failed with error: %v", err)
87+
return err
88+
}
89+
90+
resourcePackageAllocatedMap, err := o.getResourcePackageAllocated(resourcePackage)
91+
if err != nil {
92+
general.Errorf("getResourcePackageAllocated failed with error: %v", err)
93+
return err
94+
}
95+
96+
// Optimize hints based on resource package information
97+
err = o.populateHintsByResourcePackage(hints, request.CPURequest, resourcePackageAllocatableMap, resourcePackageAllocatedMap)
98+
if err != nil {
99+
general.Errorf("populateHintsByResourcePackage failed with error: %v", err)
100+
return err
101+
}
102+
103+
return nil
104+
}
105+
106+
// Run starts the resource package hint optimizer.
107+
func (o *resourcePackageHintOptimizer) Run(stopCh <-chan struct{}) {
108+
// Resource package hint optimizer doesn't need to run background tasks
109+
<-stopCh
110+
}
111+
112+
// populateHintsByResourcePackage optimizes hints based on resource package information.
113+
func (o *resourcePackageHintOptimizer) populateHintsByResourcePackage(
114+
hints *pluginapi.ListOfTopologyHints,
115+
cpuRequest float64,
116+
resourcePackageAllocatableMap map[int]float64,
117+
resourcePackageAllocatedMap map[int]float64,
118+
) error {
119+
canAllocateNodes := sets.NewInt()
120+
for nodeID, allocatable := range resourcePackageAllocatableMap {
121+
if allocatable-resourcePackageAllocatedMap[nodeID] >= cpuRequest {
122+
canAllocateNodes.Insert(nodeID)
123+
}
124+
}
125+
126+
optimizedHints := make([]*pluginapi.TopologyHint, 0, len(hints.Hints))
127+
for _, hint := range hints.Hints {
128+
if len(hint.Nodes) != 1 {
129+
continue
130+
}
131+
132+
if canAllocateNodes.Has(int(hint.Nodes[0])) {
133+
optimizedHints = append(optimizedHints, hint)
134+
}
135+
}
136+
137+
hints.Hints = optimizedHints
138+
return nil
139+
}
140+
141+
func (o *resourcePackageHintOptimizer) getResourcePackageAllocatable(resourcePackage string) (map[int]float64, error) {
142+
// Get resource package information from meta server
143+
resourcePackageMap, err := o.metaServer.NodeResourcePackages(context.Background())
144+
if err != nil {
145+
return nil, fmt.Errorf("NodeResourcePackages failed with error: %v", err)
146+
}
147+
148+
allocatable := make(map[int]float64)
149+
for nodeID, packages := range resourcePackageMap {
150+
for _, pkg := range packages {
151+
if pkg.PackageName == resourcePackage {
152+
if pkg.Allocatable == nil {
153+
continue
154+
}
155+
156+
// Use the native package to get CPU quantity safely
157+
cpuQuantity := native.CPUQuantityGetter()(*pkg.Allocatable)
158+
if cpuQuantity.IsZero() {
159+
continue
160+
}
161+
162+
allocatable[nodeID] = float64(cpuQuantity.MilliValue()) / 1000
163+
}
164+
}
165+
}
166+
return allocatable, nil
167+
}
168+
169+
func (o *resourcePackageHintOptimizer) getResourcePackageAllocated(resourcePackage string) (map[int]float64, error) {
170+
machineState := o.state.GetMachineState()
171+
allocated := make(map[int]float64)
172+
for nodeID, nodeState := range machineState {
173+
if nodeState == nil {
174+
continue
175+
}
176+
177+
for _, entries := range nodeState.PodEntries {
178+
for _, entry := range entries {
179+
if entry == nil || resourcepackage.GetResourcePackageName(entry.Annotations) != resourcePackage {
180+
continue
181+
}
182+
183+
allocated[nodeID] += entry.RequestQuantity
184+
}
185+
}
186+
}
187+
return allocated, nil
188+
}

0 commit comments

Comments
 (0)