234 changes: 40 additions & 194 deletions pkg/estimator/client/general.go
@@ -19,7 +19,6 @@ package client
import (
"context"
"fmt"
"maps"
"math"
"sort"

@@ -29,7 +28,11 @@ import (

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/estimator"
"github.com/karmada-io/karmada/pkg/estimator/pb"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/util"
schedulerframework "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
)

// GeneralEstimator is the default replica estimator.
@@ -127,9 +130,9 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
return int32(allowedPods) // #nosec G115: integer overflow conversion int64 -> int32
}

podBound := allowedPods / podsPerSet
podBound := int32(allowedPods / podsPerSet) // #nosec G115: integer overflow conversion int64 -> int32
if len(perSet) == 0 || allZero(perSet) {
return int32(podBound) // #nosec G115: integer overflow conversion int64 -> int32
return podBound
}

// Find limiting resource requirement, which will bound maxSet calculation
@@ -144,116 +147,58 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
return 0 // no capacity for this resource
}

resBound := resAvail / req
resBound := int32(resAvail / req) // #nosec G115: integer overflow conversion int64 -> int32
if resBound < maxSets {
maxSets = resBound
}
}

if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) && len(cluster.Status.ResourceSummary.AllocatableModelings) > 0 {
if num, err := getMaximumSetsBasedOnResourceModels(cluster, components, podBound); err != nil {
if num, err := getMaximumSetsBasedOnResourceModels(cluster, components, maxSets); err != nil {
klog.Warningf("Failed to get maximum sets based on resource models, skipping: %v", err)
} else if num < maxSets {
maxSets = num
}
}

return int32(maxSets) // #nosec G115: integer overflow conversion int64 -> int32
return maxSets
}

// getMaximumSetsBasedOnResourceModels computes the maximum number of full sets that can be
// placed on a cluster using the cluster's ResourceModels. It expands one set into
// replica kinds (demand + count) and performs a first-fit-decreasing placement onto model-grade nodes.
// `upperBound` caps the search. We can set this using the podBound (allowedPods / podsPerSet)
func getMaximumSetsBasedOnResourceModels(
cluster *clusterv1alpha1.Cluster,
components []workv1alpha2.Component,
upperBound int64,
) (int64, error) {
if upperBound <= 0 {
return 0, nil
}

// Compressed one-set: per-kind (identical replicas grouped)
oneSetKinds := expandKindsOneSet(components)
if len(oneSetKinds) == 0 {
// If there are no pods to schedule, just return upperBound
return upperBound, nil
}

// Use cluster "available" totals (allocatable - allocated - allocating) for normalized scoring
// This reflects what the cluster can actually accept now
totals := availableResourceMap(cluster.Status.ResourceSummary)

for i := range oneSetKinds {
oneSetKinds[i].score = demandScoreNormalized(oneSetKinds[i].dem, totals)
}
sort.Slice(oneSetKinds, func(i, j int) bool {
if oneSetKinds[i].score == oneSetKinds[j].score {
return demandSum(oneSetKinds[i].dem) > demandSum(oneSetKinds[j].dem)
}
return oneSetKinds[i].score > oneSetKinds[j].score
})

//Build model nodes from Spec.ResourceModels and Status.AllocatableModelings
nodes, err := buildModelNodes(cluster)
func getMaximumSetsBasedOnResourceModels(cluster *clusterv1alpha1.Cluster, components []workv1alpha2.Component, upperSets int32) (int32, error) {
nodes, err := getNodesAvailableResources(cluster)
if err != nil {
return -1, err
}
if len(nodes) == 0 {
return 0, nil
}

var sets int64
for sets < upperBound {
if !placeOneSet(oneSetKinds, nodes) {
break
pbComponents := make([]pb.Component, 0, len(components))
for _, comp := range components {
// Deep-copy so that pointer is not shared between goroutines
var cr *workv1alpha2.ComponentReplicaRequirements
if comp.ReplicaRequirements != nil {
cr = comp.ReplicaRequirements.DeepCopy()
}
sets++
}
return sets, nil
}

// placeOneSet attempts to place exactly ONE full set (all kinds with their per-set replica counts)
// onto the provided working node capacities (in-place)
// Returns true if successful
func placeOneSet(orderedKinds []replicaKind, work []modelNode) bool {
for _, k := range orderedKinds {
remaining := k.count
if remaining <= 0 {
continue
}
// first-fit across nodes
for n := range work {
if remaining <= 0 {
break
}
fit := maxFit(work[n].cap, k.dem)
if fit <= 0 {
continue
}
place := fit
if place > remaining {
place = remaining
}
consumeMul(work[n].cap, k.dem, place)
remaining -= place
}
if remaining > 0 {
return false
}
pbComponents = append(pbComponents, pb.Component{
Name: comp.Name,
Replicas: comp.Replicas,
ReplicaRequirements: toPBReplicaRequirements(cr),
})
}
return true
}

// modelNode holds remaining capacity for a given node across all resource types
type modelNode struct {
cap map[corev1.ResourceName]int64
matchNode := func(_ *pb.NodeClaim, _ *schedulerframework.NodeInfo) bool {
// Always match since resource models lack node affinity/toleration info, so we skip these checks.
return true
}
return estimator.NewSchedulingSimulator(nodes, matchNode).SimulateSchedulingFFD(pbComponents, upperSets), nil
}

// buildModelNodes constructs identical nodes for each model grade using its Min vector,
// repeated AllocatableModelings[grade].Count times. Grades are indexed directly.
func buildModelNodes(cluster *clusterv1alpha1.Cluster) ([]modelNode, error) {
func getNodesAvailableResources(cluster *clusterv1alpha1.Cluster) ([]*schedulerframework.NodeInfo, error) {
if cluster == nil {
return nil, fmt.Errorf("nil cluster")
}
@@ -267,12 +212,14 @@ func buildModelNodes(cluster *clusterv1alpha1.Cluster) ([]modelNode, error) {
}

// Build capacity template per grade
capsByGrade := make(map[uint]map[corev1.ResourceName]int64, len(spec))
capsByGrade := make(map[uint]corev1.ResourceList, len(spec))
for _, m := range spec {
tmpl := make(map[corev1.ResourceName]int64, len(m.Ranges))
tmpl := make(corev1.ResourceList, len(m.Ranges))
for _, r := range m.Ranges {
tmpl[r.Name] = quantityAsInt64(r.Min)
tmpl[r.Name] = r.Min
}
// Pods resource is not modeled in ResourceModels, so we set it to MaxInt64 to avoid limiting scheduling.
tmpl[corev1.ResourcePods] = *resource.NewQuantity(math.MaxInt64, resource.DecimalSI)
capsByGrade[m.Grade] = tmpl
}

@@ -293,122 +240,21 @@ func buildModelNodes(cluster *clusterv1alpha1.Cluster) ([]modelNode, error) {
sort.Ints(grades)

// Emit nodes for grades present in both spec & status.
var nodes []modelNode
var nodes []*schedulerframework.NodeInfo
for _, grade := range grades {
tmpl, cnt := capsByGrade[uint(grade)], countByGrade[uint(grade)] // #nosec G115: integer overflow conversion int -> uint
if tmpl == nil || cnt == 0 {
continue
}
for range cnt {
capCopy := maps.Clone(tmpl)
nodes = append(nodes, modelNode{cap: capCopy})
}
}
return nodes, nil
}

// replicaKind represents a single type of component, including replica demand and count
type replicaKind struct {
dem map[corev1.ResourceName]int64 // per-replica demand
count int64 // how many replicas
score float64 // ordering heuristic (higher first)
}

// expandKindsOneSet flattens components into a slice of unique replica kinds.
// Each entry holds the per-replica demand and how many replicas of that kind a set needs.
func expandKindsOneSet(components []workv1alpha2.Component) []replicaKind {
kinds := make([]replicaKind, 0, len(components))
for _, c := range components {
if c.ReplicaRequirements == nil || c.ReplicaRequirements.ResourceRequest == nil {
continue
}
// normalize per-replica demand
base := make(map[corev1.ResourceName]int64, len(c.ReplicaRequirements.ResourceRequest))
for name, qty := range c.ReplicaRequirements.ResourceRequest {
base[name] = quantityAsInt64(qty)
}
// skip zero-demand or non-positive replica count
if allZero(base) || c.Replicas <= 0 {
continue
}

k := replicaKind{
dem: base,
count: int64(c.Replicas),
// score is filled later once we know cluster-wide totals
}
kinds = append(kinds, k)
}
return kinds
}

// demandScoreNormalized returns the "max utilization ratio" of a demand vector against total capacities
// If a resource is missing/zero in total, treat it as maximally constrained
func demandScoreNormalized(
demand map[corev1.ResourceName]int64,
total map[corev1.ResourceName]int64,
) float64 {
var maxRatio float64
for res, req := range demand {
if req <= 0 {
continue
}
totalCap := float64(total[res])
if totalCap <= 0 {
return math.MaxFloat64
}
ratio := float64(req) / totalCap
if ratio > maxRatio {
maxRatio = ratio
}
}
return maxRatio
}

// demandSum is used as a tie-breaker when initial scores are equal
func demandSum(m map[corev1.ResourceName]int64) int64 {
var s int64
for _, v := range m {
if v > 0 {
s += v
}
}
return s
}

// maxFit returns how many copies of `dem` fit in `cap` simultaneously
func maxFit(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64) int64 {
var limit int64 = math.MaxInt64
for k, req := range dem {
if req <= 0 {
continue
}
avail := capacity[k]
if avail <= 0 {
return 0
}
bound := avail / req
if bound < limit {
limit = bound
}
}
if limit == math.MaxInt64 {
return 0
}
return limit
}

// consumeMul subtracts mult * dem from cap
func consumeMul(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64, mult int64) {
if mult <= 0 {
return
}
for k, req := range dem {
if req <= 0 {
continue
for i := 0; i < cnt; i++ {
node := &schedulerframework.NodeInfo{
Allocatable: util.NewResource(tmpl),
}
nodes = append(nodes, node)
}
capacity[k] -= req * mult
}
return nodes, nil
}

// podsInSet computes the total number of pods in the CRD
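The helpers removed above (placeOneSet, maxFit, consumeMul, and friends) implemented first-fit-decreasing placement by hand; this PR delegates that work to estimator.NewSchedulingSimulator(nodes, matchNode).SimulateSchedulingFFD(pbComponents, upperSets). Below is a minimal, self-contained sketch of the FFD set-placement idea only, with made-up node grades and demands; it is an illustration, not the simulator's actual implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// vector maps a resource name to an integer amount (e.g. millicores).
type vector map[string]int64

// kind is one component's per-replica demand and its replica count within a single set.
type kind struct {
	demand vector
	count  int64
}

// fits reports how many copies of demand d still fit into capacity c.
func fits(c, d vector) int64 {
	limit := int64(1) << 62
	constrained := false
	for r, req := range d {
		if req <= 0 {
			continue
		}
		constrained = true
		if bound := c[r] / req; bound < limit {
			limit = bound
		}
	}
	if !constrained {
		return 0
	}
	return limit
}

// placeSet tries to place one full set (every kind, with its replica count) onto the
// nodes using first-fit, consuming capacity in place. It returns false if the set
// does not fit.
func placeSet(kinds []kind, nodes []vector) bool {
	for _, k := range kinds {
		remaining := k.count
		for _, n := range nodes {
			if remaining <= 0 {
				break
			}
			fit := fits(n, k.demand)
			if fit > remaining {
				fit = remaining
			}
			for r, req := range k.demand {
				n[r] -= req * fit
			}
			remaining -= fit
		}
		if remaining > 0 {
			return false
		}
	}
	return true
}

func main() {
	// Two hypothetical model grades: four 4-CPU nodes and two 16-CPU nodes (millicores).
	var nodes []vector
	for i := 0; i < 4; i++ {
		nodes = append(nodes, vector{"cpu": 4000})
	}
	for i := 0; i < 2; i++ {
		nodes = append(nodes, vector{"cpu": 16000})
	}

	// One set = three 2-CPU workers plus one 4-CPU coordinator.
	kinds := []kind{
		{demand: vector{"cpu": 2000}, count: 3},
		{demand: vector{"cpu": 4000}, count: 1},
	}
	// "Decreasing": place the heaviest demand first.
	sort.Slice(kinds, func(i, j int) bool { return kinds[i].demand["cpu"] > kinds[j].demand["cpu"] })

	sets, upperBound := 0, 100 // upperBound plays the role of upperSets in the PR
	for sets < upperBound && placeSet(kinds, nodes) {
		sets++
	}
	fmt.Println("maximum full sets:", sets) // prints 4 for these numbers
}
```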
6 changes: 3 additions & 3 deletions pkg/estimator/client/general_test.go
@@ -244,16 +244,16 @@ func comp(name string, replicas int32, rl corev1.ResourceList) workv1alpha2.Comp
func TestGetMaximumSetsBasedOnResourceModels(t *testing.T) {
const (
GPU corev1.ResourceName = "nvidia.com/gpu"
BIGU int64 = 100 // define a large upper bound so we can test model decision algo
BIGU int32 = 100 // define a large upper bound so we can test model decision algo
)

tests := []struct {
name string
cluster clusterv1alpha1.Cluster
components []workv1alpha2.Component
upperBound int64
upperBound int32
expectError bool
expectedSets int64
expectedSets int32
}{
{
name: "No grades defined → error",
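For context on the maxAvailableComponentSets hunk earlier in the diff: the number of deliverable sets is the minimum of a pod-count bound (allowedPods / podsPerSet) and per-resource bounds (available / per-set request), and the resource-model simulation can only tighten that result. A minimal sketch with made-up numbers, not taken from the PR:

```go
package main

import "fmt"

func main() {
	// Hypothetical inputs: remaining schedulable pods and one set's aggregate needs.
	const (
		allowedPods int64 = 110
		podsPerSet  int64 = 5
	)
	available := map[string]int64{"cpu": 30000, "memory": 64 << 30} // millicores, bytes
	perSet := map[string]int64{"cpu": 2500, "memory": 4 << 30}

	maxSets := allowedPods / podsPerSet // pod-count bound: 110/5 = 22
	for res, req := range perSet {
		if req <= 0 {
			continue
		}
		if bound := available[res] / req; bound < maxSets {
			maxSets = bound
		}
	}
	// cpu bound 30000/2500 = 12; memory bound 16; the minimum wins.
	fmt.Println("max component sets:", maxSets) // prints 12
}
```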