Skip to content

Commit 391e54b

Browse files
committed
refactor(cpu): enhance block handling with owner pool info and allocation reuse
- Introduce BlockInfo struct to track owner pool and entry details
- Split block allocation logic for dedicated and shared/reclaimed pools
- Reuse existing allocations for dedicated cores when possible
- Improve NUMA-aware CPU allocation strategy
1 parent d9b17bc commit 391e54b

2 files changed

Lines changed: 143 additions & 31 deletions

File tree

pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor/helper.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,25 @@ func (lwr *ListAndWatchResponse) GetSharedBindingNUMAs() (sets.Int, error) {
8585
return sharedBindingNUMAs, nil
8686
}
8787

88-
// GetBlocks parses ListAndWatchResponse and returns map[int][]*Block,
88+
type BlockInfo struct {
89+
Block
90+
OwnerPoolEntryMap map[string]BlockEntry
91+
}
92+
93+
type BlockEntry struct {
94+
EntryName string
95+
SubEntryName string
96+
}
97+
98+
// GetBlocks parses ListAndWatchResponse and returns map[int][]*BlockInfo,
8999
// the map is keyed as numa id -> blocks slice (which has been sorted and deduped)
90-
func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
100+
func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*BlockInfo, error) {
91101
if lwr == nil {
92102
return nil, fmt.Errorf("got nil ListAndWatchResponse")
93103
}
94104

95-
numaToBlocks := make(map[int]map[string]*Block)
96-
numaToSortedBlocks := make(map[int][]*Block)
105+
numaToBlocks := make(map[int]map[string]*BlockInfo)
106+
numaToSortedBlocks := make(map[int][]*BlockInfo)
97107
blocksEntryNames := make(map[string][][]string)
98108
visBlocksToNUMA := make(map[string]int)
99109

@@ -119,7 +129,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
119129
}
120130

121131
if numaToBlocks[numaIdInt] == nil {
122-
numaToBlocks[numaIdInt] = make(map[string]*Block)
132+
numaToBlocks[numaIdInt] = make(map[string]*BlockInfo)
123133
}
124134

125135
for _, block := range calculationResult.Blocks {
@@ -140,7 +150,19 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
140150
}
141151
}
142152

143-
numaToBlocks[numaIdInt][blockId] = block
153+
bi, ok := numaToBlocks[numaIdInt][blockId]
154+
if !ok {
155+
bi = &BlockInfo{
156+
Block: *block,
157+
OwnerPoolEntryMap: make(map[string]BlockEntry),
158+
}
159+
numaToBlocks[numaIdInt][blockId] = bi
160+
}
161+
162+
bi.OwnerPoolEntryMap[calculationInfo.OwnerPoolName] = BlockEntry{
163+
EntryName: entryName,
164+
SubEntryName: subEntryName,
165+
}
144166
visBlocksToNUMA[blockId] = numaIdInt
145167
blocksEntryNames[blockId] = append(blocksEntryNames[blockId], []string{entryName, subEntryName})
146168
}
@@ -149,7 +171,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
149171
}
150172

151173
for numaId, blocksMap := range numaToBlocks {
152-
blocks := make([]*Block, 0, len(blocksMap))
174+
blocks := make([]*BlockInfo, 0, len(blocksMap))
153175

154176
for _, block := range blocksMap {
155177
blocks = append(blocks, block)
@@ -170,7 +192,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
170192
}
171193

172194
// getBlocksLessFunc judges if a block is less than another by entryNames of them
173-
func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*Block) func(i, j int) bool {
195+
func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*BlockInfo) func(i, j int) bool {
174196
return func(i, j int) bool {
175197
blockId1 := blocks[i].BlockId
176198
blockId2 := blocks[j].BlockId
@@ -231,10 +253,11 @@ func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*Block)
231253
}
232254
}
233255

234-
func logNUMAToBlocksSeq(numaToSortedBlocks map[int][]*Block, blocksEntryNames map[string][][]string) {
256+
func logNUMAToBlocksSeq(numaToSortedBlocks map[int][]*BlockInfo, blocksEntryNames map[string][][]string) {
235257
for numaId, blocks := range numaToSortedBlocks {
236258
for i, block := range blocks {
237-
general.InfoS("logNUMAToBlocksSeq", "numaId", numaId, "seq", i, "blockId", block.BlockId, "entryNames", blocksEntryNames[block.BlockId])
259+
general.InfoS("logNUMAToBlocksSeq", "numaId", numaId, "seq", i, "blockId",
260+
block.BlockId, "OwnerPoolEntryMap", block.OwnerPoolEntryMap, "entryNames", blocksEntryNames[block.BlockId])
238261
}
239262
}
240263
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go

Lines changed: 110 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,19 @@ func (p *DynamicPolicy) applyCgroupConfigs(resp *advisorapi.ListAndWatchResponse
548548
return nil
549549
}
550550

551-
// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response
552-
// and the logic contains three main steps
553-
// 1. handle blocks for static pools
554-
// 2. handle blocks with specified NUMA ids (probably be blocks for
555-
// numa_binding dedicated_cores containers and reclaimed_cores containers colocated with them)
556-
// 3. handle blocks without specified NUMA id (probably be blocks for
557-
// not numa_binding dedicated_cores containers and pools of shared_cores and reclaimed_cores containers)
551+
// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response.
552+
// The logic contains the following main steps:
553+
// 1. Handle blocks for static pools and forbidden pools
554+
// 2. Handle blocks with specified NUMA IDs (for NUMA-bound dedicated_cores containers
555+
// and reclaimed_cores containers colocated with them)
556+
// 3. Handle blocks without specified NUMA ID (for non-NUMA-bound containers including
557+
// dedicated_cores, shared_cores and reclaimed_cores containers)
558+
//
559+
// For each block, the function allocates CPU sets based on:
560+
// - Already allocated CPUs for dedicated cores
561+
// - Available CPUs considering already allocated static/forbidden pools
562+
// - NUMA topology awareness for better performance
563+
// - CPU allocation strategies that minimize CPU migrations
558564
func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchResponse) (advisorapi.BlockCPUSet, error) {
559565
if resp == nil {
560566
return nil, fmt.Errorf("got nil resp")
@@ -569,8 +575,9 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
569575
topology := machineInfo.CPUTopology
570576
availableCPUs := topology.CPUDetails.CPUs()
571577

572-
// walk through static pools to construct blockCPUSet (for static pool),
573-
// and calculate availableCPUs after deducting static pools
578+
// Walk through static pools to construct blockCPUSet (for static pool),
579+
// and calculate availableCPUs after deducting static pools.
580+
// Static pools are predefined pools that should not be changed during runtime.
574581
blockCPUSet := advisorapi.NewBlockCPUSet()
575582
for _, poolName := range state.StaticPools.List() {
576583
allocationInfo := p.state.GetAllocationInfo(poolName, commonstate.FakedContainerName)
@@ -588,8 +595,8 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
588595
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])
589596
}
590597

591-
// walk through forbidden pools to construct blockCPUSet (for forbidden pool),
592-
// and calculate availableCPUs after deducting forbidden pools
598+
// Walk through forbidden pools and deduct their CPUs from availableCPUs.
599+
// Forbidden pools are reserved pools that should not be allocated to any containers.
593600
for _, poolName := range state.ForbiddenPools.List() {
594601
allocationInfo := p.state.GetAllocationInfo(poolName, commonstate.FakedContainerName)
595602
if allocationInfo == nil {
@@ -599,20 +606,30 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
599606
availableCPUs = availableCPUs.Difference(allocationInfo.AllocationResult.Clone())
600607
}
601608

602-
// walk through all blocks with specified NUMA ids
603-
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
609+
// Process blocks with specified NUMA IDs (for NUMA-bound containers)
610+
// These are typically dedicated_cores containers with NUMA binding and
611+
// reclaimed_cores containers colocated with them
604612
for numaID, blocks := range numaToBlocks {
605613
if numaID == commonstate.FakedNUMAID {
606614
continue
607615
}
608616

617+
withNUMABindingShareOrDedicatedPod := false
609618
numaAvailableCPUs := availableCPUs.Intersection(topology.CPUDetails.CPUsInNUMANodes(numaID))
619+
620+
// First handle blocks for NUMA-bound dedicated_cores containers
621+
// Reuse already allocated CPU sets when possible to minimize CPU migration
610622
for _, block := range blocks {
611623
if block == nil {
612624
general.Warningf("got nil block")
613625
continue
614626
}
615627

628+
entry, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
629+
if !ok {
630+
continue
631+
}
632+
616633
blockID := block.BlockId
617634

618635
if _, found := blockCPUSet[blockID]; found {
@@ -626,6 +643,64 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
626643
blockID, err)
627644
}
628645

646+
allocationInfo := p.state.GetAllocationInfo(entry.EntryName, entry.SubEntryName)
647+
if allocationInfo == nil {
648+
continue
649+
}
650+
651+
alreadyAllocatedCPUs, ok := allocationInfo.TopologyAwareAssignments[numaID]
652+
if !ok {
653+
continue
654+
}
655+
656+
var cpuset machine.CPUSet
657+
alreadyAllocatedCPUs = alreadyAllocatedCPUs.Intersection(numaAvailableCPUs)
658+
if alreadyAllocatedCPUs.Size() >= blockResult {
659+
cpuset, err = calculator.TakeByTopology(machineInfo, alreadyAllocatedCPUs, blockResult, true)
660+
if err != nil {
661+
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
662+
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
663+
}
664+
} else {
665+
cpuset, err = calculator.TakeByTopology(machineInfo, numaAvailableCPUs.Difference(alreadyAllocatedCPUs), blockResult-alreadyAllocatedCPUs.Size(), true)
666+
if err != nil {
667+
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
668+
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
669+
}
670+
cpuset = cpuset.Union(alreadyAllocatedCPUs)
671+
}
672+
673+
blockCPUSet[blockID] = cpuset
674+
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
675+
availableCPUs = availableCPUs.Difference(cpuset)
676+
withNUMABindingShareOrDedicatedPod = true
677+
}
678+
679+
// Then handle blocks for NUMA-bound shared_cores containers and reclaimed_cores containers colocated with them
680+
// These containers can share NUMA nodes with dedicated_cores containers
681+
for _, block := range blocks {
682+
if block == nil {
683+
general.Warningf("got nil block")
684+
continue
685+
}
686+
687+
_, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
688+
if ok {
689+
continue
690+
}
691+
692+
blockID := block.BlockId
693+
if _, found := blockCPUSet[blockID]; found {
694+
general.Warningf("block: %v already allocated", blockID)
695+
continue
696+
}
697+
698+
blockResult, err := general.CovertUInt64ToInt(block.Result)
699+
if err != nil {
700+
return nil, fmt.Errorf("parse block: %s result failed with error: %v",
701+
blockID, err)
702+
}
703+
629704
cpuset, err := calculator.TakeByTopology(machineInfo, numaAvailableCPUs, blockResult, false)
630705
if err != nil {
631706
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
@@ -635,11 +710,26 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
635710
blockCPUSet[blockID] = cpuset
636711
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
637712
availableCPUs = availableCPUs.Difference(cpuset)
713+
714+
_, ok = block.OwnerPoolEntryMap[commonstate.PoolNameShare]
715+
if ok {
716+
withNUMABindingShareOrDedicatedPod = true
717+
}
718+
}
719+
720+
// Finally, if there are NUMA-bound containers on this NUMA node,
721+
// deduct all numaAvailableCPUs from availableCPUs to ensure that
722+
// NUMA-bound pods don't share the same NUMA node with non-NUMA-bound pods
723+
if withNUMABindingShareOrDedicatedPod {
724+
// Because numaAvailableCPUs is a subset of availableCPUs,
725+
// we need to deduct all numaAvailableCPUs from availableCPUs
726+
availableCPUs = availableCPUs.Difference(numaAvailableCPUs)
638727
}
639728
}
640729

641-
// walk through all blocks without specified NUMA id
642-
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
730+
// Walk through all blocks without specified NUMA ID (non-NUMA-bound containers)
731+
// For each block, allocate CPUs using NUMA balance strategy to minimize
732+
// memory access latency and CPU migrations
643733
for _, block := range numaToBlocks[commonstate.FakedNUMAID] {
644734
if block == nil {
645735
general.Warningf("got nil block")
@@ -659,17 +749,16 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
659749
blockID, err)
660750
}
661751

662-
// use NUMA balance strategy to aviod changing memset as much as possible
663-
// for blocks with faked NUMA id
664-
var cpuset machine.CPUSet
665-
cpuset, availableCPUs, err = calculator.TakeByNUMABalance(machineInfo, availableCPUs, blockResult)
752+
// Use NUMA balance strategy to avoid changing memory affinity (memset) as much as possible
753+
// for blocks with faked NUMA ID (non-NUMA-bound containers)
754+
resultCPUSet, _, err := calculator.TakeByNUMABalance(machineInfo, availableCPUs, blockResult)
666755
if err != nil {
667756
return nil, fmt.Errorf("allocate cpuset for non NUMA Aware block: %s failed with error: %v, availableCPUs: %d(%s), blockResult: %d",
668757
blockID, err, availableCPUs.Size(), availableCPUs.String(), blockResult)
669758
}
670759

671-
blockCPUSet[blockID] = cpuset
672-
availableCPUs = availableCPUs.Difference(cpuset)
760+
blockCPUSet[blockID] = resultCPUSet
761+
availableCPUs = availableCPUs.Difference(resultCPUSet)
673762
}
674763

675764
return blockCPUSet, nil

0 commit comments

Comments (0)