Skip to content

Commit 7bc7e9e

Browse files
committed
refactor(cpu): enhance block handling with owner pool info and allocation reuse
- Introduce BlockInfo struct to track owner pool and entry details
- Split block allocation logic for dedicated and shared/reclaimed pools
- Reuse existing allocations for dedicated cores when possible
- Improve NUMA-aware CPU allocation strategy
1 parent fa44aa4 commit 7bc7e9e

2 files changed

Lines changed: 143 additions & 31 deletions

File tree

pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor/helper.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,25 @@ func (lwr *ListAndWatchResponse) GetSharedBindingNUMAs() (sets.Int, error) {
8585
return sharedBindingNUMAs, nil
8686
}
8787

88-
// GetBlocks parses ListAndWatchResponse and returns map[int][]*Block,
88+
type BlockInfo struct {
89+
Block
90+
OwnerPoolEntryMap map[string]BlockEntry
91+
}
92+
93+
type BlockEntry struct {
94+
EntryName string
95+
SubEntryName string
96+
}
97+
98+
// GetBlocks parses ListAndWatchResponse and returns map[int][]*BlockInfo,
8999
// the map is keyed as numa id -> blocks slice (which has been sorted and deduped)
90-
func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
100+
func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*BlockInfo, error) {
91101
if lwr == nil {
92102
return nil, fmt.Errorf("got nil ListAndWatchResponse")
93103
}
94104

95-
numaToBlocks := make(map[int]map[string]*Block)
96-
numaToSortedBlocks := make(map[int][]*Block)
105+
numaToBlocks := make(map[int]map[string]*BlockInfo)
106+
numaToSortedBlocks := make(map[int][]*BlockInfo)
97107
blocksEntryNames := make(map[string][][]string)
98108
visBlocksToNUMA := make(map[string]int)
99109

@@ -119,7 +129,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
119129
}
120130

121131
if numaToBlocks[numaIdInt] == nil {
122-
numaToBlocks[numaIdInt] = make(map[string]*Block)
132+
numaToBlocks[numaIdInt] = make(map[string]*BlockInfo)
123133
}
124134

125135
for _, block := range calculationResult.Blocks {
@@ -140,7 +150,19 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
140150
}
141151
}
142152

143-
numaToBlocks[numaIdInt][blockId] = block
153+
bi, ok := numaToBlocks[numaIdInt][blockId]
154+
if !ok {
155+
bi = &BlockInfo{
156+
Block: *block,
157+
OwnerPoolEntryMap: make(map[string]BlockEntry),
158+
}
159+
numaToBlocks[numaIdInt][blockId] = bi
160+
}
161+
162+
bi.OwnerPoolEntryMap[calculationInfo.OwnerPoolName] = BlockEntry{
163+
EntryName: entryName,
164+
SubEntryName: subEntryName,
165+
}
144166
visBlocksToNUMA[blockId] = numaIdInt
145167
blocksEntryNames[blockId] = append(blocksEntryNames[blockId], []string{entryName, subEntryName})
146168
}
@@ -149,7 +171,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
149171
}
150172

151173
for numaId, blocksMap := range numaToBlocks {
152-
blocks := make([]*Block, 0, len(blocksMap))
174+
blocks := make([]*BlockInfo, 0, len(blocksMap))
153175

154176
for _, block := range blocksMap {
155177
blocks = append(blocks, block)
@@ -170,7 +192,7 @@ func (lwr *ListAndWatchResponse) GetBlocks() (map[int][]*Block, error) {
170192
}
171193

172194
// getBlocksLessFunc judges if a block is less than another by entryNames of them
173-
func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*Block) func(i, j int) bool {
195+
func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*BlockInfo) func(i, j int) bool {
174196
return func(i, j int) bool {
175197
blockId1 := blocks[i].BlockId
176198
blockId2 := blocks[j].BlockId
@@ -231,10 +253,11 @@ func getBlocksLessFunc(blocksEntryNames map[string][][]string, blocks []*Block)
231253
}
232254
}
233255

234-
func logNUMAToBlocksSeq(numaToSortedBlocks map[int][]*Block, blocksEntryNames map[string][][]string) {
256+
func logNUMAToBlocksSeq(numaToSortedBlocks map[int][]*BlockInfo, blocksEntryNames map[string][][]string) {
235257
for numaId, blocks := range numaToSortedBlocks {
236258
for i, block := range blocks {
237-
general.InfoS("logNUMAToBlocksSeq", "numaId", numaId, "seq", i, "blockId", block.BlockId, "entryNames", blocksEntryNames[block.BlockId])
259+
general.InfoS("logNUMAToBlocksSeq", "numaId", numaId, "seq", i, "blockId",
260+
block.BlockId, "OwnerPoolEntryMap", block.OwnerPoolEntryMap, "entryNames", blocksEntryNames[block.BlockId])
238261
}
239262
}
240263
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go

Lines changed: 110 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -810,13 +810,19 @@ func (p *DynamicPolicy) applyAllSubCgroupQuotaToUnLimit(containerRelativePath st
810810
})
811811
}
812812

813-
// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response
814-
// and the logic contains three main steps
815-
// 1. handle blocks for static pools
816-
// 2. handle blocks with specified NUMA ids (probably be blocks for
817-
// numa_binding dedicated_cores containers and reclaimed_cores containers colocated with them)
818-
// 3. handle blocks without specified NUMA id (probably be blocks for
819-
// not numa_binding dedicated_cores containers and pools of shared_cores and reclaimed_cores containers)
813+
// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response.
814+
// The logic contains the following main steps:
815+
// 1. Handle blocks for static pools and forbidden pools
816+
// 2. Handle blocks with specified NUMA IDs (for NUMA-bound dedicated_cores containers
817+
// and reclaimed_cores containers colocated with them)
818+
// 3. Handle blocks without specified NUMA ID (for non-NUMA-bound containers including
819+
// dedicated_cores, shared_cores and reclaimed_cores containers)
820+
//
821+
// For each block, the function allocates CPU sets based on:
822+
// - Already allocated CPUs for dedicated cores
823+
// - Available CPUs considering already allocated static/forbidden pools
824+
// - NUMA topology awareness for better performance
825+
// - CPU allocation strategies that minimize CPU migrations
820826
func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchResponse) (advisorapi.BlockCPUSet, error) {
821827
if resp == nil {
822828
return nil, fmt.Errorf("got nil resp")
@@ -831,8 +837,9 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
831837
topology := machineInfo.CPUTopology
832838
availableCPUs := topology.CPUDetails.CPUs()
833839

834-
// walk through static pools to construct blockCPUSet (for static pool),
835-
// and calculate availableCPUs after deducting static pools
840+
// Walk through static pools to construct blockCPUSet (for static pool),
841+
// and calculate availableCPUs after deducting static pools.
842+
// Static pools are predefined pools that should not be changed during runtime.
836843
blockCPUSet := advisorapi.NewBlockCPUSet()
837844
for _, poolName := range state.StaticPools.List() {
838845
allocationInfo := p.state.GetAllocationInfo(poolName, commonstate.FakedContainerName)
@@ -850,8 +857,8 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
850857
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])
851858
}
852859

853-
// walk through forbidden pools to construct blockCPUSet (for forbidden pool),
854-
// and calculate availableCPUs after deducting forbidden pools
860+
// Walk through forbidden pools and deduct their CPUs from availableCPUs.
861+
// Forbidden pools are reserved pools that should not be allocated to any containers.
855862
for _, poolName := range state.ForbiddenPools.List() {
856863
allocationInfo := p.state.GetAllocationInfo(poolName, commonstate.FakedContainerName)
857864
if allocationInfo == nil {
@@ -861,20 +868,30 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
861868
availableCPUs = availableCPUs.Difference(allocationInfo.AllocationResult.Clone())
862869
}
863870

864-
// walk through all blocks with specified NUMA ids
865-
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
871+
// Process blocks with specified NUMA IDs (for NUMA-bound containers)
872+
// These are typically dedicated_cores containers with NUMA binding and
873+
// reclaimed_cores containers colocated with them
866874
for numaID, blocks := range numaToBlocks {
867875
if numaID == commonstate.FakedNUMAID {
868876
continue
869877
}
870878

879+
withNUMABindingShareOrDedicatedPod := false
871880
numaAvailableCPUs := availableCPUs.Intersection(topology.CPUDetails.CPUsInNUMANodes(numaID))
881+
882+
// First handle blocks for NUMA-bound dedicated_cores containers
883+
// Reuse already allocated CPU sets when possible to minimize CPU migration
872884
for _, block := range blocks {
873885
if block == nil {
874886
general.Warningf("got nil block")
875887
continue
876888
}
877889

890+
entry, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
891+
if !ok {
892+
continue
893+
}
894+
878895
blockID := block.BlockId
879896

880897
if _, found := blockCPUSet[blockID]; found {
@@ -888,6 +905,64 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
888905
blockID, err)
889906
}
890907

908+
allocationInfo := p.state.GetAllocationInfo(entry.EntryName, entry.SubEntryName)
909+
if allocationInfo == nil {
910+
continue
911+
}
912+
913+
alreadyAllocatedCPUs, ok := allocationInfo.TopologyAwareAssignments[numaID]
914+
if !ok {
915+
continue
916+
}
917+
918+
var cpuset machine.CPUSet
919+
alreadyAllocatedCPUs = alreadyAllocatedCPUs.Intersection(numaAvailableCPUs)
920+
if alreadyAllocatedCPUs.Size() >= blockResult {
921+
cpuset, err = calculator.TakeByTopology(machineInfo, alreadyAllocatedCPUs, blockResult, true)
922+
if err != nil {
923+
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
924+
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
925+
}
926+
} else {
927+
cpuset, err = calculator.TakeByTopology(machineInfo, numaAvailableCPUs.Difference(alreadyAllocatedCPUs), blockResult-alreadyAllocatedCPUs.Size(), true)
928+
if err != nil {
929+
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
930+
blockID, numaID, err, numaAvailableCPUs.Size(), numaAvailableCPUs.String(), blockResult)
931+
}
932+
cpuset = cpuset.Union(alreadyAllocatedCPUs)
933+
}
934+
935+
blockCPUSet[blockID] = cpuset
936+
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
937+
availableCPUs = availableCPUs.Difference(cpuset)
938+
withNUMABindingShareOrDedicatedPod = true
939+
}
940+
941+
// Then handle blocks for NUMA-bound shared_cores containers and reclaimed_cores containers colocated with them
942+
// These containers can share NUMA nodes with dedicated_cores containers
943+
for _, block := range blocks {
944+
if block == nil {
945+
general.Warningf("got nil block")
946+
continue
947+
}
948+
949+
_, ok := block.OwnerPoolEntryMap[commonstate.PoolNameDedicated]
950+
if ok {
951+
continue
952+
}
953+
954+
blockID := block.BlockId
955+
if _, found := blockCPUSet[blockID]; found {
956+
general.Warningf("block: %v already allocated", blockID)
957+
continue
958+
}
959+
960+
blockResult, err := general.CovertUInt64ToInt(block.Result)
961+
if err != nil {
962+
return nil, fmt.Errorf("parse block: %s result failed with error: %v",
963+
blockID, err)
964+
}
965+
891966
cpuset, err := calculator.TakeByTopology(machineInfo, numaAvailableCPUs, blockResult, false)
892967
if err != nil {
893968
return nil, fmt.Errorf("allocate cpuset for NUMA Aware block: %s in NUMA: %d failed with error: %v, numaAvailableCPUs: %d(%s), blockResult: %d",
@@ -897,11 +972,26 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
897972
blockCPUSet[blockID] = cpuset
898973
numaAvailableCPUs = numaAvailableCPUs.Difference(cpuset)
899974
availableCPUs = availableCPUs.Difference(cpuset)
975+
976+
_, ok = block.OwnerPoolEntryMap[commonstate.PoolNameShare]
977+
if ok {
978+
withNUMABindingShareOrDedicatedPod = true
979+
}
980+
}
981+
982+
// Finally, if there are NUMA-bound containers on this NUMA node,
983+
// deduct all numaAvailableCPUs from availableCPUs to ensure that
984+
// NUMA-bound pods don't share the same NUMA node with non-NUMA-bound pods
985+
if withNUMABindingShareOrDedicatedPod {
986+
// Because numaAvailableCPUs is a subset of availableCPUs,
987+
// we need to deduct all numaAvailableCPUs from availableCPUs
988+
availableCPUs = availableCPUs.Difference(numaAvailableCPUs)
900989
}
901990
}
902991

903-
// walk through all blocks without specified NUMA id
904-
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
992+
// Walk through all blocks without specified NUMA ID (non-NUMA-bound containers)
993+
// For each block, allocate CPUs using NUMA balance strategy to minimize
994+
// memory access latency and CPU migrations
905995
for _, block := range numaToBlocks[commonstate.FakedNUMAID] {
906996
if block == nil {
907997
general.Warningf("got nil block")
@@ -921,17 +1011,16 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
9211011
blockID, err)
9221012
}
9231013

924-
// use NUMA balance strategy to aviod changing memset as much as possible
925-
// for blocks with faked NUMA id
926-
var cpuset machine.CPUSet
927-
cpuset, availableCPUs, err = calculator.TakeByNUMABalance(machineInfo, availableCPUs, blockResult)
1014+
// Use NUMA balance strategy to avoid changing memory affinity (memset) as much as possible
1015+
// for blocks with faked NUMA ID (non-NUMA-bound containers)
1016+
resultCPUSet, _, err := calculator.TakeByNUMABalance(machineInfo, availableCPUs, blockResult)
9281017
if err != nil {
9291018
return nil, fmt.Errorf("allocate cpuset for non NUMA Aware block: %s failed with error: %v, availableCPUs: %d(%s), blockResult: %d",
9301019
blockID, err, availableCPUs.Size(), availableCPUs.String(), blockResult)
9311020
}
9321021

933-
blockCPUSet[blockID] = cpuset
934-
availableCPUs = availableCPUs.Difference(cpuset)
1022+
blockCPUSet[blockID] = resultCPUSet
1023+
availableCPUs = availableCPUs.Difference(resultCPUSet)
9351024
}
9361025

9371026
return blockCPUSet, nil

0 commit comments

Comments (0)