4 changes: 2 additions & 2 deletions pkg/controllers/disruption/consolidation.go
@@ -130,10 +130,10 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
// computeConsolidation computes a consolidation action to take
//
// nolint:gocyclo
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, pscheduling.Results, error) {
func (c *consolidation) computeConsolidation(ctx context.Context, cache *pscheduling.SimulationCache, candidates ...*Candidate) (Command, pscheduling.Results, error) {
var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, cache, candidates...)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/drift.go
@@ -84,6 +84,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
}, scheduling.Results{}, nil
}

cache := scheduling.NewSimulationCache()
for _, candidate := range candidates {
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
@@ -92,7 +93,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
continue
}
// Check if we need to create any NodeClaims.
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate)
results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, cache, candidate)
if err != nil {
// if a candidate is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/helpers.go
@@ -47,7 +47,7 @@ var errCandidateDeleting = fmt.Errorf("candidate is deleting")

//nolint:gocyclo
func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
candidates ...*Candidate,
cache *scheduling.SimulationCache, candidates ...*Candidate,
) (scheduling.Results, error) {
candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
nodes := cluster.Nodes()
@@ -79,7 +79,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes, cache)
if err != nil {
return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
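For reference, the caller-side pattern introduced by this change (see drift.go above and the multi/single-node consolidation files below) is to build one SimulationCache per disruption pass and reuse it for every candidate in that pass. The helper below is a hypothetical condensation of that pattern, not code from the PR, and the import paths are assumed from the repository layout:

package disruption

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
	"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
	"sigs.k8s.io/karpenter/pkg/controllers/state"
)

// simulateAll is a hypothetical helper (not part of this PR) condensing the caller-side pattern:
// one SimulationCache per pass, shared across every candidate's simulation. Passing nil instead
// of a cache preserves the previous, uncached behavior.
func simulateAll(ctx context.Context, kubeClient client.Client, cluster *state.Cluster,
	provisioner *provisioning.Provisioner, candidates ...*Candidate) []scheduling.Results {
	cache := scheduling.NewSimulationCache()
	var out []scheduling.Results
	for _, candidate := range candidates {
		results, err := SimulateScheduling(ctx, kubeClient, cluster, provisioner, cache, candidate)
		if err != nil {
			// Real callers special-case errCandidateDeleting and disruption budget checks here.
			continue
		}
		out = append(out, results)
	}
	return out
}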
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/multinodeconsolidation.go
@@ -124,12 +124,13 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
// Set a timeout
timeoutCtx, cancel := context.WithTimeout(ctx, MultiNodeConsolidationTimeoutDuration)
defer cancel()
cache := scheduling.NewSimulationCache()
for min <= max {
mid := (min + max) / 2
candidatesToConsolidate := candidates[0 : mid+1]

// Pass the timeout context to ensure sub-operations can be canceled
cmd, results, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
cmd, results, err := m.computeConsolidation(timeoutCtx, cache, candidatesToConsolidate...)
// context deadline exceeded will return to the top of the loop and either return nothing or the last saved command
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/singlenodeconsolidation.go
@@ -64,6 +64,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption

unseenNodePools := sets.New(lo.Map(candidates, func(c *Candidate, _ int) string { return c.NodePool.Name })...)

cache := scheduling.NewSimulationCache()
for i, candidate := range candidates {
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
@@ -91,7 +92,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
unseenNodePools.Delete(candidate.NodePool.Name)

// compute a possible consolidation option
cmd, results, err := s.computeConsolidation(ctx, candidate)
cmd, results, err := s.computeConsolidation(ctx, cache, candidate)
if err != nil {
log.FromContext(ctx).Error(err, "failed computing consolidation")
continue
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/suite_test.go
@@ -238,7 +238,7 @@ var _ = Describe("Simulate Scheduling", func() {
candidate, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, stateNode, pdbs, nodePoolMap, nodePoolToInstanceTypesMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(Succeed())

results, err := disruption.SimulateScheduling(ctx, env.Client, cluster, prov, candidate)
results, err := disruption.SimulateScheduling(ctx, env.Client, cluster, prov, nil, candidate)
Expect(err).To(Succeed())
Expect(results.PodErrors[pod]).To(BeNil())
})
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/validation.go
@@ -158,7 +158,7 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
if len(candidates) == 0 {
return NewValidationError(fmt.Errorf("no candidates"))
}
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, nil, candidates...)
if err != nil {
return fmt.Errorf("simluating scheduling, %w", err)
}
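ValidateCommand above (and the SimulateScheduling test in suite_test.go) pass nil for the cache, relying on SimulationCache's nil-safe methods to fall back to uncached computation. The snippet below is a small standalone illustration of that Go nil-receiver pattern; the names are hypothetical and it is not code from the PR:

package main

import "fmt"

// nilSafeCache mirrors the pattern used by SimulationCache: the method checks for a nil receiver
// and falls back to computing the value directly, so callers can pass nil to opt out of caching
// without any extra branching on their side.
type nilSafeCache struct {
	value *string
}

func (c *nilSafeCache) Get(compute func() string) string {
	if c == nil {
		return compute() // no caching, but still correct
	}
	if c.value == nil {
		v := compute()
		c.value = &v
	}
	return *c.value
}

func main() {
	var c *nilSafeCache // nil receiver: every Get recomputes
	fmt.Println(c.Get(func() string { return "computed" }))

	c = &nilSafeCache{} // non-nil: the first Get computes, later Gets reuse the stored value
	fmt.Println(c.Get(func() string { return "computed once" }))
	fmt.Println(c.Get(func() string { return "never used" }))
}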
7 changes: 4 additions & 3 deletions pkg/controllers/provisioning/provisioner.go
@@ -217,6 +217,7 @@ func (p *Provisioner) NewScheduler(
ctx context.Context,
pods []*corev1.Pod,
stateNodes []*state.StateNode,
cache *scheduler.SimulationCache,
opts ...scheduler.Options,
) (*scheduler.Scheduler, error) {
nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider)
@@ -263,15 +264,15 @@ func (p *Provisioner) NewScheduler(
}

// Calculate cluster topology, if a context error occurs, it is wrapped and returned
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods)
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods, cache)
if err != nil {
return nil, fmt.Errorf("tracking topology counts, %w", err)
}
daemonSetPods, err := p.getDaemonSetPods(ctx)
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, opts...), nil
return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, cache, opts...), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
@@ -309,7 +310,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, nil
}
log.FromContext(ctx).V(1).WithValues("pending-pods", len(pendingPods), "deleting-pods", len(deletingNodePods)).Info("computing scheduling decision for provisionable pod(s)")
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedCapacityFallback)
s, err := p.NewScheduler(ctx, pods, nodes.Active(), nil, scheduler.DisableReservedCapacityFallback)
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
5 changes: 4 additions & 1 deletion pkg/controllers/provisioning/scheduling/scheduler.go
@@ -88,6 +88,7 @@ func NewScheduler(
daemonSetPods []*corev1.Pod,
recorder events.Recorder,
clock clock.Clock,
cache *SimulationCache,
opts ...Options,
) *Scheduler {

@@ -126,6 +127,7 @@ func NewScheduler(
return np.Name, corev1.ResourceList(np.Spec.Limits)
}),
clock: clock,
cache: cache,
[Review comment from a Member] Given that we already have a pod cache -- does it make sense to share this cache across both node requirements and pod requirements so that we keep a single cache and just have different data in it?

reservationManager: NewReservationManager(instanceTypes),
reservedOfferingMode: option.Resolve(opts...).reservedOfferingMode,
}
@@ -153,6 +155,7 @@ type Scheduler struct {
recorder events.Recorder
kubeClient client.Client
clock clock.Clock
cache *SimulationCache
reservationManager *ReservationManager
reservedOfferingMode ReservedOfferingMode
}
@@ -434,7 +437,7 @@ func (s *Scheduler) calculateExistingNodeClaims(stateNodes []*state.StateNode, d
if err := scheduling.Taints(taints).ToleratesPod(p); err != nil {
continue
}
if err := scheduling.NewLabelRequirements(node.Labels()).Compatible(scheduling.NewPodRequirements(p)); err != nil {
if err := s.cache.StateNodeLabelRequirements(node).Compatible(scheduling.NewPodRequirements(p)); err != nil {
continue
}
daemons = append(daemons, p)
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
cluster = state.NewCluster(clock, client, cloudProvider)
topology, err := scheduling.NewTopology(ctx, client, cluster, nil, []*v1.NodePool{nodePool}, map[string][]*cloudprovider.InstanceType{
nodePool.Name: instanceTypes,
}, pods)
}, pods, nil)
if err != nil {
b.Fatalf("creating topology, %s", err)
}
@@ -173,6 +173,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
nil,
events.NewRecorder(&record.FakeRecorder{}),
clock,
nil,
scheduling.DisableReservedCapacityFallback,
)

47 changes: 47 additions & 0 deletions pkg/controllers/provisioning/scheduling/simulationcache.go
@@ -0,0 +1,47 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/scheduling"
)

// SimulationCache is used specifically for consolidation to cache expensive calculations across
// simulation runs. Its methods are written so that they still behave correctly (just without
// caching) if the cache object is nil.
type SimulationCache struct {
stateNodeLabelRequirements *scheduling.RequirementsReadOnly
}

func NewSimulationCache() *SimulationCache {
return &SimulationCache{}
}

// StateNodeLabelRequirements returns the scheduling requirements for the state node's labels. This is safe to cache
// as we don't modify these requirements and the state nodes won't change during a consolidation pass.
func (c *SimulationCache) StateNodeLabelRequirements(n *state.StateNode) scheduling.RequirementsReadOnly {
[Review comment from a Member] I'm confused: We pass a scheduler cache into our scheduler, but this scheduler cache only stores data from a single node?

if c == nil {
return scheduling.NewLabelRequirements(n.Node.Labels)
}
if c.stateNodeLabelRequirements != nil {
return *c.stateNodeLabelRequirements
}
reqs := scheduling.RequirementsReadOnly(scheduling.NewLabelRequirements(n.Node.Labels))
c.stateNodeLabelRequirements = &reqs
return reqs
}
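The reviewer's question above points at the fact that the cache stores exactly one requirements entry, while StateNodeLabelRequirements is called once per state node (see countDomains and calculateExistingNodeClaims elsewhere in this diff). A minimal sketch of a per-node variant, purely illustrative and not part of the PR, keyed by node name under the assumption that n.Node is non-nil whenever this method is called (the PR's version dereferences it the same way):

package scheduling

import (
	"sigs.k8s.io/karpenter/pkg/controllers/state"
	"sigs.k8s.io/karpenter/pkg/scheduling"
)

// perNodeSimulationCache is not part of this PR; it sketches one possible answer to the review
// question by keying cached label requirements per node instead of holding a single entry.
type perNodeSimulationCache struct {
	stateNodeLabelRequirements map[string]scheduling.RequirementsReadOnly
}

func (c *perNodeSimulationCache) StateNodeLabelRequirements(n *state.StateNode) scheduling.RequirementsReadOnly {
	if c == nil {
		// Same nil-safe fallback as the PR's cache: compute directly, no caching.
		return scheduling.NewLabelRequirements(n.Node.Labels)
	}
	if c.stateNodeLabelRequirements == nil {
		c.stateNodeLabelRequirements = map[string]scheduling.RequirementsReadOnly{}
	}
	if reqs, ok := c.stateNodeLabelRequirements[n.Node.Name]; ok {
		return reqs
	}
	reqs := scheduling.RequirementsReadOnly(scheduling.NewLabelRequirements(n.Node.Labels))
	c.stateNodeLabelRequirements[n.Node.Name] = reqs
	return reqs
}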
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3668,7 +3668,7 @@ var _ = Context("Scheduling", func() {
},
},
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.DisableReservedCapacityFallback)
s, err := prov.NewScheduler(ctx, pods, nil, nil, scheduling.DisableReservedCapacityFallback)
Expect(err).To(BeNil())

var wg sync.WaitGroup
@@ -3742,7 +3742,7 @@ var _ = Context("Scheduling", func() {
},
},
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.DisableReservedCapacityFallback)
s, err := prov.NewScheduler(ctx, pods, nil, nil, scheduling.DisableReservedCapacityFallback)
Expect(err).To(BeNil())
_, err = s.Solve(injection.WithControllerName(ctx, "provisioner"), pods)
Expect(err).To(BeNil())
5 changes: 4 additions & 1 deletion pkg/controllers/provisioning/scheduling/topology.go
@@ -60,6 +60,7 @@ type Topology struct {
excludedPods sets.Set[string]
cluster *state.Cluster
stateNodes []*state.StateNode
cache *SimulationCache
}

func NewTopology(
@@ -70,6 +71,7 @@ func NewTopology(
nodePools []*v1.NodePool,
instanceTypes map[string][]*cloudprovider.InstanceType,
pods []*corev1.Pod,
cache *SimulationCache,
) (*Topology, error) {
t := &Topology{
kubeClient: kubeClient,
@@ -79,6 +81,7 @@ func NewTopology(
topologyGroups: map[uint64]*TopologyGroup{},
inverseTopologyGroups: map[uint64]*TopologyGroup{},
excludedPods: sets.New[string](),
cache: cache,
}

// these are the pods that we intend to schedule, so if they are currently in the cluster we shouldn't count them for
@@ -342,7 +345,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
continue
}
// ignore the node if it doesn't match the topology group
if !tg.nodeFilter.Matches(n.Node.Spec.Taints, scheduling.NewLabelRequirements(n.Node.Labels)) {
if !tg.nodeFilter.Matches(n.Node.Spec.Taints, t.cache.StateNodeLabelRequirements(n)) {
continue
}
domain, exists := n.Labels()[tg.Key]
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/topologynodefilter.go
@@ -65,7 +65,7 @@ func MakeTopologyNodeFilter(p *corev1.Pod, taintPolicy corev1.NodeInclusionPolic
}

// Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
func (t TopologyNodeFilter) Matches(taints []corev1.Taint, requirements scheduling.Requirements, compatibilityOptions ...option.Function[scheduling.CompatibilityOptions]) bool {
func (t TopologyNodeFilter) Matches(taints []corev1.Taint, requirements scheduling.RequirementsReadOnly, compatibilityOptions ...option.Function[scheduling.CompatibilityOptions]) bool {
matchesAffinity := true
if t.AffinityPolicy == corev1.NodeInclusionPolicyHonor {
matchesAffinity = t.matchesRequirements(requirements)
@@ -82,7 +82,7 @@ func (t TopologyNodeFilter) Matches(taints []corev1.Taint, requirements scheduli
// MatchesRequirements returns true if the TopologyNodeFilter doesn't prohibit a node with the requirements from
// participating in the topology. This method allows checking the requirements from a scheduling.NodeClaim to see if the
// node we will soon create participates in this topology.
func (t TopologyNodeFilter) matchesRequirements(requirements scheduling.Requirements, compatabilityOptions ...option.Function[scheduling.CompatibilityOptions]) bool {
func (t TopologyNodeFilter) matchesRequirements(requirements scheduling.RequirementsReadOnly, compatabilityOptions ...option.Function[scheduling.CompatibilityOptions]) bool {
// no requirements, so it always matches
if len(t.Requirements) == 0 || t.AffinityPolicy == corev1.NodeInclusionPolicyIgnore {
return true
6 changes: 6 additions & 0 deletions pkg/scheduling/requirements.go
@@ -35,6 +35,12 @@ import (
// types are slices and maps, this type should not be used as a pointer.
type Requirements map[string]*Requirement

// RequirementsReadOnly is an interface that should be accepted by consumers that only need to evaluate requirements and
// will not modify them. This allows requirements to be reused safely.
type RequirementsReadOnly interface {
Compatible(requirements Requirements, options ...option.Function[CompatibilityOptions]) (errs error)
}

func NewRequirements(requirements ...*Requirement) Requirements {
r := Requirements{}
for _, requirement := range requirements {
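Because the existing Requirements type already defines Compatible with this exact signature, it satisfies the new interface without any adapter. The short illustration below shows the intended consumer-side usage; the compile-time assertion and helper are illustrative only, not code from the PR, and assume Compatible keeps the signature declared in the interface:

package scheduling

// Compile-time check (illustrative): the concrete Requirements map type satisfies the new
// read-only interface via its existing Compatible method.
var _ RequirementsReadOnly = Requirements{}

// compatible is a hypothetical helper showing the intent of the interface: consumers that only
// need to evaluate requirements accept RequirementsReadOnly, which exposes no way to mutate them.
func compatible(nodeReqs RequirementsReadOnly, podReqs Requirements) bool {
	return nodeReqs.Compatible(podReqs) == nil
}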