diff --git a/api/v1alpha1/keepercluster_types.go b/api/v1alpha1/keepercluster_types.go index 504281d4..af01293a 100644 --- a/api/v1alpha1/keepercluster_types.go +++ b/api/v1alpha1/keepercluster_types.go @@ -171,12 +171,11 @@ type KeeperReplica struct { Ready bool `json:"ready"` // Error indicates that replica has a persistent error causing Pod startup failure. Error bool `json:"error"` + // Updated indicates that replica Pod is updated to the latest StatefulSet revision. + Updated bool `json:"updated"` // Mode indicates replica role, during latest reconcile. // +optional Mode string `json:"mode"` - // LastUpdate indicates latest status update time. - // +optional - LastUpdate metav1.Time `json:"lastUpdate,omitempty"` } // KeeperClusterStatus defines the observed state of KeeperCluster. @@ -202,6 +201,8 @@ type KeeperClusterStatus struct { CurrentRevision string `json:"currentRevision,omitempty"` // CurrentRevision indicates latest requested KeeperCluster spec revision. UpdateRevision string `json:"updateRevision,omitempty"` + // ObservedGeneration indicates latest generation observed by controller. + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +genclient diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 95176edf..9e474f79 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -169,7 +169,7 @@ func (in *KeeperClusterStatus) DeepCopyInto(out *KeeperClusterStatus) { in, out := &in.Replicas, &out.Replicas *out = make(map[string]KeeperReplica, len(*in)) for key, val := range *in { - (*out)[key] = *val.DeepCopy() + (*out)[key] = val } } } @@ -187,7 +187,6 @@ func (in *KeeperClusterStatus) DeepCopy() *KeeperClusterStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *KeeperReplica) DeepCopyInto(out *KeeperReplica) { *out = *in - in.LastUpdate.DeepCopyInto(&out.LastUpdate) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KeeperReplica. diff --git a/config/crd/bases/clickhouse.com_keeperclusters.yaml b/config/crd/bases/clickhouse.com_keeperclusters.yaml index daa2cbcc..d5138012 100644 --- a/config/crd/bases/clickhouse.com_keeperclusters.yaml +++ b/config/crd/bases/clickhouse.com_keeperclusters.yaml @@ -1653,6 +1653,11 @@ spec: description: CurrentRevision indicates latest applied KeeperCluster spec revision. type: string + observedGeneration: + description: ObservedGeneration indicates latest generation observed + by controller. + format: int64 + type: integer readyReplicas: description: ReadyReplicas Total number of replicas ready to server requests. @@ -1665,10 +1670,6 @@ spec: description: Error indicates that replica has a persistent error causing Pod startup failure. type: boolean - lastUpdate: - description: LastUpdate indicates latest status update time. - format: date-time - type: string mode: description: Mode indicates replica role, during latest reconcile. type: string @@ -1676,9 +1677,14 @@ spec: description: Ready indicates that replica is ready to accept incoming connections. type: boolean + updated: + description: Updated indicates that replica Pod is updated to + the latest StatefulSet revision. + type: boolean required: - error - ready + - updated type: object description: Replicas Current replica state by Keeper ID. 
type: object diff --git a/config/samples/v1alpha1_keepercluster.yaml b/config/samples/v1alpha1_keepercluster.yaml index e19bc0bf..8692ca50 100644 --- a/config/samples/v1alpha1_keepercluster.yaml +++ b/config/samples/v1alpha1_keepercluster.yaml @@ -6,7 +6,12 @@ metadata: app.kubernetes.io/managed-by: kustomize name: test-keeper-cluster spec: - replicas: 5 + replicas: 3 + image: + repository: "docker.io/clickhouse/clickhouse-keeper" + tag: latest + loggerConfig: + loggerLevel: warning storage: accessModes: - ReadWriteOncePod diff --git a/internal/controller/keeper/sync.go b/internal/controller/keeper/sync.go index 56b7ee69..5260c032 100644 --- a/internal/controller/keeper/sync.go +++ b/internal/controller/keeper/sync.go @@ -69,12 +69,21 @@ func (ctx *reconcileContext) HasConfigMapDiff(replicaID string) bool { return util.GetConfigHashFromObject(sts) != ctx.KeeperCluster.Status.ConfigurationRevision } +func (ctx *reconcileContext) With(keysAndVals ...interface{}) reconcileContext { + return reconcileContext{ + KeeperCluster: ctx.KeeperCluster, + Context: ctx.Context, + Log: ctx.Log.With(keysAndVals...), + stsByReplicaID: ctx.stsByReplicaID, + } +} + type ReconcileFunc func(reconcileContext) (*ctrl.Result, error) func (r *ClusterReconciler) Sync(ctx context.Context, log util.Logger, cr *v1.KeeperCluster) (ctrl.Result, error) { log.Info("Enter Keeper Reconcile", "spec", cr.Spec, "status", cr.Status) - reconcileContext := reconcileContext{ + recCtx := reconcileContext{ KeeperCluster: cr, Context: ctx, Log: log, @@ -95,54 +104,49 @@ func (r *ClusterReconciler) Sync(ctx context.Context, log util.Logger, cr *v1.Ke var result ctrl.Result for _, fn := range reconcileSteps { funcName := strings.TrimPrefix(util.GetFunctionName(fn), "reconcile") - reconcileContext.Log = log.With("reconcile_step", funcName) - reconcileContext.Log.Debug("starting reconcile step") + stepCtx := recCtx.With("reconcile_step", funcName) + stepCtx.Log.Debug("starting reconcile step") - stepResult, err 
:= fn(reconcileContext) + stepResult, err := fn(recCtx) if err != nil { if k8serrors.IsConflict(err) { - reconcileContext.Log.Error(err, "update conflict for resource, reschedule to retry") + stepCtx.Log.Error(err, "update conflict for resource, reschedule to retry") // retry immediately, as just the update to the CR failed return ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil } if k8serrors.IsAlreadyExists(err) { - reconcileContext.Log.Error(err, "create already existed resource, reschedule to retry") + stepCtx.Log.Error(err, "create already existed resource, reschedule to retry") // retry immediately, as just creating already existed resource return ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil } - reconcileContext.Log.Error(err, "unexpected error, setting conditions to unknown and rescheduling reconciliation to try again") + stepCtx.Log.Error(err, "unexpected error, setting conditions to unknown and rescheduling reconciliation to try again") errMsg := "Reconcile returned error" - setConditions(reconcileContext, []metav1.Condition{ - reconcileContext.NewCondition(v1.KeeperConditionTypeReconcileSucceeded, metav1.ConditionFalse, v1.KeeperConditionReasonStepFailed, errMsg), + setConditions(recCtx, []metav1.Condition{ + recCtx.NewCondition(v1.KeeperConditionTypeReconcileSucceeded, metav1.ConditionFalse, v1.KeeperConditionReasonStepFailed, errMsg), // Operator did not finish reconciliation, some conditions may not be valid already. 
- reconcileContext.NewCondition(v1.KeeperConditionTypeReady, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), - reconcileContext.NewCondition(v1.KeeperConditionTypeDegraded, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), - reconcileContext.NewCondition(v1.KeeperConditionTypeReplicaStartupFailure, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), - reconcileContext.NewCondition(v1.KeeperConditionTypeConfigurationInSync, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), - reconcileContext.NewCondition(v1.KeeperConditionTypeClusterSizeAligned, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), + recCtx.NewCondition(v1.KeeperConditionTypeReady, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), + recCtx.NewCondition(v1.KeeperConditionTypeDegraded, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), + recCtx.NewCondition(v1.KeeperConditionTypeReplicaStartupFailure, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), + recCtx.NewCondition(v1.KeeperConditionTypeConfigurationInSync, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), + recCtx.NewCondition(v1.KeeperConditionTypeClusterSizeAligned, metav1.ConditionUnknown, v1.KeeperConditionReasonStepFailed, errMsg), }) - result = ctrl.Result{RequeueAfter: RequeueOnErrorTimeout} - return result, r.upsertStatus(reconcileContext) + return ctrl.Result{RequeueAfter: RequeueOnErrorTimeout}, r.upsertStatus(recCtx) } if !stepResult.IsZero() { - reconcileContext.Log.Debug("reconcile step result", "result", result) - result.Requeue = true - if stepResult.RequeueAfter < result.RequeueAfter { - result.RequeueAfter = stepResult.RequeueAfter - } + stepCtx.Log.Debug("reconcile step result", "result", stepResult) + util.UpdateResult(&result, stepResult) } - reconcileContext.Log.Debug("reconcile step completed") + stepCtx.Log.Debug("reconcile step completed") } - 
reconcileContext.Log = log - setCondition(reconcileContext, v1.KeeperConditionTypeReconcileSucceeded, metav1.ConditionTrue, v1.KeeperConditionReasonReconcileFinished, "Reconcile succeeded") - log.Info("reconciliation loop end") + setCondition(recCtx, v1.KeeperConditionTypeReconcileSucceeded, metav1.ConditionTrue, v1.KeeperConditionReasonReconcileFinished, "Reconcile succeeded") + log.Info("reconciliation loop end", "result", result) - if err := r.upsertStatus(reconcileContext); err != nil { + if err := r.upsertStatus(recCtx); err != nil { return ctrl.Result{}, fmt.Errorf("update status after reconciliation: %w", err) } @@ -203,6 +207,8 @@ func (r *ClusterReconciler) reconcileHeadlessService(ctx reconcileContext) (*ctr func (r *ClusterReconciler) reconcileClusterRevisions(ctx reconcileContext) (*ctrl.Result, error) { var err error + ctx.KeeperCluster.Status.ObservedGeneration = ctx.KeeperCluster.Generation + ctx.KeeperCluster.Status.UpdateRevision, err = util.DeepHashObject(ctx.KeeperCluster.Spec) if err != nil { return &ctrl.Result{}, fmt.Errorf("get current spec revision: %w", err) } @@ -239,27 +245,33 @@ func (r *ClusterReconciler) reconcileActiveReplicaStatus(ctx reconcileContext) ( ctx.KeeperCluster.Status.ReadyReplicas = 0 for id, state := range ctx.KeeperCluster.Status.Replicas { + var err error var replicaSts appsv1.StatefulSet - if err := r.Get(ctx.Context, types.NamespacedName{ + if err = r.Get(ctx.Context, types.NamespacedName{ Namespace: ctx.KeeperCluster.Namespace, Name: ctx.KeeperCluster.StatefulSetNameByReplicaID(id), }, &replicaSts); err != nil { if !k8serrors.IsNotFound(err) { return &ctrl.Result{}, fmt.Errorf("get replicas %q sts: %w", id, err) } + ctx.Log.Debug("StatefulSet not found", "replica_id", id, "stateful_set", ctx.KeeperCluster.StatefulSetNameByReplicaID(id)) - } - ctx.stsByReplicaID[id] = &replicaSts + state.Ready = false + state.Mode = "" + state.Error = false + state.Updated = false + } else { + ctx.stsByReplicaID[id] = &replicaSts - 
isError, err := r.getReplicaErrorState(ctx, id) - if err != nil { - return &ctrl.Result{}, fmt.Errorf("get replica %q error state: %w", id, err) + state.Error, state.Updated, err = r.getReplicaPodState(ctx, id, &replicaSts) + if err != nil { + return &ctrl.Result{}, fmt.Errorf("get replica %q error state: %w", id, err) + } + + state.Mode = keeperStates[id].Mode + state.Ready = slices.Contains(expectedModes, state.Mode) && replicaSts.Status.ReadyReplicas == replicaSts.Status.Replicas } - state.Mode = keeperStates[id].Mode - state.Ready = slices.Contains(expectedModes, state.Mode) - state.Error = isError - state.LastUpdate = metav1.Now() ctx.KeeperCluster.Status.Replicas[id] = state if state.Ready { @@ -285,9 +297,7 @@ func (r *ClusterReconciler) reconcileQuorumMembership(ctx reconcileContext) (*ct for i := range requestedReplicas { id := strconv.Itoa(i + 1) ctx.Log.Info("creating new replica", "replica_id", id) - ctx.KeeperCluster.Status.Replicas[id] = v1.KeeperReplica{ - LastUpdate: metav1.Now(), - } + ctx.KeeperCluster.Status.Replicas[id] = v1.KeeperReplica{} } return &ctrl.Result{}, nil @@ -348,9 +358,7 @@ func (r *ClusterReconciler) reconcileQuorumMembership(ctx reconcileContext) (*ct id := strconv.Itoa(i) if _, ok := ctx.KeeperCluster.Status.Replicas[id]; !ok { ctx.Log.Info("creating new replica", "replica_id", id) - ctx.KeeperCluster.Status.Replicas[id] = v1.KeeperReplica{ - LastUpdate: metav1.Now(), - } + ctx.KeeperCluster.Status.Replicas[id] = v1.KeeperReplica{} return nil, nil } } @@ -387,10 +395,25 @@ const ( statusStsDiff statusNotReadyUpToDate statusNotReadyWithDiff + statusUpdating statusError statusNotExists ) +var ( + mapStatusText = map[replicaUpdateState]string{ + statusUpToDate: "UpToDate", + statusStsAndConfigDiff: "StatefulSetAndConfigDiff", + statusConfigDiff: "ConfigDiff", + statusStsDiff: "StatefulSetDiff", + statusNotReadyUpToDate: "NotReadyUpToDate", + statusNotReadyWithDiff: "NotReadyWithDiff", + statusUpdating: "Updating", + statusError: 
"Error", + statusNotExists: "NotExists", + } +) + // reconcileReplicaResources performs update on replicas ConfigMap and StatefulSet. // If there are replicas that has no created StatefulSet, creates immediately. // If all replica exists performs rolling upgrade, with the following order preferences: @@ -412,33 +435,40 @@ func (r *ClusterReconciler) reconcileReplicaResources(ctx reconcileContext) (*ct } } + result := ctrl.Result{} + switch highestPriority { case statusUpToDate: ctx.Log.Info("all replicas are up to date") return nil, nil - case statusNotReadyUpToDate: - ctx.Log.Info("waiting for updated replicas to become ready", "replicas", replicasInStatus) - return &ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + case statusNotReadyUpToDate, statusUpdating: + ctx.Log.Info("waiting for updated replicas to become ready", "replicas", replicasInStatus, "priority", mapStatusText[highestPriority]) + result = ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout} case statusStsAndConfigDiff, statusConfigDiff, statusStsDiff: - // Leave one replica to rolling update. replicasInStatus must be not empty. - replicasInStatus = replicasInStatus[:1] - fallthrough - case statusNotReadyWithDiff, statusNotExists, statusError: - changes := false + // Leave one replica to rolling update. replicasInStatus must not be empty. + // Prefer replicas with higher id. 
+ chosenReplica := replicasInStatus[0] for _, id := range replicasInStatus { - changed, err := r.updateReplica(ctx, id) - if err != nil { - return nil, fmt.Errorf("update replica %q: %w", id, err) + if id > chosenReplica { + chosenReplica = id } - changes = changes || changed } + ctx.Log.Info(fmt.Sprintf("updating chosen replica %s with priority %s: %v", chosenReplica, mapStatusText[highestPriority], replicasInStatus)) + replicasInStatus = []string{chosenReplica} + case statusNotReadyWithDiff, statusNotExists, statusError: + ctx.Log.Info(fmt.Sprintf("updating replicas with priority %s: %v", mapStatusText[highestPriority], replicasInStatus)) + } - if changes { - return &ctrl.Result{Requeue: true}, nil + for _, id := range replicasInStatus { + replicaResult, err := r.updateReplica(ctx, id) + if err != nil { + return nil, fmt.Errorf("update replica %q: %w", id, err) } + + util.UpdateResult(&result, replicaResult) } - return nil, nil + return &result, nil } func (r *ClusterReconciler) reconcileCleanUp(ctx reconcileContext) (*ctrl.Result, error) { @@ -509,7 +539,7 @@ func (r *ClusterReconciler) reconcileConditions(ctx reconcileContext) (*ctrl.Res replicasByMode[state.Mode] = append(replicasByMode[state.Mode], id) } - if ctx.HasConfigMapDiff(id) || ctx.HasStatefulSetDiff(id) { + if ctx.HasConfigMapDiff(id) || ctx.HasStatefulSetDiff(id) || !state.Updated { notUpdatedReplicas = append(notUpdatedReplicas, id) } } @@ -563,7 +593,7 @@ func (r *ClusterReconciler) reconcileConditions(ctx reconcileContext) (*ctrl.Res reason = v1.KeeperConditionReasonNoLeader message = "No replicas" case 1: - if len(replicasByMode["standalone"]) == 1 { + if len(replicasByMode[ModeStandalone]) == 1 { status = metav1.ConditionTrue reason = v1.KeeperConditionReasonStandaloneReady message = "Standalone Keeper is ready" @@ -576,24 +606,24 @@ func (r *ClusterReconciler) reconcileConditions(ctx reconcileContext) (*ctrl.Res requiredFollowersForQuorum := int(math.Ceil(float64(exists)/2)) - 1 switch { - 
case len(replicasByMode["standalone"]) > 0: + case len(replicasByMode[ModeStandalone]) > 0: status = metav1.ConditionFalse reason = v1.KeeperConditionReasonInconsistentState - slices.Sort(replicasByMode["standalone"]) - message = fmt.Sprintf("Standalone replica in cluster: %v", replicasByMode["standalone"]) - case len(replicasByMode["leader"]) > 1: + slices.Sort(replicasByMode[ModeStandalone]) + message = fmt.Sprintf("Standalone replica in cluster: %v", replicasByMode[ModeStandalone]) + case len(replicasByMode[ModeLeader]) > 1: status = metav1.ConditionFalse reason = v1.KeeperConditionReasonInconsistentState - slices.Sort(replicasByMode["leader"]) - message = fmt.Sprintf("Multiple leaders in cluster: %v", replicasByMode["leader"]) - case len(replicasByMode["leader"]) == 0: + slices.Sort(replicasByMode[ModeLeader]) + message = fmt.Sprintf("Multiple leaders in cluster: %v", replicasByMode[ModeLeader]) + case len(replicasByMode[ModeLeader]) == 0: status = metav1.ConditionFalse reason = v1.KeeperConditionReasonNoLeader message = "No leader in the cluster" - case len(replicasByMode["follower"]) < requiredFollowersForQuorum: + case len(replicasByMode[ModeFollower]) < requiredFollowersForQuorum: status = metav1.ConditionFalse reason = v1.KeeperConditionReasonNotEnoughFollowers - message = fmt.Sprintf("Not enough followers in cluster: %d, required: %d", len(replicasByMode["follower"]), requiredFollowersForQuorum) + message = fmt.Sprintf("Not enough followers in cluster: %d, required: %d", len(replicasByMode[ModeFollower]), requiredFollowersForQuorum) default: status = metav1.ConditionTrue reason = v1.KeeperConditionReasonClusterReady @@ -652,28 +682,28 @@ func (r *ClusterReconciler) updateConfigMap(ctx reconcileContext, configMap *cor return true, nil } -func (r *ClusterReconciler) updateReplica(ctx reconcileContext, replicaID string) (bool, error) { +func (r *ClusterReconciler) updateReplica(ctx reconcileContext, replicaID string) (*ctrl.Result, error) { log := 
ctx.Log.With("replica_id", replicaID) log.Info("updating replica") - configMap, err := TemplateConfigMap(ctx, replicaID) + configMap, err := TemplateConfigMap(ctx.KeeperCluster, replicaID) if err != nil { - return false, fmt.Errorf("template replica %q ConfigMap: %w", replicaID, err) + return nil, fmt.Errorf("template replica %q ConfigMap: %w", replicaID, err) } configChanged, err := r.updateConfigMap(ctx, configMap) if err != nil { - return false, fmt.Errorf("update replica %q ConfigMap: %w", replicaID, err) + return nil, fmt.Errorf("update replica %q ConfigMap: %w", replicaID, err) } statefulSet := TemplateStatefulSet(ctx.KeeperCluster, replicaID) if err := ctrl.SetControllerReference(ctx.KeeperCluster, statefulSet, r.Scheme); err != nil { - return false, fmt.Errorf("set replica %q StatefulSet controller reference: %w", replicaID, err) + return nil, fmt.Errorf("set replica %q StatefulSet controller reference: %w", replicaID, err) } foundStatefulSet, ok := ctx.stsByReplicaID[replicaID] if !ok { - log.Info("replica StatefulSet not found, creating", "statefulset", statefulSet.Name) + log.Info("replica StatefulSet not found, creating", "stateful_set", statefulSet.Name) util.AddObjectConfigHash(statefulSet, ctx.KeeperCluster.Status.ConfigurationRevision) util.AddHashWithKeyToAnnotations(statefulSet, util.AnnotationSpecHash, ctx.KeeperCluster.Status.StatefulSetRevision) @@ -682,10 +712,10 @@ func (r *ClusterReconciler) updateReplica(ctx reconcileContext, replicaID string }) if err != nil { - return false, fmt.Errorf("create replica %q StatefulSet %q: %w", replicaID, statefulSet.Name, err) + return nil, fmt.Errorf("create replica %q StatefulSet %q: %w", replicaID, statefulSet.Name, err) } - return true, nil + return &ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil } // Check if the StatefulSet is outdated and needs to be recreated @@ -693,10 +723,10 @@ func (r *ClusterReconciler) updateReplica(ctx reconcileContext, replicaID string if err != nil || 
BreakingStatefulSetVersion.GT(v) { log.Warn(fmt.Sprintf("Removing the StatefulSet because of a breaking change. Found version: %v, expected version: %v", v, BreakingStatefulSetVersion)) if err := r.Delete(ctx.Context, foundStatefulSet); err != nil { - return false, fmt.Errorf("delete StatefulSet: %w", err) + return nil, fmt.Errorf("delete StatefulSet: %w", err) } - return true, nil + return &ctrl.Result{Requeue: true}, nil } stsNeedsUpdate := ctx.HasStatefulSetDiff(replicaID) @@ -716,19 +746,25 @@ func (r *ClusterReconciler) updateReplica(ctx reconcileContext, replicaID string if !stsNeedsUpdate { log.Debug("StatefulSet is up to date") - return configChanged, nil + if configChanged { + return &ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + } + + return nil, nil } - log.Info("updating replica StatefulSet", "statefulset", statefulSet.Name) + log.Info("updating replica StatefulSet", "stateful_set", statefulSet.Name) foundStatefulSet.Spec = statefulSet.Spec + foundStatefulSet.Annotations = util.MergeMaps(foundStatefulSet.Annotations, statefulSet.Annotations) + foundStatefulSet.Labels = util.MergeMaps(foundStatefulSet.Labels, statefulSet.Labels) util.AddHashWithKeyToAnnotations(foundStatefulSet, util.AnnotationSpecHash, ctx.KeeperCluster.Status.StatefulSetRevision) if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { return r.Update(ctx.Context, foundStatefulSet) }); err != nil { - return false, fmt.Errorf("update replica %q StatefulSet %q: %w", replicaID, statefulSet.Name, err) + return nil, fmt.Errorf("update replica %q StatefulSet %q: %w", replicaID, statefulSet.Name, err) } - return true, nil + return &ctrl.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil } func (r *ClusterReconciler) getReplicaUpdateStatus(ctx reconcileContext, replicaID string) replicaUpdateState { @@ -744,6 +780,10 @@ func (r *ClusterReconciler) getReplicaUpdateStatus(ctx reconcileContext, replica configDiff := ctx.HasConfigMapDiff(replicaID) stsDiff := 
ctx.HasStatefulSetDiff(replicaID) + if !state.Updated { + return statusUpdating + } + if !state.Ready { if configDiff || stsDiff { return statusNotReadyWithDiff @@ -766,36 +806,45 @@ var podErrorStatuses = []string{"ImagePullBackOff", "ErrImagePull", "CrashLoopBackOff"} -func (r *ClusterReconciler) getReplicaErrorState(ctx reconcileContext, replicaID string) (bool, error) { - keeperPodList := &corev1.PodList{} - - appReq, err := labels.NewRequirement(util.LabelAppKey, selection.Equals, []string{ctx.KeeperCluster.SpecificName()}) - if err != nil { - return false, fmt.Errorf("make %q requirement to list pods: %w", util.LabelAppKey, err) +func (r *ClusterReconciler) getReplicaPodState(ctx reconcileContext, replicaID string, sts *appsv1.StatefulSet) (bool, bool, error) { + if sts == nil { + return false, false, nil } - replicaReq, err := labels.NewRequirement(util.LabelKeeperReplicaID, selection.Equals, []string{replicaID}) - if err != nil { - return false, fmt.Errorf("make %q requirement to list pods: %w", util.LabelKeeperReplicaID, err) + var keeperPod corev1.Pod + if err := r.Get(ctx.Context, types.NamespacedName{ + Namespace: sts.Namespace, + Name: fmt.Sprintf("%s-0", sts.Name), + }, &keeperPod); err != nil { + if !k8serrors.IsNotFound(err) { + return false, false, fmt.Errorf("get keeper %q pod: %w", replicaID, err) + } + + ctx.Log.Info("pod does not exist", "replica_id", replicaID, "stateful_set", sts.Name) + return false, false, nil } - err = r.List(ctx.Context, keeperPodList, &client.ListOptions{ - Namespace: ctx.KeeperCluster.Namespace, - LabelSelector: labels.NewSelector().Add(*appReq, *replicaReq), - }) - if err != nil { - return false, fmt.Errorf("list pods: %w", err) + + isUpdated := sts.Status.ObservedGeneration == sts.Generation && + sts.Status.UpdateRevision == keeperPod.Labels[appsv1.ControllerRevisionHashLabelKey] + if !isUpdated { + ctx.Log.Info("pod has pending changes", 
+ "pod", keeperPod.Name, + "stateful_set", sts.Name, + "config_revision", keeperPod.Annotations[util.AnnotationConfigHash], + "stateful_set_revision", keeperPod.Annotations[util.AnnotationSpecHash], + ) } - for _, p := range keeperPodList.Items { - if len(p.Status.ContainerStatuses) > 0 { - waiting := p.Status.ContainerStatuses[0].State.Waiting - if waiting != nil && slices.Contains(podErrorStatuses, waiting.Reason) { - ctx.Log.Info("pod in error state", "pod", p.Name, "reason", waiting.Reason) - return true, nil - } + isError := false + for _, status := range keeperPod.Status.ContainerStatuses { + if status.State.Waiting != nil && slices.Contains(podErrorStatuses, status.State.Waiting.Reason) { + ctx.Log.Info("pod in error state", "pod", keeperPod.Name, "reason", status.State.Waiting.Reason) + isError = true + break } } - return false, nil + + return isError, isUpdated, nil } func (r *ClusterReconciler) upsertStatus(ctx reconcileContext) error { diff --git a/internal/controller/keeper/sync_test.go b/internal/controller/keeper/sync_test.go index 43e76261..ac8033e5 100644 --- a/internal/controller/keeper/sync_test.go +++ b/internal/controller/keeper/sync_test.go @@ -41,8 +41,10 @@ func TestUpdateReplica(t *testing.T) { require.Equal(t, ctx.KeeperCluster.Status.StatefulSetRevision, util.GetSpecHashFromObject(sts)) // Nothing to update + sts.Status.ObservedGeneration = sts.Generation ctx.stsByReplicaID[replicaID] = sts replicaState.Ready = true + replicaState.Updated = true ctx.KeeperCluster.Status.Replicas[replicaID] = replicaState result, err = r.reconcileReplicaResources(ctx) require.NoError(t, err) @@ -102,7 +104,7 @@ func setupReconciler(t *testing.T) (*ClusterReconciler, reconcileContext) { ConfigurationRevision: "config-v1", StatefulSetRevision: "sts-v1", Replicas: map[string]v1.KeeperReplica{ - "1": {LastUpdate: metav1.Now()}, + "1": {}, }, }, }, diff --git a/internal/controller/keeper/templates.go b/internal/controller/keeper/templates.go index 
c524e84c..41fedef0 100644 --- a/internal/controller/keeper/templates.go +++ b/internal/controller/keeper/templates.go @@ -181,16 +181,16 @@ func GetConfigurationRevision(cr *v1.KeeperCluster) (string, error) { func GetStatefulSetRevision(cr *v1.KeeperCluster) (string, error) { sts := TemplateStatefulSet(cr, "template") - hash, err := util.DeepHashObject(sts.Spec) + hash, err := util.DeepHashObject(sts) if err != nil { - return "", fmt.Errorf("hash template StatefulSet spec: %w", err) + return "", fmt.Errorf("hash template StatefulSet: %w", err) } return hash, nil } -func TemplateConfigMap(ctx reconcileContext, replicaID string) (*corev1.ConfigMap, error) { - config := generateConfigForSingleReplica(ctx.KeeperCluster, replicaID) +func TemplateConfigMap(cr *v1.KeeperCluster, replicaID string) (*corev1.ConfigMap, error) { + config := generateConfigForSingleReplica(cr, replicaID) configData, err := yaml.Marshal(config) if err != nil { return nil, fmt.Errorf("marshal config for replica %q: %w", replicaID, err) @@ -202,10 +202,10 @@ func TemplateConfigMap(ctx reconcileContext, replicaID string) (*corev1.ConfigMa APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: ctx.KeeperCluster.ConfigMapNameByReplicaID(replicaID), - Namespace: ctx.KeeperCluster.Namespace, - Labels: util.MergeMaps(ctx.KeeperCluster.Spec.Labels, map[string]string{ - util.LabelAppKey: ctx.KeeperCluster.SpecificName(), + Name: cr.ConfigMapNameByReplicaID(replicaID), + Namespace: cr.Namespace, + Labels: util.MergeMaps(cr.Spec.Labels, map[string]string{ + util.LabelAppKey: cr.SpecificName(), util.LabelKeeperReplicaID: replicaID, }), }, diff --git a/internal/util/common.go b/internal/util/common.go index 8788108b..a4efd022 100644 --- a/internal/util/common.go +++ b/internal/util/common.go @@ -10,7 +10,7 @@ import ( "strings" "github.com/davecgh/go-spew/spew" - appsv1 "k8s.io/api/apps/v1" + ctrl "sigs.k8s.io/controller-runtime" ) // DeepHashObject writes specified object to hash using the spew library 
@@ -42,29 +42,6 @@ func MergeMaps(mapsToMerge ...map[string]string) map[string]string { return result } -func ReverseMap[M ~map[K]V, K comparable, V comparable](m M) map[V]K { - r := make(map[V]K, len(m)) - for k, v := range m { - r[v] = k - } - return r -} - -func IsStatefulSetInSync(found *appsv1.StatefulSet, expectedReplicas *int32) bool { - if found.Status.ObservedGeneration == 0 || found.Generation != found.Status.ObservedGeneration { - return false - } - if found.Spec.Replicas == nil || expectedReplicas == nil { - return false - } - return *expectedReplicas == *found.Spec.Replicas && - *expectedReplicas == found.Status.Replicas && - *expectedReplicas == found.Status.ReadyReplicas && - *expectedReplicas == found.Status.CurrentReplicas && - *expectedReplicas == found.Status.UpdatedReplicas && - *expectedReplicas == found.Status.AvailableReplicas -} - func GetFunctionName(temp interface{}) string { strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(temp).Pointer()).Name(), ".") return strings.TrimSuffix(strs[len(strs)-1], "-fm") @@ -114,3 +91,20 @@ func applyDefaultRecursive(sourceValue reflect.Value, defaults reflect.Value) er return nil } + +func UpdateResult(result *ctrl.Result, update *ctrl.Result) { + if update.IsZero() { + return + } + + if result.IsZero() { + result.Requeue = true + result.RequeueAfter = update.RequeueAfter + return + } + + result.Requeue = true + if update.RequeueAfter < result.RequeueAfter { + result.RequeueAfter = update.RequeueAfter + } +} diff --git a/internal/util/common_test.go b/internal/util/common_test.go index 1707380e..8e25f9cf 100644 --- a/internal/util/common_test.go +++ b/internal/util/common_test.go @@ -2,8 +2,10 @@ package util import ( "testing" + "time" "github.com/stretchr/testify/require" + ctrl "sigs.k8s.io/controller-runtime" ) type InternalStruct struct { @@ -67,3 +69,51 @@ func TestApplyDefault(t *testing.T) { require.Equal(t, source.Map[7], "custom") }) } + +func TestUpdateResult(t *testing.T) { + 
t.Run("update empty", func(t *testing.T) { + result := ctrl.Result{} + update := ctrl.Result{RequeueAfter: time.Second} + UpdateResult(&result, &update) + require.True(t, result.Requeue) + require.Equal(t, update.RequeueAfter, result.RequeueAfter) + }) + + t.Run("do not update with empty", func(t *testing.T) { + result := ctrl.Result{RequeueAfter: time.Second} + UpdateResult(&result, nil) + require.Equal(t, ctrl.Result{RequeueAfter: time.Second}, result) + }) + + t.Run("update to closer", func(t *testing.T) { + result := ctrl.Result{RequeueAfter: time.Minute} + update := ctrl.Result{RequeueAfter: time.Second} + UpdateResult(&result, &update) + require.True(t, result.Requeue) + require.Equal(t, time.Second, result.RequeueAfter) + }) + + t.Run("already better", func(t *testing.T) { + result := ctrl.Result{RequeueAfter: time.Second} + update := ctrl.Result{RequeueAfter: time.Minute} + UpdateResult(&result, &update) + require.True(t, result.Requeue) + require.Equal(t, time.Second, result.RequeueAfter) + }) + + t.Run("update to minimal", func(t *testing.T) { + result := ctrl.Result{RequeueAfter: time.Second} + update := ctrl.Result{Requeue: true} + UpdateResult(&result, &update) + require.True(t, result.Requeue) + require.Equal(t, time.Duration(0), result.RequeueAfter) + }) + + t.Run("already minimal", func(t *testing.T) { + result := ctrl.Result{Requeue: true} + update := ctrl.Result{RequeueAfter: time.Second} + UpdateResult(&result, &update) + require.True(t, result.Requeue) + require.Equal(t, time.Duration(0), result.RequeueAfter) + }) +}