Skip to content

Commit 79a702c

Browse files
committed
revert aggressive cleanup of live keeper version
1 parent 71a6e06 commit 79a702c

3 files changed

Lines changed: 72 additions & 1 deletion

File tree

internal/controller/keeper/commands.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ var (
2929
// serverStatus holds the values queryKeeper extracts from a Keeper status
// response.
type serverStatus struct {
3030
// ServerState is copied from the "zk_server_state" entry of the parsed
// status map; queryKeeper returns an error when it is empty.
ServerState string
3131
// Followers: presumably the follower count reported by the server; how it
// is populated is not visible in this hunk — TODO confirm.
Followers int
32+
// Version is the result of controllerutil.ParseVersion applied to the
// "zk_version" entry. It stays empty when parsing fails (a warning is
// logged instead of returning an error).
Version string
3233
}
3334

3435
func getConnection(ctx context.Context, dialer controllerutil.DialContextFunc, hostname string, tlsRequired bool) (net.Conn, error) {
@@ -103,6 +104,13 @@ func queryKeeper(ctx context.Context, log controllerutil.Logger, conn net.Conn)
103104
ServerState: statMap["zk_server_state"],
104105
}
105106

107+
version, err := controllerutil.ParseVersion(statMap["zk_version"])
108+
if err != nil {
109+
log.Warn("failed to parse keeper version", "raw", statMap["zk_version"], "error", err)
110+
} else {
111+
result.Version = version
112+
}
113+
106114
if result.ServerState == "" {
107115
return serverStatus{}, fmt.Errorf("response missing required field 'Mode': %q", string(data))
108116
}

internal/controller/keeper/controller_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
. "github.com/onsi/ginkgo/v2"
1010
. "github.com/onsi/gomega"
1111
appsv1 "k8s.io/api/apps/v1"
12+
batchv1 "k8s.io/api/batch/v1"
1213
corev1 "k8s.io/api/core/v1"
1314
policyv1 "k8s.io/api/policy/v1"
15+
"k8s.io/apimachinery/pkg/api/meta"
1416
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1517
"k8s.io/apimachinery/pkg/runtime"
1618
"k8s.io/apimachinery/pkg/types"
@@ -112,6 +114,11 @@ var _ = When("reconciling standalone KeeperCluster resource", Ordered, func() {
112114
Expect(suite.Client.List(ctx, &statefulsets, listOpts)).To(Succeed())
113115
Expect(statefulsets.Items).To(HaveLen(1))
114116

117+
var jobs batchv1.JobList
118+
Expect(suite.Client.List(ctx, &jobs, listOpts)).To(Succeed())
119+
Expect(jobs.Items).To(BeEmpty())
120+
Expect(meta.FindStatusCondition(cr.Status.Conditions, v1.ConditionTypeVersionUpgraded)).To(BeNil())
121+
115122
testutil.AssertEvents(recorder.Events, map[string]int{
116123
"ReplicaCreated": 1,
117124
"ClusterNotReady": 1,

internal/controller/keeper/sync.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math"
88
"slices"
99
"strconv"
10+
"strings"
1011
"time"
1112

1213
"gopkg.in/yaml.v2"
@@ -92,7 +93,6 @@ type keeperReconciler struct {
9293
func (r *keeperReconciler) sync(ctx context.Context, log ctrlutil.Logger) (ctrl.Result, error) {
9394
log.Info("Enter Keeper Reconcile", "spec", r.Cluster.Spec, "status", r.Cluster.Status)
9495

95-
meta.RemoveStatusCondition(&r.Cluster.Status.Conditions, v1.ConditionTypeVersionInSync)
9696
meta.RemoveStatusCondition(&r.Cluster.Status.Conditions, v1.ConditionTypeVersionUpgraded)
9797

9898
r.SetUnknownConditions(v1.ConditionReasonStepFailed, "Reconcile stopped before condition evaluation",
@@ -101,6 +101,7 @@ func (r *keeperReconciler) sync(ctx context.Context, log ctrlutil.Logger) (ctrl.
101101
v1.ConditionTypeHealthy,
102102
v1.ConditionTypeClusterSizeAligned,
103103
v1.ConditionTypeConfigurationInSync,
104+
v1.ConditionTypeVersionInSync,
104105
v1.ConditionTypeReady,
105106
v1.KeeperConditionTypeScaleAllowed,
106107
})
@@ -565,6 +566,7 @@ func (r *keeperReconciler) evaluateReplicaConditions() {
565566
var errorIDs, notReadyIDs, notUpdatedIDs []string
566567

567568
replicasByMode := map[string][]v1.KeeperReplicaID{}
569+
replicaVersions := map[string]string{}
568570

569571
r.Cluster.Status.ReadyReplicas = 0
570572
for id, replica := range r.ReplicaState {
@@ -584,11 +586,17 @@ func (r *keeperReconciler) evaluateReplicaConditions() {
584586
if replica.HasDiff(r.revs) || !replica.Updated() {
585587
notUpdatedIDs = append(notUpdatedIDs, idStr)
586588
}
589+
590+
if replica.Status.Version != "" {
591+
replicaVersions[idStr] = replica.Status.Version
592+
}
587593
}
588594

589595
r.SetCondition(chctrl.ReplicaStartupCondition(errorIDs))
590596
r.SetCondition(chctrl.HealthyCondition(notReadyIDs))
591597
r.SetCondition(chctrl.ConfigSyncCondition(nil, notUpdatedIDs, nil))
598+
versionCond, versionEvents := keeperVersionSyncCondition(replicaVersions, len(notUpdatedIDs) > 0)
599+
r.SetCondition(versionCond, versionEvents...)
592600

593601
// Ready condition — keeper-specific logic.
594602
exists := len(r.ReplicaState)
@@ -667,6 +675,54 @@ func (r *keeperReconciler) evaluateReplicaConditions() {
667675
)
668676
}
669677

678+
func keeperVersionSyncCondition(replicaVersions map[string]string, isUpdating bool) (metav1.Condition, []chctrl.EventSpec) {
679+
newCond := func(status metav1.ConditionStatus, reason v1.ConditionReason, message string) metav1.Condition {
680+
return metav1.Condition{
681+
Type: v1.ConditionTypeVersionInSync,
682+
Status: status,
683+
Reason: reason,
684+
Message: message,
685+
}
686+
}
687+
688+
if len(replicaVersions) == 0 {
689+
return newCond(metav1.ConditionUnknown, v1.ConditionReasonVersionPending, "No Keeper replica version has been observed yet"), nil
690+
}
691+
692+
versions := map[string]struct{}{}
693+
var observed []string
694+
for id, version := range replicaVersions {
695+
versions[version] = struct{}{}
696+
observed = append(observed, fmt.Sprintf("%s: %s", id, version))
697+
}
698+
699+
if len(versions) == 1 {
700+
var version string
701+
for _, v := range replicaVersions {
702+
version = v
703+
break
704+
}
705+
706+
return newCond(metav1.ConditionTrue, v1.ConditionReasonVersionMatch,
707+
fmt.Sprintf("All observed Keeper replicas report version %s", version)), nil
708+
}
709+
710+
slices.Sort(observed)
711+
cond := newCond(metav1.ConditionFalse, v1.ConditionReasonVersionMismatch,
712+
fmt.Sprintf("Keeper replica versions differ: %s", strings.Join(observed, ", ")))
713+
714+
if isUpdating {
715+
return cond, nil
716+
}
717+
718+
return cond, []chctrl.EventSpec{{
719+
Type: corev1.EventTypeWarning,
720+
Reason: v1.EventReasonVersionDiverge,
721+
Action: v1.EventActionVersionCheck,
722+
Message: cond.Message,
723+
}}
724+
}
725+
670726
func (r *keeperReconciler) updateReplica(ctx context.Context, log ctrlutil.Logger, replicaID v1.KeeperReplicaID) (*ctrl.Result, error) {
671727
log = log.With("replica_id", replicaID)
672728
log.Info("updating replica")

0 commit comments

Comments
 (0)