-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Expand file tree
/
Copy pathclustersnapshot.go
More file actions
167 lines (143 loc) · 9.04 KB
/
clustersnapshot.go
File metadata and controls
167 lines (143 loc) · 9.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clustersnapshot
import (
"errors"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerinterface "k8s.io/kube-scheduler/framework"
)
// Forkable is an interface for objects whose state can be forked, and then either reverted
// to the moment of forking or committed.
type Forkable interface {
// Fork creates a fork of the object state. All modifications made afterwards can later be reverted
// to the moment of forking via Revert().
// Use the WithForkedSnapshot() helper function instead if possible.
Fork()
// Revert reverts the object state to the moment of forking.
Revert()
// Commit commits the changes done after forking. A non-nil error means the commit failed;
// WithForkedSnapshot() reacts to that by calling Revert().
Commit() error
}
// ClusterSnapshot abstracts the cluster state that predicate simulations run against.
// Besides exposing mutation methods, it can be viewed as the scheduler's SharedLister.
type ClusterSnapshot interface {
	framework.SharedLister
	ClusterSnapshotStore
	Forkable

	// SetClusterState puts the snapshot back into an unforked state and swaps its contents for the
	// provided data. Each of the scheduledPods is correlated to its Node based on spec.NodeName.
	// draSnapshot and csiSnapshot are treated as the source of truth, and are eagerly loaded into
	// the internal NodeInfo and PodInfo objects.
	SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot *drasnapshot.Snapshot, csiSnapshot *csisnapshot.Snapshot) error

	// AddNodeInfo inserts the given NodeInfo into the snapshot, without checking scheduler predicates.
	// Both the Node and the Pods are added, together with any DRA and CSI objects passed along with them.
	AddNodeInfo(nodeInfo *framework.NodeInfo) error

	// RemoveNodeInfo deletes the NodeInfo with the given name from the snapshot.
	// Both the Node and the Pods are removed, together with any DRA and CSI objects owned by them.
	RemoveNodeInfo(nodeName string) error

	// GetNodeInfo looks up the internal NodeInfo for the given Node, i.e. all information about the Node
	// that is tracked in the snapshot. The returned framework.NodeInfo is fully populated with DRA and CSI data.
	// CA code should always use the internal NodeInfos obtained via this method, instead of using
	// *schedulerframework.NodeInfo directly.
	GetNodeInfo(nodeName string) (*framework.NodeInfo, error)

	// ListNodeInfos returns the internal NodeInfos of all Nodes tracked in the snapshot.
	// See the comment on GetNodeInfo.
	ListNodeInfos() ([]*framework.NodeInfo, error)

	// ForceAddPod places the given Pod on the Node with the given nodeName inside the snapshot, without
	// checking scheduler predicates. An internal PodInfo is allocated and populated with the existing claims
	// pulled directly from the DRA snapshot, so the caller is assumed to have already put the necessary
	// DRA data into the DRA snapshot. It will not ReservePodClaims or compute new allocations.
	ForceAddPod(pod *apiv1.Pod, nodeName string) error

	// ForceRemovePod deletes the given Pod from the snapshot.
	// It will not UnreservePodClaims.
	ForceRemovePod(namespace, podName, nodeName string) error

	// SchedulePod attempts to schedule the given Pod on the Node with the given name inside the snapshot,
	// checking scheduling predicates. The pod is scheduled only if the predicates pass, in which case all
	// relevant DRA objects are modified to reflect that. Returns nil if the pod got scheduled, and a non-nil
	// error explaining why not otherwise. The error Type() can be checked against SchedulingInternalError
	// to distinguish failing predicates from unexpected errors.
	SchedulePod(pod *apiv1.Pod, nodeName string) SchedulingError

	// SchedulePodOnAnyNodeMatching attempts to schedule the given Pod on any Node for which
	// opts.IsNodeAcceptable returns true. Scheduling predicates are checked, and the pod is scheduled only
	// if there is a matching Node with passing predicates. If the pod is scheduled, all relevant DRA objects
	// are modified to reflect that, and the name of the Node it's scheduled on is returned together with a
	// nil error. If the pod can't be scheduled on any Node, an empty string and a non-nil error explaining
	// why are returned. The error Type() can be checked against SchedulingInternalError to distinguish
	// failing predicates from unexpected errors.
	SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, opts SchedulingOptions) (matchingNode string, err SchedulingError)

	// UnschedulePod takes the given Pod off the given Node inside the snapshot, and modifies all relevant
	// DRA objects to reflect the removal. Afterwards the pod can be scheduled on another Node in the
	// snapshot using the Schedule methods.
	UnschedulePod(namespace, podName, nodeName string) error

	// CheckPredicates runs scheduler predicates to check whether the given Pod would be able to schedule on
	// the Node with the given name. Returns nil if the predicates pass, or a non-nil error specifying why
	// they didn't otherwise. The error Type() can be checked against SchedulingInternalError to distinguish
	// failing predicates from unexpected errors. Doesn't mutate the snapshot.
	CheckPredicates(pod *apiv1.Pod, nodeName string) SchedulingError

	// DraSnapshot returns the object through which the DRA objects in the snapshot can be accessed and modified.
	DraSnapshot() *drasnapshot.Snapshot

	// CsiSnapshot returns the object through which the CSINode objects in the snapshot can be accessed and modified.
	CsiSnapshot() *csisnapshot.Snapshot

	// TODO(DRA): Move unschedulable Pods inside ClusterSnapshot (since their DRA objects are already here), refactor PodListProcessor.
}
// ClusterSnapshotStore is the "low-level" part of ClusterSnapshot: it stores the snapshot state and mutates
// it directly, without going through scheduler predicates. ClusterSnapshotStore shouldn't be used directly
// outside the clustersnapshot pkg - its methods should be accessed via ClusterSnapshot.
type ClusterSnapshotStore interface {
	schedulerinterface.SharedLister
	Forkable

	// FastPredicateLister returns an interface for querying internal state used for fast predicate checking.
	// The returned lister may be nil if the internal state is not available (fast predicates are disabled).
	FastPredicateLister() FastPredicateLister

	// StorePodInfo puts the given PodInfo on the Node with the given nodeName inside the snapshot.
	StorePodInfo(podInfo *framework.PodInfo, nodeName string) error

	// RemovePodInfo deletes the given Pod from the snapshot.
	RemovePodInfo(namespace, podName, nodeName string) error

	// StoreNodeInfo puts the given *framework.NodeInfo into the snapshot, without checking scheduler predicates.
	// Not meant for use outside the clustersnapshot pkg, use ClusterSnapshot.AddNodeInfo() instead.
	StoreNodeInfo(nodeInfo *framework.NodeInfo) error

	// RemoveNodeInfo deletes the *framework.NodeInfo with the given nodeName from the snapshot.
	// Not meant for use outside the clustersnapshot pkg, use ClusterSnapshot.RemoveNodeInfo() instead.
	RemoveNodeInfo(nodeName string) error

	// Clear brings the snapshot back to an empty, unforked state.
	Clear()
}
// FastPredicateLister allows querying precomputed internal state for fast predicate checking.
// It is handed out by ClusterSnapshotStore.FastPredicateLister(), which may return nil.
type FastPredicateLister interface {
// PodAffinityCount returns the number of pods matching the given selector in the topology domain
// identified by the given topologyKey/topologyValue pair.
PodAffinityCount(topologyKey, topologyValue string, selector labels.Selector) int
// TopologyValueCount returns the number of unique values for the given topology key in the snapshot.
TopologyValueCount(topologyKey string) int
// TopologyDomains returns all unique values for the given topology key in the snapshot.
TopologyDomains(topologyKey string) []string
}
// ErrNodeNotFound is a sentinel error meaning that a node wasn't found in the snapshot.
// Callers can test for it with errors.Is.
var ErrNodeNotFound = errors.New("node not found")
// WithForkedSnapshot is a helper function that runs f against a forked snapshot and makes sure the
// Fork() call is always closed with a Commit() or Revert() call.
//
// f reports whether the changes it made should be committed, along with its own error:
//   - if f returns true, Commit() is called; should Commit() fail, the failure is logged and the
//     snapshot is reverted instead;
//   - if f returns false (or panics before returning), the snapshot is reverted.
//
// The function returns an (error, error) pair. The first error comes from the passed function, the
// second one is the error returned by Commit(), if any. The second error is nil when no commit was
// attempted.
func WithForkedSnapshot(snapshot ClusterSnapshot, f func() (bool, error)) (err error, cleanupErr error) {
	var commit bool
	snapshot.Fork()
	// The results are named on purpose: the deferred cleanup runs after the operands of the return
	// statement have been evaluated, so assigning to a plain local there would never reach the caller.
	// Writing to the named result cleanupErr does.
	defer func() {
		if commit {
			cleanupErr = snapshot.Commit()
			if cleanupErr != nil {
				klog.Errorf("Got error when calling ClusterSnapshot.Commit(), will try to revert; %v", cleanupErr)
			}
		}
		if !commit || cleanupErr != nil {
			snapshot.Revert()
		}
	}()
	commit, err = f()
	// cleanupErr is filled in by the deferred function above when Commit() fails.
	return err, nil
}