Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/agent/qrm-plugins/commonstate/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func IsSystemPool(poolName string) bool {
return strings.HasPrefix(poolName, PoolNamePrefixSystem)
}

func IsShareNUMABindingPool(poolName string) bool {
return strings.Contains(poolName, NUMAPoolInfix)
}

func GetPoolType(poolName string) string {
if IsIsolationPool(poolName) {
return PoolNamePrefixIsolation
Expand Down
165 changes: 153 additions & 12 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator/cpu_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,95 @@ func (a *cpuAccumulator) sort(ids []int, getCPUs func(ids ...int) machine.CPUSet
})
}

// getBestMatchCPUsNeededL3Cache returns the L3 cache ID that best matches the number of CPUs needed.
// It directly selects the L3 cache with the closest match to the required number of CPUs,
// preferring caches with CPU count equal to or slightly greater than the requirement.
func (a *cpuAccumulator) getBestMatchCPUsNeededL3Cache() (int, bool) {
l3Caches := a.cpuDetails.L3Caches().ToSliceInt()
if len(l3Caches) == 0 {
return 0, false
}

var bestL3CacheID int
bestMatchFound := false
var bestMatchDiff int = -1 // -1 indicates no match found yet

for _, l3CacheID := range l3Caches {
cpusInL3Cache := a.cpuDetails.CPUsInL3Caches(l3CacheID)
cpuCount := cpusInL3Cache.Size()

// Exact match - return immediately
if cpuCount == a.numCPUsNeeded {
return l3CacheID, true
}

// For caches with more CPUs than needed, prefer the one with the smallest excess
if cpuCount > a.numCPUsNeeded {
diff := cpuCount - a.numCPUsNeeded
if !bestMatchFound || diff < bestMatchDiff {
bestL3CacheID = l3CacheID
bestMatchDiff = diff
bestMatchFound = true
}
}
}

// If we found a cache with more CPUs than needed, return it
if bestMatchFound {
return bestL3CacheID, true
}

// If no cache with more CPUs was found, find the one with the most CPUs
// (closest match when all caches have fewer CPUs than needed)
for _, l3CacheID := range l3Caches {
cpusInL3Cache := a.cpuDetails.CPUsInL3Caches(l3CacheID)
cpuCount := cpusInL3Cache.Size()

if !bestMatchFound || cpuCount > bestMatchDiff {
bestL3CacheID = l3CacheID
bestMatchDiff = cpuCount
bestMatchFound = true
}
}

return bestL3CacheID, bestMatchFound
}

// tryAlignL3Caches handles remaining CPU allocation with L3 cache topology awareness.
//
// This method implements fine-grained CPU allocation based on L3 cache topology.
// When the requested CPU count doesn't align with complete L3 cache sizes,
// it intelligently selects the most suitable L3 cache to minimize cache contention
// and maximize memory locality for the workload.
//
// Algorithm:
// 1. Directly selects the L3 cache that best matches the remaining CPU requirement
// 2. If remaining need >= cache size: allocate entire cache and recurse
// 3. If remaining need < cache size: restrict allocation to this cache only
func (a *cpuAccumulator) tryAlignL3Caches() {
l3Cache, found := a.getBestMatchCPUsNeededL3Cache()
if !found {
return
}

cpusInL3Cache := a.cpuDetails.CPUsInL3Caches(l3Cache)
if a.numCPUsNeeded >= cpusInL3Cache.Size() {
// Cache is smaller than remaining need - take entire cache for efficiency
klog.V(4).InfoS("tryAlignL3Caches: claiming entire L3 cache (partial)", "l3Cache", l3Cache, "cacheSize", cpusInL3Cache.Size(), "remainingNeed", a.numCPUsNeeded)
a.take(cpusInL3Cache)
if a.isSatisfied() {
return
}
// Continue with remaining allocation from other caches
a.tryAlignL3Caches()
} else {
// Cache is larger than remaining need - restrict to this cache for optimal locality
// This ensures all allocated CPUs share the same L3 cache, minimizing memory latency
klog.V(4).InfoS("tryAlignL3Caches: restricting allocation to L3 cache", "l3Cache", l3Cache, "cacheSize", cpusInL3Cache.Size(), "remainingNeed", a.numCPUsNeeded)
a.cpuDetails = a.cpuDetails.KeepOnly(cpusInL3Cache)
}
}

// Sort all sockets with free CPUs using the sort() algorithm defined above.
func (a *cpuAccumulator) sortAvailableSockets() []int {
sockets := a.cpuDetails.Sockets().ToSliceNoSortInt()
Expand Down Expand Up @@ -224,42 +313,94 @@ func (a *cpuAccumulator) isFailed() bool {
return a.numCPUsNeeded > a.cpuDetails.CPUs().Size()
}

// TakeByTopology tries to allocate those required cpus in the same socket or cores
// TakeByTopology implements a topology-aware CPU allocation strategy that prioritizes
// hardware locality and cache efficiency for optimal workload performance.
//
// This function implements a multi-tier allocation strategy designed to minimize
// cross-socket communication and maximize cache utilization. The allocation follows
// a hierarchical approach from largest to smallest topology units.
//
// Parameters:
// - info: Machine topology information including NUMA, socket, core, and cache hierarchy
// - availableCPUs: Set of CPUs available for allocation
// - cpuRequirement: Number of CPUs needed for the workload
// - alignByL3Caches: Whether to consider L3 cache topology in allocation decisions
//
// Returns:
// - CPUSet: The allocated set of CPUs with optimal topology placement
// - error: Error if allocation fails due to insufficient resources
//
// Allocation Strategy (Topology-Aware Best-Fit):
//
// Phase 1: Socket-Level Allocation (Highest Locality)
// - Attempts to allocate entire CPU sockets when the requirement matches or exceeds socket size
// - Provides maximum memory bandwidth and minimal cross-socket latency
//
// Phase 2: L3 Cache-Aware Allocation (Conditional)
// - Activated when alignByL3Caches is true
// - Prioritizes allocation within shared L3 cache domains to minimize cache contention
// - Uses tryAlignL3Caches() for intelligent cache-aligned distribution
//
// Phase 3: Core-Level Allocation (Medium Locality)
// - Allocates complete CPU cores to avoid hyperthreading contention
// - Preferred for workloads sensitive to thread interference
//
// Phase 4: Thread-Level Allocation (Fine-Grained)
// - Allocates individual hyperthreads from partially utilized cores
// - Prefers cores on sockets already allocated to maintain NUMA affinity
func TakeByTopology(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
cpuRequirement int,
cpuRequirement int, alignByL3Caches bool,
) (machine.CPUSet, error) {
// Initialize accumulator with topology-aware state
acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)

// Fast-path: Handle edge cases immediately
if acc.isSatisfied() {
// Zero CPU requirement - return empty set immediately
return acc.result.Clone(), nil
}
if acc.isFailed() {
return machine.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
// Insufficient resources - fail fast with descriptive error
return machine.NewCPUSet(), fmt.Errorf("insufficient CPUs: requested %d, available %d",
cpuRequirement, availableCPUs.Size())
}

// Algorithm: topology-aware best-fit
// 1. Acquire whole sockets, if available and the container requires at
// least a socket's-worth of CPUs.
// Phase 1: Socket-level allocation for maximum locality
// This phase attempts to allocate entire CPU sockets when beneficial
acc.takeFullSockets()
if acc.isSatisfied() {
klog.V(4).InfoS("TakeByTopology: allocated at socket level", "allocated", acc.result.Size())
return acc.result.Clone(), nil
}

// 2. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
// Phase 2: L3 cache topology optimization (if enabled)
// This phase considers cache topology to minimize memory latency
if alignByL3Caches {
acc.tryAlignL3Caches()
if acc.isSatisfied() {
klog.V(4).InfoS("TakeByTopology: allocated with L3 cache alignment", "allocated", acc.result.Size())
return acc.result.Clone(), nil
}
}

// Phase 3: Core-level allocation to avoid HT contention
// Allocates complete cores for workloads sensitive to thread interference
acc.takeFullCores()
if acc.isSatisfied() {
klog.V(4).InfoS("TakeByTopology: allocated at core level", "allocated", acc.result.Size())
return acc.result.Clone(), nil
}

// 3. Acquire single threads, preferring to fill partially-allocated cores
// on the same sockets as the whole cores we have already taken in this
// allocation.
// Phase 4: Thread-level allocation for remaining needs
// Allocates individual threads from partially utilized cores
acc.takeRemainingCPUs()
if acc.isSatisfied() {
klog.V(4).InfoS("TakeByTopology: allocated at thread level", "allocated", acc.result.Size())
return acc.result.Clone(), nil
}

return machine.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
// Exhaustive allocation failed - no combination satisfies requirement
return machine.NewCPUSet(), fmt.Errorf("topology-aware allocation failed: requested %d CPUs, exhausted all allocation strategies", cpuRequirement)
}

// TakeByNUMABalance tries to make the allocated cpu spread on different
Expand Down
Loading