Skip to content

Commit 8c7d9a4

Browse files
committed
feat: support isolating pods by CPU utilization
Align cpuset with the L3 cache. Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
1 parent 798dc28 commit 8c7d9a4

12 files changed

Lines changed: 536 additions & 104 deletions

File tree

cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/cpu/cpu_isolation.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cpu
1919
import (
2020
"fmt"
2121
"strconv"
22+
"time"
2223

2324
"github.com/spf13/pflag"
2425
"k8s.io/apimachinery/pkg/util/sets"
@@ -59,6 +60,11 @@ type CPUIsolationOptions struct {
5960
IsolationNonExclusivePools []string
6061

6162
IsolationIncludeSidecarRequirement bool
63+
UtilWatermarkSupreme float64
64+
UtilWatermarkHigh float64
65+
UtilWatermarkLow float64
66+
MetricSyncPeriod time.Duration
67+
MetricSlidingWindowTime time.Duration
6268
}
6369

6470
// NewCPUIsolationOptions creates a new Options with a default config
@@ -80,6 +86,12 @@ func NewCPUIsolationOptions() *CPUIsolationOptions {
8086
IsolationDisabledPools: []string{},
8187
IsolationForceEnablePools: []string{},
8288
IsolationNonExclusivePools: []string{},
89+
90+
UtilWatermarkSupreme: 0.75,
91+
UtilWatermarkHigh: 0.55,
92+
UtilWatermarkLow: 0.40,
93+
MetricSyncPeriod: time.Second * 20,
94+
MetricSlidingWindowTime: 2 * time.Minute,
8395
}
8496
}
8597

@@ -119,6 +131,11 @@ func (o *CPUIsolationOptions) AddFlags(fs *pflag.FlagSet) {
119131
"isolation is non-exclusive for get given pool")
120132
fs.BoolVar(&o.IsolationIncludeSidecarRequirement, "isolation-include-sidecar-requirement", o.IsolationIncludeSidecarRequirement,
121133
"isolation include sidecar requirement")
134+
135+
fs.Float64Var(&o.UtilWatermarkLow, "isolation-util-watermark-low", o.UtilWatermarkLow, "cpu utilization watermark low for isolation")
136+
fs.Float64Var(&o.UtilWatermarkHigh, "isolation-util-watermark-high", o.UtilWatermarkHigh, "cpu utilization watermark high for isolation")
137+
fs.DurationVar(&o.MetricSlidingWindowTime, "isolation-metric-sliding-window-time", o.MetricSlidingWindowTime, "metric sliding window time for isolation")
138+
fs.DurationVar(&o.MetricSyncPeriod, "isolation-metric-sync-period", o.MetricSyncPeriod, "metric sync period for isolation")
122139
}
123140

124141
// ApplyTo fills up config with options
@@ -163,6 +180,11 @@ func (o *CPUIsolationOptions) ApplyTo(c *cpu.CPUIsolationConfiguration) error {
163180
c.IsolationNonExclusivePools = sets.NewString(o.IsolationNonExclusivePools...)
164181

165182
c.IsolationIncludeSidecarRequirement = o.IsolationIncludeSidecarRequirement
183+
c.UtilWatermarkSupreme = o.UtilWatermarkSupreme
184+
c.UtilWatermarkHigh = o.UtilWatermarkHigh
185+
c.UtilWatermarkLow = o.UtilWatermarkLow
186+
c.MetricSlidingWindowTime = o.MetricSlidingWindowTime
187+
c.MetricSyncPeriod = o.MetricSyncPeriod
166188

167189
return nil
168190
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor/helper.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,21 @@ type BlockEntry struct {
9595
SubEntryName string
9696
}
9797

98+
func GetBlocksByPoolName(blockInfos []*BlockInfo, poolNamePrefix string) []*BlockInfo {
99+
result := make([]*BlockInfo, 0)
100+
for _, blockInfo := range blockInfos {
101+
if blockInfo == nil {
102+
continue
103+
}
104+
for ownerPoolName := range blockInfo.OwnerPoolEntryMap {
105+
if strings.HasPrefix(ownerPoolName, poolNamePrefix) {
106+
result = append(result, blockInfo)
107+
}
108+
}
109+
}
110+
return result
111+
}
112+
98113
// GetBlocks parses ListAndWatchResponse and returns map[int][]*BlockInfo,
99114
// the map is keyed as numa id -> blocks slice (which has been sorted and deduped)
100115
func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*BlockInfo, error) {

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -881,12 +881,8 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
881881

882882
// First handle blocks for NUMA-bound dedicated_cores containers
883883
// Reuse already allocated CPU sets when possible to minimize CPU migration
884-
for _, block := range blocks {
885-
if block == nil {
886-
general.Warningf("got nil block")
887-
continue
888-
}
889-
884+
dedicatedBlocks := advisorapi.GetBlocksByPoolName(blocks, commonstate.PoolNameDedicated)
885+
for _, block := range dedicatedBlocks {
890886
entry, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
891887
if !ok {
892888
continue
@@ -917,6 +913,8 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
917913

918914
var cpuset machine.CPUSet
919915
alreadyAllocatedCPUs = alreadyAllocatedCPUs.Intersection(numaAvailableCPUs)
916+
917+
// shrink cpuSet
920918
if alreadyAllocatedCPUs.Size() >= blockResult {
921919
cpuset, err = calculator.TakeByTopology(machineInfo, alreadyAllocatedCPUs, blockResult, true)
922920
if err != nil {
@@ -940,54 +938,65 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
940938

941939
// Then handle blocks for NUMA-bound shared_cores containers and reclaimed_cores containers colocated with them
942940
// These containers can share NUMA nodes with dedicated_cores containers
943-
for _, block := range blocks {
944-
if block == nil {
945-
general.Warningf("got nil block")
946-
continue
947-
}
948-
949-
_, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
950-
if ok {
951-
continue
952-
}
941+
f := func(blocks []*advisorapi.BlockInfo, alignedByL3 bool) error {
942+
for _, block := range blocks {
943+
if block == nil {
944+
general.Warningf("got nil block")
945+
continue
946+
}
953947

954-
blockID := block.BlockId
955-
if _, found := blockCPUSet[blockID]; found {
956-
general.Warningf("block: %v already allocated", blockID)
957-
continue
958-
}
948+
blockID := block.BlockId
949+
if _, found := blockCPUSet[blockID]; found {
950+
general.Warningf("block: %v already allocated", blockID)
951+
continue
952+
}
959953

960-
blockResult, err := general.CovertUInt64ToInt(block.Result)
961-
if err != nil {
962-
return nil, fmt.Errorf("parse block: %s result failed with error: %v",
963-
blockID, err)
964-
}
954+
blockResult, err := general.CovertUInt64ToInt(block.Result)
955+
if err != nil {
956+
return fmt.Errorf("parse block: %s result failed with error: %v",
957+
blockID, err)
958+
}
965959

966-
cpuset, err := calculator.TakeByTopology(machineInfo, numaAvailableCPUs, blockResult, false)
967-
if err != nil {
968-
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
969-
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
970-
}
960+
cpuset, err := calculator.TakeByTopology(machineInfo, numaAvailableCPUs, blockResult, alignedByL3)
961+
if err != nil {
962+
return fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
963+
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
964+
}
971965

972-
blockCPUSet[blockID] = cpuset
973-
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
974-
availableCPUs = availableCPUs.Difference(cpuset)
966+
blockCPUSet[blockID] = cpuset
967+
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
968+
availableCPUs = availableCPUs.Difference(cpuset)
975969

976-
for poolName := range block.OwnerPoolEntryMap {
977-
if commonstate.IsIsolationPool(poolName) || commonstate.IsShareNUMABindingPool(poolName) {
978-
withNUMABindingShareOrDedicatedPod = true
979-
break
970+
for poolName := range block.OwnerPoolEntryMap {
971+
if commonstate.IsIsolationPool(poolName) || commonstate.IsShareNUMABindingPool(poolName) {
972+
withNUMABindingShareOrDedicatedPod = true
973+
break
974+
}
980975
}
981976
}
977+
978+
// Finally, if there are NUMA-bound containers on this NUMA node,
979+
// deduct all numaAvailableCPUs from availableCPUs to ensure that
980+
// NUMA-bound pods don't share the same NUMA node with non-NUMA-bound pods
981+
if withNUMABindingShareOrDedicatedPod {
982+
// Because numaAvailableCPUs is a subset of availableCPUs,
983+
// we need to deduct all numaAvailableCPUs from availableCPUs
984+
availableCPUs = availableCPUs.Difference(numaAvailableCPUs)
985+
}
986+
return nil
982987
}
983988

984-
// Finally, if there are NUMA-bound containers on this NUMA node,
985-
// deduct all numaAvailableCPUs from availableCPUs to ensure that
986-
// NUMA-bound pods don't share the same NUMA node with non-NUMA-bound pods
987-
if withNUMABindingShareOrDedicatedPod {
988-
// Because numaAvailableCPUs is a subset of availableCPUs,
989-
// we need to deduct all numaAvailableCPUs from availableCPUs
990-
availableCPUs = availableCPUs.Difference(numaAvailableCPUs)
989+
isolationBlocks := advisorapi.GetBlocksByPoolName(blocks, commonstate.PoolNamePrefixIsolation)
990+
if err := f(isolationBlocks, true); err != nil {
991+
return nil, err
992+
}
993+
shareBlocks := advisorapi.GetBlocksByPoolName(blocks, commonstate.PoolNameShare)
994+
if err := f(shareBlocks, false); err != nil {
995+
return nil, err
996+
}
997+
reclaimBlocks := advisorapi.GetBlocksByPoolName(blocks, commonstate.PoolNameReclaim)
998+
if err := f(reclaimBlocks, false); err != nil {
999+
return nil, err
9911000
}
9921001
}
9931002

pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func (p *nodeMetricsReporterPlugin) ListAndWatchReportContent(_ *v1alpha1.Empty,
250250

251251
func (p *nodeMetricsReporterPlugin) updateNodeMetrics() {
252252
general.InfoS("try to update node metrics")
253+
p.metricAggregators.gc()
253254

254255
var errList []error
255256
nodeMetricInfo, err := p.getNodeMetricInfo()

pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go

Lines changed: 76 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ type cpuResourceAdvisor struct {
105105
provisionAssembler provisionassembler.ProvisionAssembler
106106
headroomAssembler headroomassembler.HeadroomAssembler
107107

108-
isolator isolation.Isolator
108+
isolators []isolation.Isolator
109109
isolationSafety bool
110110

111111
mutex sync.RWMutex
@@ -131,7 +131,10 @@ func NewCPUResourceAdvisor(conf *config.Configuration, extraConf interface{}, me
131131
numRegionsPerNuma: make(map[int]int),
132132
nonBindingNumas: machine.NewCPUSet(),
133133

134-
isolator: isolation.NewLoadIsolator(conf, extraConf, emitter, metaCache, metaServer),
134+
isolators: []isolation.Isolator{
135+
isolation.NewLoadIsolator(conf, extraConf, emitter, metaCache, metaServer),
136+
isolation.NewPerfIsolator(conf, extraConf, emitter, metaCache, metaServer),
137+
},
135138

136139
metaCache: metaCache,
137140
metaServer: metaServer,
@@ -151,6 +154,11 @@ func NewCPUResourceAdvisor(conf *config.Configuration, extraConf interface{}, me
151154
}
152155

153156
func (cra *cpuResourceAdvisor) Run(ctx context.Context) {
157+
for _, isolator := range cra.isolators {
158+
if err := isolator.Start(ctx); err != nil {
159+
klog.Fatalf("[qosaware-cpu] start isolator failed: %v", err)
160+
}
161+
}
154162
<-ctx.Done()
155163
}
156164

@@ -234,7 +242,7 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) (
234242
}
235243

236244
cra.updateNumasAvailableResource()
237-
isolationExists := cra.setIsolatedContainers(tryIsolation)
245+
cra.setIsolatedContainers(tryIsolation)
238246

239247
// assign containers to regions
240248
if err := cra.assignContainersToRegions(); err != nil {
@@ -244,7 +252,7 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) (
244252

245253
cra.gcRegionMap()
246254
cra.updateAdvisorEssentials()
247-
if tryIsolation && isolationExists && !cra.checkIsolationSafety() {
255+
if tryIsolation && cra.hasIsolatedPods() && !cra.checkIsolationSafety() {
248256
klog.Errorf("[qosaware-cpu] failed to check isolation")
249257
return nil, errIsolationSafetyCheckFailed
250258
}
@@ -284,57 +292,81 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) (
284292
return &calculationResult, nil
285293
}
286294

287-
// setIsolatedContainers get isolation status from isolator and update into containers
288-
func (cra *cpuResourceAdvisor) setIsolatedContainers(enableIsolated bool) bool {
295+
func (cra *cpuResourceAdvisor) getTargetIsolatedPods() (sets.String, error) {
289296
isolatedPods := sets.NewString()
290-
if enableIsolated {
291-
isolatedPods = sets.NewString(cra.isolator.GetIsolatedPods()...)
292-
}
293-
if len(isolatedPods) > 0 {
294-
klog.Infof("[qosaware-cpu] current isolated pod: %v", isolatedPods.List())
297+
general.InfoS("try to getTargetIsolatedPods", "isolators", general.ToString(cra.isolators))
298+
for _, isolator := range cra.isolators {
299+
pods, err := isolator.GetIsolatedPods()
300+
if err != nil {
301+
return nil, err
302+
}
303+
isolatedPods.Insert(pods...)
295304
}
305+
return isolatedPods, nil
306+
}
296307

297-
_ = cra.metaCache.RangeAndUpdateContainer(func(podUID string, _ string, ci *types.ContainerInfo) bool {
298-
ci.Isolated = false
299-
if isolatedPods.Has(podUID) {
300-
ci.Isolated = true
308+
func (cra *cpuResourceAdvisor) hasIsolatedPods() bool {
309+
ret := false
310+
cra.metaCache.RangeContainer(func(podUID string, containerName string, ci *types.ContainerInfo) bool {
311+
if ci.Isolated {
312+
ret = true
313+
return false
301314
}
302315
return true
303316
})
304-
return len(isolatedPods) > 0
317+
return ret
305318
}
306319

307-
// checkIsolationSafety returns true iff the isolated-limit-sum and share-pool-size exceed total capacity
308-
// todo: this logic contains a lot of assumptions and should be refined in the future
309-
func (cra *cpuResourceAdvisor) checkIsolationSafety() bool {
310-
shareAndIsolationPoolSize := 0
311-
nonBindingNumas := cra.metaServer.CPUDetails.NUMANodes()
312-
for _, r := range cra.regionMap {
313-
if r.Type() == configapi.QoSRegionTypeShare {
314-
controlKnob, err := r.GetProvision()
315-
if err != nil {
316-
klog.Errorf("[qosaware-cpu] get controlKnob for %v err: %v", r.Name(), err)
317-
return false
318-
}
319-
shareAndIsolationPoolSize += int(controlKnob[configapi.ControlKnobNonReclaimedCPURequirement].Value)
320-
} else if r.Type() == configapi.QoSRegionTypeIsolation {
321-
pods := r.GetPods()
322-
cra.metaCache.RangeContainer(func(podUID string, _ string, containerInfo *types.ContainerInfo) bool {
323-
if _, ok := pods[podUID]; ok {
324-
shareAndIsolationPoolSize += int(containerInfo.CPULimit)
325-
}
326-
return true
327-
})
328-
} else if r.Type() == configapi.QoSRegionTypeDedicatedNumaExclusive {
329-
nonBindingNumas = nonBindingNumas.Difference(r.GetBindingNumas())
320+
// setIsolatedContainers get isolation status from isolator and update into containers
321+
func (cra *cpuResourceAdvisor) setIsolatedContainers(enableIsolation bool) {
322+
isolatedPods := sets.NewString()
323+
if enableIsolation {
324+
pods, err := cra.getTargetIsolatedPods()
325+
if err != nil {
326+
klog.Errorf("[qosaware-cpu] get isolated pods failed: %v", err)
327+
return
330328
}
329+
isolatedPods = pods
331330
}
332331

333-
nonBindingSize := cra.metaServer.NUMAToCPUs.CPUSizeInNUMAs(cra.nonBindingNumas.ToSliceNoSortInt()...)
334-
klog.Infof("[qosaware-cpu] shareAndIsolationPoolSize %v, nonBindingSize %v", shareAndIsolationPoolSize, nonBindingSize)
335-
if shareAndIsolationPoolSize > nonBindingSize {
336-
return false
337-
}
332+
klog.InfoS("[qosaware-cpu]", "current isolated pod", isolatedPods.List(), "enableIsolation", enableIsolation)
333+
_ = cra.metaCache.RangeAndUpdateContainer(func(podUID string, containerName string, ci *types.ContainerInfo) bool {
334+
ci.Isolated = isolatedPods.Has(podUID)
335+
return true
336+
})
337+
}
338+
339+
// checkIsolationSafety returns true iff the isolated-limit-sum and share-pool-size exceed total capacity
340+
// todo: this logic contains a lot of assumptions and should be refined in the future
341+
func (cra *cpuResourceAdvisor) checkIsolationSafety() bool {
342+
//shareAndIsolationPoolSize := 0
343+
//nonBindingNumas := cra.metaServer.CPUDetails.NUMANodes()
344+
//for _, r := range cra.regionMap {
345+
// if r.Type() == configapi.QoSRegionTypeShare {
346+
// controlKnob, err := r.GetProvision()
347+
// if err != nil {
348+
// klog.Errorf("[qosaware-cpu] get controlKnob for %v err: %v", r.Name(), err)
349+
// return false
350+
// }
351+
// shareAndIsolationPoolSize += int(controlKnob[configapi.ControlKnobNonReclaimedCPURequirement].Value)
352+
// } else if r.Type() == configapi.QoSRegionTypeIsolation {
353+
// pods := r.GetPods()
354+
// cra.metaCache.RangeContainer(func(podUID string, _ string, containerInfo *types.ContainerInfo) bool {
355+
// if _, ok := pods[podUID]; ok {
356+
// shareAndIsolationPoolSize += int(containerInfo.CPULimit)
357+
// }
358+
// return true
359+
// })
360+
// } else if r.Type() == configapi.QoSRegionTypeDedicatedNumaExclusive {
361+
// nonBindingNumas = nonBindingNumas.Difference(r.GetBindingNumas())
362+
// }
363+
//}
364+
//
365+
//nonBindingSize := cra.metaServer.NUMAToCPUs.CPUSizeInNUMAs(cra.nonBindingNumas.ToSliceNoSortInt()...)
366+
//klog.Infof("[qosaware-cpu] shareAndIsolationPoolSize %v, nonBindingSize %v", shareAndIsolationPoolSize, nonBindingSize)
367+
//if shareAndIsolationPoolSize > nonBindingSize {
368+
// return false
369+
//}
338370
return true
339371
}
340372

0 commit comments

Comments
 (0)