Skip to content

Commit 6881438

Browse files
barkbaypebrc
andauthored
Ensure that the list of nodes scheduled for upgrade is consistent in case of version change (#2411) (#2416)
* Add version comparison to podsToUpgrade(..) Co-authored-by: Peter Brachwitz <[email protected]>
1 parent a18108d commit 6881438

File tree

6 files changed

+90
-47
lines changed

6 files changed

+90
-47
lines changed

pkg/controller/common/version/version.go

+5
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ func (v *Version) IsSameOrAfter(other Version) bool {
9393
(v.Major == other.Major && v.Minor == other.Minor && v.Patch >= other.Patch)
9494
}
9595

96+
// IsSame returns true if the receiver is the same version than the argument.
97+
func (v *Version) IsSame(other Version) bool {
98+
return v.Major == other.Major && v.Minor == other.Minor && v.Patch == other.Patch
99+
}
100+
96101
// Min returns the minimum version in vs or nil.
97102
func Min(vs []Version) *Version {
98103
sort.SliceStable(vs, func(i, j int) bool {

pkg/controller/elasticsearch/driver/fixtures.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package driver
66

77
import (
8+
appsv1 "k8s.io/api/apps/v1"
89
corev1 "k8s.io/api/core/v1"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1011
"k8s.io/apimachinery/pkg/runtime"
@@ -24,9 +25,7 @@ const (
2425
)
2526

2627
type testPod struct {
27-
name string
28-
version string
29-
ssetName string
28+
name, version, revision, ssetName string
3029
master, data, healthy, toUpgrade, inCluster, terminating bool
3130
uid types.UID
3231
}
@@ -45,6 +44,7 @@ func (t testPod) isHealthy(v bool) testPod { t.healthy = v; return
4544
func (t testPod) needsUpgrade(v bool) testPod { t.toUpgrade = v; return t }
4645
func (t testPod) isTerminating(v bool) testPod { t.terminating = v; return t }
4746
func (t testPod) withVersion(v string) testPod { t.version = v; return t }
47+
func (t testPod) withRevision(v string) testPod { t.revision = v; return t }
4848
func (t testPod) inStatefulset(ssetName string) testPod { t.ssetName = ssetName; return t } //nolint:unparam
4949

5050
// filter to simulate a Pod that has been removed while upgrading
@@ -243,6 +243,7 @@ func (t testPod) toPod() corev1.Pod {
243243
labels := map[string]string{}
244244
labels[label.VersionLabelName] = t.version
245245
labels[label.ClusterNameLabelName] = TestEsName
246+
labels[appsv1.StatefulSetRevisionLabel] = t.revision
246247
label.NodeTypesMasterLabelName.Set(t.master, labels)
247248
label.NodeTypesDataLabelName.Set(t.data, labels)
248249
labels[label.StatefulSetNameLabelName] = t.ssetName

pkg/controller/elasticsearch/driver/nodes.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package driver
77
import (
88
"fmt"
99

10+
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
1011
"github.com/elastic/cloud-on-k8s/pkg/controller/common/events"
1112
"github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore"
1213
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
@@ -148,7 +149,7 @@ func (d *defaultDriver) reconcileNodeSpecs(
148149

149150
// When not reconciled, set the phase to ApplyingChanges only if it was Ready to avoid to
150151
// override another "not Ready" phase like MigratingData.
151-
if Reconciled(expectedResources.StatefulSets(), actualStatefulSets, d.Client) {
152+
if Reconciled(d.ES, expectedResources.StatefulSets(), actualStatefulSets, d.Client) {
152153
reconcileState.UpdateElasticsearchReady(resourcesState, observedState)
153154
} else if reconcileState.IsElasticsearchReady(observedState) {
154155
reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods)
@@ -163,7 +164,7 @@ func (d *defaultDriver) reconcileNodeSpecs(
163164
// Reconciled reports whether the actual StatefulSets are reconciled to match the expected StatefulSets
164165
// by checking that the expected template hash label is reconciled for all StatefulSets, there are no
165166
// pod upgrades in progress and all pods are running.
166-
func Reconciled(expectedStatefulSets, actualStatefulSets sset.StatefulSetList, client k8s.Client) bool {
167+
func Reconciled(es esv1.Elasticsearch, expectedStatefulSets, actualStatefulSets sset.StatefulSetList, client k8s.Client) bool {
167168
// actual sset should have the expected sset template hash label
168169
for _, expectedSset := range expectedStatefulSets {
169170
actualSset, ok := actualStatefulSets.GetByName(expectedSset.Name)
@@ -178,7 +179,7 @@ func Reconciled(expectedStatefulSets, actualStatefulSets sset.StatefulSetList, c
178179
}
179180

180181
// all pods should have been upgraded
181-
pods, err := podsToUpgrade(client, actualStatefulSets)
182+
pods, err := podsToUpgrade(es, client, actualStatefulSets)
182183
if err != nil {
183184
return false
184185
}

pkg/controller/elasticsearch/driver/upgrade.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
1111
"github.com/elastic/cloud-on-k8s/pkg/controller/common/expectations"
1212
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
13+
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
1314
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
15+
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
1416
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/reconcile"
1517
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
1618
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
@@ -42,7 +44,7 @@ func (d *defaultDriver) handleRollingUpgrades(
4244
if err != nil {
4345
return results.WithError(err)
4446
}
45-
podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
47+
podsToUpgrade, err := podsToUpgrade(d.ES, d.Client, statefulSets)
4648
if err != nil {
4749
return results.WithError(err)
4850
}
@@ -169,9 +171,15 @@ func healthyPods(
169171
}
170172

171173
func podsToUpgrade(
174+
es esv1.Elasticsearch,
172175
client k8s.Client,
173176
statefulSets sset.StatefulSetList,
174177
) ([]corev1.Pod, error) {
178+
esVersion, err := version.Parse(es.Spec.Version)
179+
if err != nil {
180+
return nil, err
181+
}
182+
175183
var toUpgrade []corev1.Pod
176184
for _, statefulSet := range statefulSets {
177185
if statefulSet.Status.UpdateRevision == "" {
@@ -194,7 +202,16 @@ func podsToUpgrade(
194202
// Pod does not exist, continue the loop as the absence will be accounted by the deletion driver
195203
continue
196204
}
197-
if sset.PodRevision(pod) != statefulSet.Status.UpdateRevision {
205+
// We consider a Pod for an upgrade if at least one of the following conditions is met:
206+
// 1. The update revision of the Pod does not match the one in the status of the StatefulSet
207+
// 2. The Elasticsearch version run by the Pod does not match the expected one in the Elasticsearch object
208+
// Relying only on Pod revision is not enough since it might not be propagated consistently across all the StatefulSets.
209+
// See https://github.com/elastic/cloud-on-k8s/issues/2393#issuecomment-572951884
210+
podVersion, err := label.ExtractVersion(pod.Labels)
211+
if err != nil {
212+
return toUpgrade, err
213+
}
214+
if sset.PodRevision(pod) != statefulSet.Status.UpdateRevision || !podVersion.IsSame(*esVersion) {
198215
toUpgrade = append(toUpgrade, pod)
199216
}
200217
}

pkg/controller/elasticsearch/driver/upgrade_forced.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@ package driver
77
import (
88
"fmt"
99

10-
corev1 "k8s.io/api/core/v1"
11-
1210
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
1311
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
1412
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
1513
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
14+
corev1 "k8s.io/api/core/v1"
1615
)
1716

1817
func (d *defaultDriver) MaybeForceUpgrade(statefulSets sset.StatefulSetList) (bool, error) {
1918
// Get the pods to upgrade
20-
podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
19+
podsToUpgrade, err := podsToUpgrade(d.ES, d.Client, statefulSets)
2120
if err != nil {
2221
return false, err
2322
}

pkg/controller/elasticsearch/driver/upgrade_test.go

+56-36
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,23 @@ package driver
77
import (
88
"testing"
99

10+
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
1011
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
1112
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
1213
"github.com/stretchr/testify/assert"
1314
appsv1 "k8s.io/api/apps/v1"
14-
corev1 "k8s.io/api/core/v1"
15-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16-
"k8s.io/apimachinery/pkg/runtime"
1715
)
1816

19-
const testNamespace = "ns"
20-
21-
func podWithRevision(name, revision string) *corev1.Pod {
22-
return &corev1.Pod{
23-
ObjectMeta: metav1.ObjectMeta{
24-
Name: name,
25-
Namespace: testNamespace,
26-
Labels: map[string]string{appsv1.StatefulSetRevisionLabel: revision},
17+
func Test_podsToUpgrade(t *testing.T) {
18+
defaultEs := esv1.Elasticsearch{
19+
Spec: esv1.ElasticsearchSpec{
20+
Version: "7.1.0",
2721
},
2822
}
29-
}
30-
31-
func Test_podsToUpgrade(t *testing.T) {
3223
type args struct {
33-
pods []runtime.Object
24+
pods upgradeTestPods
3425
statefulSets sset.StatefulSetList
26+
es esv1.Elasticsearch
3527
}
3628
tests := []struct {
3729
name string
@@ -52,13 +44,14 @@ func Test_podsToUpgrade(t *testing.T) {
5244
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-a", UpdateRevision: "rev-b", UpdatedReplicas: 0, Replicas: 3},
5345
}.Build(),
5446
},
55-
pods: []runtime.Object{
56-
podWithRevision("masters-0", "rev-a"),
57-
podWithRevision("masters-1", "rev-a"),
58-
podWithRevision("nodes-0", "rev-a"),
59-
podWithRevision("nodes-1", "rev-a"),
60-
podWithRevision("nodes-2", "rev-a"),
61-
},
47+
pods: newUpgradeTestPods(
48+
newTestPod("masters-0").withRevision("rev-a").withVersion("7.1.0"),
49+
newTestPod("masters-1").withRevision("rev-a").withVersion("7.1.0"),
50+
newTestPod("nodes-0").withRevision("rev-a").withVersion("7.1.0"),
51+
newTestPod("nodes-1").withRevision("rev-a").withVersion("7.1.0"),
52+
newTestPod("nodes-2").withRevision("rev-a").withVersion("7.1.0"),
53+
),
54+
es: defaultEs,
6255
},
6356
want: []string{"masters-0", "masters-1", "nodes-0", "nodes-1", "nodes-2"},
6457
},
@@ -75,10 +68,11 @@ func Test_podsToUpgrade(t *testing.T) {
7568
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-b", UpdateRevision: "rev-b", UpdatedReplicas: 3, Replicas: 3},
7669
}.Build(),
7770
},
78-
pods: []runtime.Object{
79-
podWithRevision("masters-0", "rev-a"),
80-
podWithRevision("masters-1", "rev-a"),
81-
},
71+
pods: newUpgradeTestPods(
72+
newTestPod("masters-0").withRevision("rev-a").withVersion("7.1.0"),
73+
newTestPod("masters-1").withRevision("rev-a").withVersion("7.1.0"),
74+
),
75+
es: defaultEs,
8276
},
8377
want: []string{"masters-0", "masters-1"},
8478
},
@@ -95,10 +89,11 @@ func Test_podsToUpgrade(t *testing.T) {
9589
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-b", UpdateRevision: "", UpdatedReplicas: 3, Replicas: 3},
9690
}.Build(),
9791
},
98-
pods: []runtime.Object{
99-
podWithRevision("masters-0", "rev-a"),
100-
podWithRevision("masters-1", "rev-a"),
101-
},
92+
pods: newUpgradeTestPods(
93+
newTestPod("masters-0").withRevision("rev-a").withVersion("7.1.0"),
94+
newTestPod("masters-1").withRevision("rev-a").withVersion("7.1.0"),
95+
),
96+
es: defaultEs,
10297
},
10398
want: []string{},
10499
},
@@ -115,18 +110,43 @@ func Test_podsToUpgrade(t *testing.T) {
115110
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-b", UpdateRevision: "rev-b", UpdatedReplicas: 3, Replicas: 3},
116111
}.Build(),
117112
},
118-
pods: []runtime.Object{
119-
podWithRevision("masters-0", "rev-b"),
120-
podWithRevision("masters-1", "rev-a"),
121-
},
113+
pods: newUpgradeTestPods(
114+
newTestPod("masters-0").withRevision("rev-b").withVersion("7.1.0"),
115+
newTestPod("masters-1").withRevision("rev-a").withVersion("7.1.0"),
116+
),
117+
es: defaultEs,
122118
},
123119
want: []string{"masters-1"},
124120
},
121+
{
122+
name: "StatefulSet has been updated with a new ES version but StatefulSet update revision is not yet up to date",
123+
args: args{
124+
statefulSets: sset.StatefulSetList{
125+
sset.TestSset{
126+
Name: "masters", Replicas: 2, Master: true, Data: false,
127+
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-master-a", UpdateRevision: "rev-master-b", UpdatedReplicas: 0, Replicas: 2},
128+
}.Build(),
129+
sset.TestSset{
130+
Name: "nodes", Replicas: 3, Master: false, Data: true,
131+
Status: appsv1.StatefulSetStatus{CurrentRevision: "rev-nodes-a", UpdateRevision: "rev-nodes-a", UpdatedReplicas: 0, Replicas: 3},
132+
}.Build(),
133+
},
134+
pods: newUpgradeTestPods(
135+
newTestPod("masters-0").withRevision("rev-master-a").withVersion("6.8.2"),
136+
newTestPod("masters-1").withRevision("rev-master-a").withVersion("6.8.2"),
137+
newTestPod("nodes-0").withRevision("rev-nodes-a").withVersion("6.8.2"),
138+
newTestPod("nodes-1").withRevision("rev-nodes-a").withVersion("6.8.2"),
139+
newTestPod("nodes-2").withRevision("rev-nodes-a").withVersion("6.8.2"),
140+
),
141+
es: defaultEs,
142+
},
143+
want: []string{"masters-0", "masters-1", "nodes-0", "nodes-1", "nodes-2"},
144+
},
125145
}
126146
for _, tt := range tests {
127147
t.Run(tt.name, func(t *testing.T) {
128-
client := k8s.WrappedFakeClient(tt.args.pods...)
129-
got, err := podsToUpgrade(client, tt.args.statefulSets)
148+
client := k8s.WrappedFakeClient(tt.args.pods.toRuntimeObjects(tt.args.es.Spec.Version, 1, nothing)...)
149+
got, err := podsToUpgrade(tt.args.es, client, tt.args.statefulSets)
130150
if (err != nil) != tt.wantErr {
131151
t.Errorf("podsToUpgrade() error = %v, wantErr %v", err, tt.wantErr)
132152
return

0 commit comments

Comments
 (0)