Skip to content

Commit 90b1ceb

Browse files
committed
Rolling update of kube-apiserver
Signed-off-by: Masayuki Ishii <masa213f@gmail.com>
1 parent 8f6c3a7 commit 90b1ceb

File tree

3 files changed

+631
-255
lines changed

3 files changed

+631
-255
lines changed

server/node_filter.go

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,17 @@ func (nf *NodeFilter) EtcdOutdatedMembers() (nodes []*cke.Node) {
314314
return nodes
315315
}
316316

317+
// HealthyAPIServer returns a control plane node running healthy API server.
318+
// If there is no healthy API server, it returns the first control plane node.
319+
func (nf *NodeFilter) HealthyAPIServer() *cke.Node {
320+
for _, n := range nf.ControlPlaneNodes() {
321+
if nf.nodeStatus(n).APIServer.IsHealthy {
322+
return n
323+
}
324+
}
325+
return nil
326+
}
327+
317328
// APIServerStopped filters nodes that are not running API server.
318329
func (nf *NodeFilter) APIServerStopped(targets []*cke.Node) (nodes []*cke.Node) {
319330
for _, n := range targets {
@@ -324,6 +335,26 @@ func (nf *NodeFilter) APIServerStopped(targets []*cke.Node) (nodes []*cke.Node)
324335
return nodes
325336
}
326337

338+
// HealthyAPIServerNodes returns nodes which have healthy API servers
339+
func (nf *NodeFilter) APIServerUnhealthy(targets []*cke.Node) (nodes []*cke.Node) {
340+
for _, n := range targets {
341+
if !nf.nodeStatus(n).APIServer.IsHealthy { //
342+
nodes = append(nodes, n)
343+
}
344+
}
345+
return nodes
346+
}
347+
348+
// UnhealthyAPIServerNodes returns nodes which have unhealthy API servers
349+
func (nf *NodeFilter) APIServerHealthy(targets []*cke.Node) (nodes []*cke.Node) {
350+
for _, n := range targets {
351+
if nf.nodeStatus(n).APIServer.IsHealthy {
352+
nodes = append(nodes, n)
353+
}
354+
}
355+
return nodes
356+
}
357+
327358
// APIServerOutdated filters nodes that are running API server with outdated image or params.
328359
func (nf *NodeFilter) APIServerOutdated(targets []*cke.Node) (nodes []*cke.Node) {
329360
currentExtra := nf.cluster.Options.APIServer
@@ -618,39 +649,6 @@ func (nf *NodeFilter) ProxyOutdated(targets []*cke.Node, params cke.ProxyParams)
618649
return nodes
619650
}
620651

621-
// HealthyAPIServer returns a control plane node running healthy API server.
622-
// If there is no healthy API server, it returns the first control plane node.
623-
func (nf *NodeFilter) HealthyAPIServer() *cke.Node {
624-
var node *cke.Node
625-
for _, n := range nf.ControlPlaneNodes() {
626-
node = n
627-
if nf.nodeStatus(n).APIServer.IsHealthy {
628-
break
629-
}
630-
}
631-
return node
632-
}
633-
634-
// HealthyAPIServerNodes returns nodes which have healthy API servers
635-
func (nf *NodeFilter) HealthyAPIServerNodes() (nodes []*cke.Node) {
636-
for _, n := range nf.ControlPlaneNodes() {
637-
if nf.nodeStatus(n).APIServer.IsHealthy {
638-
nodes = append(nodes, n)
639-
}
640-
}
641-
return nodes
642-
}
643-
644-
// UnhealthyAPIServerNodes returns nodes which have unhealthy API servers
645-
func (nf *NodeFilter) UnhealthyAPIServerNodes() (nodes []*cke.Node) {
646-
for _, n := range nf.ControlPlaneNodes() {
647-
if !nf.nodeStatus(n).APIServer.IsHealthy {
648-
nodes = append(nodes, n)
649-
}
650-
}
651-
return nodes
652-
}
653-
654652
func isInternal(name string, n *cke.Node) bool {
655653
if name == op.CKEAnnotationReboot {
656654
return false

server/strategy.go

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,61 @@ func riversOps(c *cke.Cluster, nf *NodeFilter, maxConcurrentUpdates int) (ops []
140140
return ops
141141
}
142142

143-
func k8sOps(c *cke.Cluster, nf *NodeFilter, cs *cke.ClusterStatus, maxConcurrentUpdates int) (ops []cke.Operator) {
144-
// For cp nodes
145-
if nodes := nf.SSHConnected(nf.APIServerStopped(nf.ControlPlaneNodes())); len(nodes) > 0 {
143+
func apiserverOps(c *cke.Cluster, nf *NodeFilter, cs *cke.ClusterStatus, maxConcurrentUpdates int) (ops []cke.Operator, skipOtherOps bool) {
144+
// First, do the following operations together.
145+
// - Starting stopped kube-apiservers. (for bootstrapping the Kubernetes cluster or for rebooting controle plane nodes)
146+
// - Updating outdated and unhealhy apiservers. (for repairing configuration errors)
147+
var nodes []*cke.Node
148+
nodes = append(nodes, nf.SSHConnected(nf.APIServerStopped(nf.ControlPlaneNodes()))...)
149+
nodes = append(nodes, nf.SSHConnected(nf.APIServerOutdated(nf.APIServerUnhealthy(nf.ControlPlaneNodes())))...)
150+
if len(nodes) > 0 {
151+
// Set the unhealthy kube-apiservers to NotReady.
152+
// NOTE: This step should be skipped during bootstraping.
153+
if nf.HealthyAPIServer() != nil {
154+
ops = append(ops, masterEndpointOps(c, cs, nf, nil)...)
155+
}
146156
kubeletConfig := k8s.GenerateKubeletConfiguration(c.Options.Kubelet, "0.0.0.0", nil)
147157
ops = append(ops, k8s.APIServerRestartOp(nodes, c.ServiceSubnet, c.Options.APIServer, kubeletConfig.ClusterDomain))
148158
}
159+
if len(ops) > 0 {
160+
return ops, true
161+
}
162+
163+
// Waiting for all kube-apiservers to be healthy. Because...
164+
// - Updating kube-apiservers should be performed only when all kube-apiservers are healthy.
165+
// - Other k8s components should be maintained only when all kube-apiservers are *updated* and healthy, to ensure the upgrade order.
166+
// See the version skew policy: https://kubernetes.io/releases/version-skew-policy/
167+
if len(nf.APIServerUnhealthy(nf.ControlPlaneNodes())) > 0 {
168+
// Set the unhealthy kube-apiservers to NotReady.
169+
if nf.HealthyAPIServer() != nil {
170+
ops = append(ops, masterEndpointOps(c, cs, nf, nil)...)
171+
}
172+
return ops, true
173+
}
174+
175+
// Updating kube-apiservers one by one.
149176
if nodes := nf.SSHConnected(nf.APIServerOutdated(nf.ControlPlaneNodes())); len(nodes) > 0 {
177+
target := nodes[0] // just one
178+
ops = append(ops, masterEndpointOps(c, cs, nf, []string{target.Address})...)
150179
kubeletConfig := k8s.GenerateKubeletConfiguration(c.Options.Kubelet, "0.0.0.0", nil)
151-
ops = append(ops, k8s.APIServerRestartOp(nodes, c.ServiceSubnet, c.Options.APIServer, kubeletConfig.ClusterDomain))
180+
ops = append(ops, k8s.APIServerRestartOp([]*cke.Node{target}, c.ServiceSubnet, c.Options.APIServer, kubeletConfig.ClusterDomain))
181+
return ops, true
152182
}
183+
184+
// Set kube-apiservers to Ready.
185+
// If a control plane node is about to reboot, set it to NotReady.
186+
ops = append(ops, masterEndpointOps(c, cs, nf, rebootNextCandidates(cs))...)
187+
return ops, false
188+
}
189+
190+
func k8sOps(c *cke.Cluster, nf *NodeFilter, cs *cke.ClusterStatus, maxConcurrentUpdates int) (ops []cke.Operator) {
191+
apiserverOps, skipOtherOps := apiserverOps(c, nf, cs, maxConcurrentUpdates)
192+
if skipOtherOps {
193+
return apiserverOps
194+
}
195+
ops = append(ops, apiserverOps...)
196+
197+
// Other CP components
153198
if nodes := nf.SSHConnected(nf.ControllerManagerStopped(nf.ControlPlaneNodes())); len(nodes) > 0 {
154199
ops = append(ops, k8s.ControllerManagerBootOp(nodes, c.Name, c.ServiceSubnet, c.Options.ControllerManager))
155200
}
@@ -266,9 +311,7 @@ func k8sMaintOps(c *cke.Cluster, cs *cke.ClusterStatus, resources []cke.Resource
266311

267312
ops = append(ops, decideNodeDNSOps(apiServer, c, ks)...)
268313

269-
ops = append(ops, masterEndpointOps(c, cs, nf)...)
270-
271-
ops = append(ops, etcdEndpointOps(c, cs, nf)...)
314+
ops = append(ops, etcdEndpointOps(c, cs, nf, rebootNextCandidates(cs))...)
272315

273316
if nodes := nf.OutdatedAttrsNodes(); len(nodes) > 0 {
274317
ops = append(ops, op.KubeNodeUpdateOp(apiServer, nodes))
@@ -347,17 +390,17 @@ type endpointParams struct {
347390
serviceName string
348391
}
349392

350-
func masterEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter) []cke.Operator {
393+
func masterEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter, markedAsNotReadyIPs []string) []cke.Operator {
351394
var readyIPs, notReadyIPs []string
352395

353-
for _, n := range nf.HealthyAPIServerNodes() {
354-
if cs.RebootQueue.Enabled && (rebootProcessing(cs, n.Address) || rebootNextCandidate(cs, n.Address)) {
396+
for _, n := range nf.APIServerHealthy(nf.ControlPlaneNodes()) {
397+
if rebootProcessing(cs, n.Address) || slices.Contains(markedAsNotReadyIPs, n.Address) {
355398
notReadyIPs = append(notReadyIPs, n.Address)
356399
} else {
357400
readyIPs = append(readyIPs, n.Address)
358401
}
359402
}
360-
for _, n := range nf.UnhealthyAPIServerNodes() {
403+
for _, n := range nf.APIServerUnhealthy(nf.ControlPlaneNodes()) {
361404
notReadyIPs = append(notReadyIPs, n.Address)
362405
}
363406

@@ -373,7 +416,7 @@ func masterEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter) []
373416
return decideEpEpsOps(ep, cs.Kubernetes.MasterEndpoints, cs.Kubernetes.MasterEndpointSlice, nf.HealthyAPIServer())
374417
}
375418

376-
func etcdEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter) (ops []cke.Operator) {
419+
func etcdEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter, markedAsNotReadyIPs []string) (ops []cke.Operator) {
377420
// Endpoints needs a corresponding Service.
378421
// If an Endpoints lacks such a Service, it will be removed.
379422
// https://github.com/kubernetes/kubernetes/blob/b7c2d923ef4e166b9572d3aa09ca72231b59b28b/pkg/controller/endpoint/endpoints_controller.go#L392-L397
@@ -384,7 +427,7 @@ func etcdEndpointOps(c *cke.Cluster, cs *cke.ClusterStatus, nf *NodeFilter) (ops
384427

385428
var readyIPs, notReadyIPs []string
386429
for _, n := range nf.ControlPlaneNodes() {
387-
if cs.RebootQueue.Enabled && (rebootProcessing(cs, n.Address) || rebootNextCandidate(cs, n.Address)) {
430+
if rebootProcessing(cs, n.Address) || slices.Contains(markedAsNotReadyIPs, n.Address) {
388431
notReadyIPs = append(notReadyIPs, n.Address)
389432
} else {
390433
readyIPs = append(readyIPs, n.Address)
@@ -759,7 +802,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
759802

760803
rebootingApiServers := make(map[string]bool)
761804
for _, cp := range nf.ControlPlaneNodes() {
762-
if cs.RebootQueue.Enabled && rebootProcessing(cs, cp.Nodename()) {
805+
if rebootProcessing(cs, cp.Nodename()) {
763806
rebootingApiServers[cp.Address] = true
764807
}
765808
}
@@ -911,7 +954,7 @@ func rebootUncordonOp(cs *cke.ClusterStatus, nf *NodeFilter) cke.Operator {
911954
}
912955
nodes := make([]string, 0, len(attrNodes))
913956
for _, n := range attrNodes {
914-
if (cs.RebootQueue.Enabled && rebootProcessing(cs, n.Name)) || repairProcessing(cs, n.Name) {
957+
if rebootProcessing(cs, n.Name) || repairProcessing(cs, n.Name) {
915958
continue
916959
}
917960
nodes = append(nodes, n.Name)
@@ -922,8 +965,23 @@ func rebootUncordonOp(cs *cke.ClusterStatus, nf *NodeFilter) cke.Operator {
922965
return op.RebootUncordonOp(nf.HealthyAPIServer(), nodes)
923966
}
924967

925-
// NOTE: This function does not check whether the reboot queue is enabled or not.
968+
func rebootNextCandidates(cs *cke.ClusterStatus) []string {
969+
if !cs.RebootQueue.Enabled {
970+
return nil
971+
}
972+
973+
ret := []string{}
974+
for _, entry := range cs.RebootQueue.NextCandidates {
975+
ret = append(ret, entry.Node)
976+
}
977+
return ret
978+
}
979+
926980
func rebootProcessing(cs *cke.ClusterStatus, node string) bool {
981+
if !cs.RebootQueue.Enabled {
982+
return false
983+
}
984+
927985
for _, entry := range cs.RebootQueue.Entries {
928986
if entry.Node != node {
929987
continue
@@ -938,16 +996,6 @@ func rebootProcessing(cs *cke.ClusterStatus, node string) bool {
938996
return false
939997
}
940998

941-
// NOTE: This function does not check whether the reboot queue is enabled or not.
942-
func rebootNextCandidate(cs *cke.ClusterStatus, node string) bool {
943-
for _, entry := range cs.RebootQueue.NextCandidates {
944-
if entry.Node == node {
945-
return true
946-
}
947-
}
948-
return false
949-
}
950-
951999
func repairProcessing(cs *cke.ClusterStatus, nodename string) bool {
9521000
for _, entry := range cs.RepairQueue.Entries {
9531001
if entry.IsInCluster() && entry.Nodename == nodename &&

0 commit comments

Comments
 (0)