diff --git a/controllers/apps/component_controller.go b/controllers/apps/component_controller.go index cae17b2e7aa..672c433bf68 100644 --- a/controllers/apps/component_controller.go +++ b/controllers/apps/component_controller.go @@ -185,6 +185,10 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( &componentPostProvisionTransformer{}, // update component status &componentStatusTransformer{Client: r.Client}, + // repair PostgreSQL Patroni dynamic config after failover + &componentPatroniDCSRepairTransformer{}, + // repair PostgreSQL standby password drift after replication breaks + &componentPostgreSQLStandbyPasswordRepairTransformer{}, ).Build() // Execute stage @@ -225,6 +229,7 @@ func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error { Owns(&dpv1alpha1.Restore{}). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). Watches(&corev1.PersistentVolumeClaim{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). + Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). Owns(&batchv1.Job{}). Watches(&appsv1alpha1.Configuration{}, handler.EnqueueRequestsFromMapFunc(r.configurationEventHandler)) @@ -257,6 +262,7 @@ func (r *ComponentReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, mul Watch(b, &corev1.Secret{}, eventHandler). Watch(b, &corev1.ConfigMap{}, eventHandler). Watch(b, &corev1.PersistentVolumeClaim{}, eventHandler). + Watch(b, &corev1.Pod{}, eventHandler). Watch(b, &batchv1.Job{}, eventHandler). Watch(b, &corev1.ServiceAccount{}, eventHandler). Watch(b, &rbacv1.RoleBinding{}, eventHandler). diff --git a/controllers/apps/transformer_component_patroni_dcs_repair.go b/controllers/apps/transformer_component_patroni_dcs_repair.go new file mode 100644 index 00000000000..3b27a312be6 --- /dev/null +++ b/controllers/apps/transformer_component_patroni_dcs_repair.go @@ -0,0 +1,493 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package apps + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strconv" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" +) + +const ( + patroniDCSRepairConditionType = "PatroniDCSRepair" + + patroniDCSRepairReasonSucceeded = "Succeeded" + patroniDCSRepairReasonFailed = "Failed" + + patroniDCSRepairRequeueInterval = time.Minute + patroniHTTPTimeout = 10 * time.Second + patroniMaxResponseBytes = 1 << 20 + patroniDefaultRESTPort = int32(8008) + + pgHBAConfigFile = "pg_hba.conf" +) + +var ( + requiredPatroniPgHBARules = []string{ + "host all all 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + "host replication all 127.0.0.1/32 md5", + } + fallbackPatroniPgHBARules = append([]string{"local all all trust"}, requiredPatroniPgHBARules...) +) + +// componentPatroniDCSRepairTransformer repairs PostgreSQL Patroni dynamic +// config that may miss pg_hba rules after failover. +type componentPatroniDCSRepairTransformer struct { + patroniClient patroniConfigClient +} + +var _ graph.Transformer = &componentPatroniDCSRepairTransformer{} + +func (t *componentPatroniDCSRepairTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error { + transCtx, _ := ctx.(*componentTransformContext) + if model.IsObjectDeleting(transCtx.ComponentOrig) { + return nil + } + if !isPostgreSQLComponent(transCtx) { + return nil + } + if transCtx.Component.Status.Phase != appsv1alpha1.RunningClusterCompPhase { + return nil + } + + runningITS, ok := transCtx.RunningWorkload.(*workloads.InstanceSet) + if !ok || runningITS == nil || runningITS.Status.AvailableReplicas == 0 { + return nil + } + + leaderPod, err := t.getLeaderPod(transCtx, runningITS) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + if leaderPod == nil || leaderPod.Status.PodIP == "" { + err := fmt.Errorf("postgresql patroni dcs repair: leader pod %q has no pod ip", leaderMemberPodName(runningITS)) + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + patroniURL, err := patroniRESTURL(leaderPod.Status.PodIP, patroniRESTPort(leaderPod)) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + expectedPgHBA, err := t.expectedPgHBARules(transCtx) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + patroniClient := t.patroniClient + if patroniClient == nil { + patroniClient = newHTTPPatroniConfigClient() + } + repaired, err := repairPatroniPgHBA( + transCtx.Context, + patroniClient, + patroniURL, + expectedPgHBA, + previousPatroniDCSRepairFailed(transCtx.Component.Status.Conditions), + ) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + t.markRepairSucceeded(transCtx, repaired) + if repaired && transCtx.EventRecorder != nil { + transCtx.EventRecorder.Event(transCtx.Component, corev1.EventTypeNormal, + patroniDCSRepairConditionType, "repaired PostgreSQL Patroni DCS pg_hba rules") + } + return nil +} + +func isPostgreSQLComponent(transCtx *componentTransformContext) bool { + if transCtx == nil || transCtx.CompDef == nil { + return false + } + serviceKind := strings.ToLower(strings.TrimSpace(transCtx.CompDef.Spec.ServiceKind)) + for _, alias := range constant.GetPostgreSQLAlias() { + if serviceKind == alias { + return true + } + } + compDefName := strings.ToLower(transCtx.CompDef.Name) + return compDefName == constant.ServiceKindPostgreSQL || + strings.HasPrefix(compDefName, constant.ServiceKindPostgreSQL+"-") +} + +func (t *componentPatroniDCSRepairTransformer) getLeaderPod( + transCtx *componentTransformContext, + runningITS *workloads.InstanceSet, +) (*corev1.Pod, error) { + podName := leaderMemberPodName(runningITS) + if podName == "" { + return nil, fmt.Errorf("postgresql patroni dcs repair: leader pod not found") + } + + pod := &corev1.Pod{} + podKey := types.NamespacedName{Namespace: transCtx.Component.Namespace, Name: podName} + if err := transCtx.Client.Get(transCtx.Context, podKey, pod, inDataContext4C()); err != nil { + return nil, fmt.Errorf("postgresql patroni dcs repair: get leader pod %q: %w", podName, err) + } + return pod, nil +} + +func leaderMemberPodName(runningITS *workloads.InstanceSet) string { + if runningITS == nil { + return "" + } + for _, member := range runningITS.Status.MembersStatus { + if member.ReplicaRole != nil && member.ReplicaRole.IsLeader { + return member.PodName + } + } + return "" +} + +func (t *componentPatroniDCSRepairTransformer) expectedPgHBARules( + transCtx *componentTransformContext, +) ([]string, error) { + for _, configSpec := range transCtx.SynthesizeComponent.ConfigTemplates { + cmKey := types.NamespacedName{ + Namespace: transCtx.Cluster.Namespace, + Name: cfgcore.GetComponentCfgName( + transCtx.Cluster.Name, + transCtx.SynthesizeComponent.Name, + configSpec.Name, + ), + } + cmObj := &corev1.ConfigMap{} + if err := transCtx.Client.Get(transCtx.Context, cmKey, cmObj, inDataContext4C()); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return nil, fmt.Errorf("postgresql patroni dcs repair: get configmap %q: %w", cmKey.Name, err) + } + if content, ok := cmObj.Data[pgHBAConfigFile]; ok { + rules := parsePgHBAContent(content) + if len(rules) > 0 { + return ensurePgHBARemoteRules(rules), nil + } + } + } + return append([]string{}, fallbackPatroniPgHBARules...), nil +} + +func (t *componentPatroniDCSRepairTransformer) markRepairSucceeded( + transCtx *componentTransformContext, + repaired bool, +) { + message := "PostgreSQL Patroni DCS pg_hba rules are up to date" + if repaired { + message = "PostgreSQL Patroni DCS pg_hba rules were repaired" + } + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: patroniDCSRepairConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: patroniDCSRepairReasonSucceeded, + Message: message, + }) +} + +func (t *componentPatroniDCSRepairTransformer) markRepairFailed( + transCtx *componentTransformContext, + err error, +) { + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: patroniDCSRepairConditionType, + Status: metav1.ConditionFalse, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: patroniDCSRepairReasonFailed, + Message: err.Error(), + }) +} + +func previousPatroniDCSRepairFailed(conditions []metav1.Condition) bool { + condition := meta.FindStatusCondition(conditions, patroniDCSRepairConditionType) + return condition != nil && condition.Status == metav1.ConditionFalse +} + +func parsePgHBAContent(content string) []string { + rules := make([]string, 0) + for _, rawLine := range strings.Split(content, "\n") { + line := strings.TrimSpace(rawLine) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if commentIndex := strings.Index(line, "#"); commentIndex >= 0 { + line = strings.TrimSpace(line[:commentIndex]) + } + if line == "" { + continue + } + rules = append(rules, strings.Join(strings.Fields(line), " ")) + } + return rules +} + +func ensurePgHBARemoteRules(rules []string) []string { + rules, _ = mergePgHBARules(rules, requiredPatroniPgHBARules) + return rules +} + +func repairPatroniPgHBA( + ctx context.Context, + patroniClient patroniConfigClient, + baseURL string, + expectedRules []string, + reloadIfUnchanged bool, +) (bool, error) { + config, err := patroniClient.GetConfig(ctx, baseURL) + if err != nil { + return false, err + } + patchedRules, changed := mergePgHBARules(config.PostgreSQL.PgHBA, expectedRules) + if !changed { + if reloadIfUnchanged { + if err := patroniClient.Reload(ctx, baseURL); err != nil { + return false, err + } + } + return false, nil + } + + patch := patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: patchedRules, + }, + } + if err := patroniClient.PatchConfig(ctx, baseURL, patch); err != nil { + return false, err + } + if err := patroniClient.Reload(ctx, baseURL); err != nil { + return false, err + } + + config, err = patroniClient.GetConfig(ctx, baseURL) + if err != nil { + return false, err + } + if missing := missingPgHBARules(config.PostgreSQL.PgHBA, expectedRules); len(missing) > 0 { + return false, fmt.Errorf("postgresql patroni dcs repair: pg_hba rules still missing: %s", strings.Join(missing, "; ")) + } + return true, nil +} + +func mergePgHBARules(currentRules, expectedRules []string) ([]string, bool) { + merged := normalizePgHBARules(currentRules) + existing := make(map[string]struct{}, len(merged)) + for _, rule := range merged { + existing[rule] = struct{}{} + } + + changed := false + for _, rule := range normalizePgHBARules(expectedRules) { + if _, ok := existing[rule]; ok { + continue + } + merged = append(merged, rule) + existing[rule] = struct{}{} + changed = true + } + return merged, changed +} + +func missingPgHBARules(currentRules, expectedRules []string) []string { + current := normalizePgHBARules(currentRules) + currentSet := make(map[string]struct{}, len(current)) + for _, rule := range current { + currentSet[rule] = struct{}{} + } + + missing := make([]string, 0) + for _, rule := range normalizePgHBARules(expectedRules) { + if _, ok := currentSet[rule]; !ok { + missing = append(missing, rule) + } + } + return missing +} + +func normalizePgHBARules(rules []string) []string { + normalized := make([]string, 0, len(rules)) + seen := map[string]struct{}{} + for _, rule := range rules { + rule = strings.Join(strings.Fields(strings.TrimSpace(rule)), " ") + if rule == "" { + continue + } + if _, ok := seen[rule]; ok { + continue + } + seen[rule] = struct{}{} + normalized = append(normalized, rule) + } + return normalized +} + +func patroniRESTURL(podIP string, port int32) (string, error) { + if net.ParseIP(podIP) == nil { + return "", fmt.Errorf("postgresql patroni dcs repair: invalid patroni pod ip %q", podIP) + } + return "http://" + net.JoinHostPort(podIP, strconv.Itoa(int(port))), nil +} + +func patroniRESTPort(pod *corev1.Pod) int32 { + if pod == nil { + return patroniDefaultRESTPort + } + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + name := strings.ToLower(port.Name) + if name == "patroni" || name == "patroni-rest" || + name == "patroni-restapi" || name == "patroni-api" || name == "restapi" { + return port.ContainerPort + } + } + } + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.ContainerPort == patroniDefaultRESTPort { + return port.ContainerPort + } + } + } + return patroniDefaultRESTPort +} + +type patroniConfigClient interface { + GetConfig(ctx context.Context, baseURL string) (*patroniDynamicConfig, error) + PatchConfig(ctx context.Context, baseURL string, config patroniDynamicConfig) error + Reload(ctx context.Context, baseURL string) error +} + +type patroniDynamicConfig struct { + PostgreSQL patroniPostgreSQLConfig `json:"postgresql,omitempty"` +} + +type patroniPostgreSQLConfig struct { + PgHBA []string `json:"pg_hba,omitempty"` +} + +type httpPatroniConfigClient struct { + client *http.Client +} + +func newHTTPPatroniConfigClient() *httpPatroniConfigClient { + return &httpPatroniConfigClient{ + client: &http.Client{Timeout: patroniHTTPTimeout}, + } +} + +func (c *httpPatroniConfigClient) GetConfig(ctx context.Context, baseURL string) (*patroniDynamicConfig, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, patroniURL(baseURL, "config"), nil) + if err != nil { + return nil, fmt.Errorf("postgresql patroni dcs repair: build get config request: %w", err) + } + + var config patroniDynamicConfig + if err := c.doJSON(req, &config); err != nil { + return nil, err + } + return &config, nil +} + +func (c *httpPatroniConfigClient) PatchConfig( + ctx context.Context, + baseURL string, + config patroniDynamicConfig, +) error { + body, err := json.Marshal(config) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: marshal patch config request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, patroniURL(baseURL, "config"), bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: build patch config request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + return c.do(req, nil) +} + +func (c *httpPatroniConfigClient) Reload(ctx context.Context, baseURL string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, patroniURL(baseURL, "reload"), nil) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: build reload request: %w", err) + } + return c.do(req, nil) +} + +func (c *httpPatroniConfigClient) doJSON(req *http.Request, out any) error { + req.Header.Set("Accept", "application/json") + return c.do(req, out) +} + +func (c *httpPatroniConfigClient) do(req *http.Request, out any) error { + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: send %s %s: %w", req.Method, req.URL.Path, err) + } + defer resp.Body.Close() + + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + return fmt.Errorf("postgresql patroni dcs repair: %s %s returned %s", req.Method, req.URL.Path, resp.Status) + } + if out == nil { + _, err = io.Copy(io.Discard, io.LimitReader(resp.Body, patroniMaxResponseBytes)) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: read %s %s response: %w", req.Method, req.URL.Path, err) + } + return nil + } + if err := json.NewDecoder(io.LimitReader(resp.Body, patroniMaxResponseBytes)).Decode(out); err != nil { + return fmt.Errorf("postgresql patroni dcs repair: decode %s %s response: %w", req.Method, req.URL.Path, err) + } + return nil +} + +func patroniURL(baseURL, path string) string { + return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(path, "/") +} diff --git a/controllers/apps/transformer_component_patroni_dcs_repair_test.go b/controllers/apps/transformer_component_patroni_dcs_repair_test.go new file mode 100644 index 00000000000..84acfe6760b --- /dev/null +++ b/controllers/apps/transformer_component_patroni_dcs_repair_test.go @@ -0,0 +1,410 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package apps + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +func TestParsePgHBAContent(t *testing.T) { + t.Parallel() + + content := ` +# comment +local all all trust +host all all 127.0.0.1/32 md5 # inline comment + +host replication standby 0.0.0.0/0 md5 +` + require.Equal(t, []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication standby 0.0.0.0/0 md5", + }, parsePgHBAContent(content)) +} + +func TestMergePgHBARules(t *testing.T) { + t.Parallel() + + current := []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + } + expected := []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + } + + merged, changed := mergePgHBARules(current, expected) + require.True(t, changed) + require.Equal(t, []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication standby 0.0.0.0/0 md5", + }, merged) + require.Empty(t, missingPgHBARules(merged, expected)) +} + +func TestRepairPatroniPgHBA(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), client, "http://pod:8008", []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, false) + + require.NoError(t, err) + require.True(t, repaired) + require.True(t, client.patched) + require.True(t, client.reloaded) + require.Equal(t, []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, client.config.PostgreSQL.PgHBA) +} + +func TestRepairPatroniPgHBANoop(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), client, "http://pod:8008", []string{ + "host replication standby 0.0.0.0/0 md5", + }, false) + + require.NoError(t, err) + require.False(t, repaired) + require.False(t, client.patched) + require.False(t, client.reloaded) +} + +func TestRepairPatroniPgHBAReloadsAfterPreviousFailure(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"host replication standby 0.0.0.0/0 md5"}, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), client, "http://pod:8008", []string{ + "host replication standby 0.0.0.0/0 md5", + }, true) + + require.NoError(t, err) + require.False(t, repaired) + require.False(t, client.patched) + require.True(t, client.reloaded) +} + +func TestEnsurePgHBARemoteRules(t *testing.T) { + t.Parallel() + + rules := ensurePgHBARemoteRules([]string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication all 127.0.0.1/32 md5", + }) + + require.Contains(t, rules, "host all all 0.0.0.0/0 md5") + require.Contains(t, rules, "host replication standby 0.0.0.0/0 md5") + require.Contains(t, rules, "host replication all 127.0.0.1/32 md5") +} + +func TestPatroniRESTURL(t *testing.T) { + t.Parallel() + + url, err := patroniRESTURL("10.0.0.10", 8008) + require.NoError(t, err) + require.Equal(t, "http://10.0.0.10:8008", url) + + url, err = patroniRESTURL("2001:db8::10", 8008) + require.NoError(t, err) + require.Equal(t, "http://[2001:db8::10]:8008", url) + + _, err = patroniRESTURL("not-an-ip", 8008) + require.Error(t, err) +} + +func TestHTTPPatroniConfigClient(t *testing.T) { + t.Parallel() + + requests := make([]string, 0) + config := patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests = append(requests, r.Method+" "+r.URL.Path) + switch { + case r.Method == http.MethodGet && r.URL.Path == "/config": + require.NoError(t, json.NewEncoder(w).Encode(config)) + case r.Method == http.MethodPatch && r.URL.Path == "/config": + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + require.NoError(t, json.NewDecoder(r.Body).Decode(&config)) + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodPost && r.URL.Path == "/reload": + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + client := &httpPatroniConfigClient{client: server.Client()} + got, err := client.GetConfig(context.Background(), server.URL) + require.NoError(t, err) + require.Equal(t, []string{"local all all trust"}, got.PostgreSQL.PgHBA) + + err = client.PatchConfig(context.Background(), server.URL, patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"host all all 0.0.0.0/0 md5"}, + }, + }) + require.NoError(t, err) + require.NoError(t, client.Reload(context.Background(), server.URL)) + require.Equal(t, []string{ + "GET /config", + "PATCH /config", + "POST /reload", + }, requests) + require.Equal(t, []string{"host all all 0.0.0.0/0 md5"}, config.PostgreSQL.PgHBA) +} + +func TestComponentPatroniDCSRepairTransformer(t *testing.T) { + t.Parallel() + + const ( + namespace = "default" + clusterName = "test" + componentName = "postgresql" + compName = "test-postgresql" + configName = "postgresql-configuration" + leaderPodName = "test-postgresql-1" + ) + + scheme := rscheme + cluster := &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + } + comp := &appsv1alpha1.Component{ + ObjectMeta: metav1.ObjectMeta{ + Name: compName, + Namespace: namespace, + }, + Status: appsv1alpha1.ComponentStatus{ + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + compDef := &appsv1alpha1.ComponentDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "postgresql"}, + Spec: appsv1alpha1.ComponentDefinitionSpec{ + ServiceKind: "PostgreSQL", + }, + } + leaderPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: leaderPodName, + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "postgresql", + Ports: []corev1.ContainerPort{{ + Name: "patroni", + ContainerPort: 8010, + }}, + }}, + }, + Status: corev1.PodStatus{PodIP: "10.0.0.10"}, + } + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cfgcore.GetComponentCfgName(clusterName, componentName, configName), + Namespace: namespace, + }, + Data: map[string]string{ + pgHBAConfigFile: strings.Join([]string{ + "local all all trust", + "host all all 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + }, "\n"), + }, + } + runningITS := &workloads.InstanceSet{ + Status: workloads.InstanceSetStatus{ + AvailableReplicas: 2, + MembersStatus: []workloads.MemberStatus{{ + PodName: leaderPodName, + ReplicaRole: &workloads.ReplicaRole{ + Name: "leader", + IsLeader: true, + }, + }}, + }, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(cluster, comp, compDef, leaderPod, configMap). + Build() + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(k8sClient), + Logger: log.Log, + Cluster: cluster, + CompDef: compDef, + Component: comp.DeepCopy(), + ComponentOrig: comp.DeepCopy(), + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: namespace, + ClusterName: clusterName, + Name: componentName, + ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ + ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{ + Name: configName, + }, + }}, + }, + RunningWorkload: runningITS, + } + patroniClient := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + }, + } + + transformer := &componentPatroniDCSRepairTransformer{ + patroniClient: patroniClient, + } + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.True(t, patroniClient.patched) + require.True(t, patroniClient.reloaded) + require.Equal(t, "http://10.0.0.10:8010", patroniClient.baseURL) + require.Empty(t, missingPgHBARules( + patroniClient.config.PostgreSQL.PgHBA, + parsePgHBAContent(configMap.Data[pgHBAConfigFile]), + )) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, patroniDCSRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) +} + +func TestComponentPatroniDCSRepairTransformerFallbackPgHBA(t *testing.T) { + t.Parallel() + + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(fake.NewClientBuilder().WithScheme(rscheme).Build()), + Cluster: &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: "default", + ClusterName: "test", + Name: "postgresql", + ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ + ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{ + Name: "postgresql-configuration", + }, + }}, + }, + } + + rules, err := (&componentPatroniDCSRepairTransformer{}).expectedPgHBARules(transCtx) + + require.NoError(t, err) + require.Equal(t, fallbackPatroniPgHBARules, rules) +} + +type fakePatroniConfigClient struct { + config patroniDynamicConfig + baseURL string + patched bool + reloaded bool +} + +func (c *fakePatroniConfigClient) GetConfig(_ context.Context, baseURL string) (*patroniDynamicConfig, error) { + c.baseURL = baseURL + config := c.config + return &config, nil +} + +func (c *fakePatroniConfigClient) PatchConfig( + _ context.Context, + baseURL string, + config patroniDynamicConfig, +) error { + c.baseURL = baseURL + c.patched = true + c.config = config + return nil +} + +func (c *fakePatroniConfigClient) Reload(_ context.Context, baseURL string) error { + c.baseURL = baseURL + c.reloaded = true + return nil +} + +var _ patroniConfigClient = &fakePatroniConfigClient{} diff --git a/controllers/apps/transformer_component_postgresql_standby_password_repair.go b/controllers/apps/transformer_component_postgresql_standby_password_repair.go new file mode 100644 index 00000000000..995c7d624cd --- /dev/null +++ b/controllers/apps/transformer_component_postgresql_standby_password_repair.go @@ -0,0 +1,573 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package apps + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + ctrl "sigs.k8s.io/controller-runtime" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" +) + +const ( + standbyPasswordRepairConditionType = "StandbyPasswordRepair" + + standbyPasswordRepairReasonSucceeded = "Succeeded" + standbyPasswordRepairReasonFailed = "Failed" + standbyPasswordRepairReasonInconsistent = "StandbyPasswordInconsistent" + standbyPasswordRepairReasonSkipped = "Skipped" + + standbyPasswordRepairRequeueInterval = time.Minute + standbyPgpassPath = "/run/postgresql/pgpass" + standbyUserName = "standby" + readStandbyPasswordEnvCommand = `printf "%s" "${PGPASSWORD_STANDBY:-}"` + readPostgreSQLModeEnvCommand = `printf "%s" "${PG_MODE:-}"` +) + +const ensureStandbyPasswordScript = ` +set -eu +password="$(cat)" +if [ -z "$password" ]; then + echo "empty standby password" >&2 + exit 1 +fi +escaped_password="$(printf "%s" "$password" | sed "s/'/''/g")" +matches="$(psql -U postgres -v ON_ERROR_STOP=1 -Atq </dev/null +SET standard_conforming_strings = on; +SET password_encryption = 'md5'; +ALTER USER standby PASSWORD '$escaped_password'; +SQL +fi +verified="$(psql -U postgres -v ON_ERROR_STOP=1 -Atq <&2 + exit 1 +fi +printf "%s\n" "$matches" +` + +var errStandbyEntryNotFound = errors.New("standby entry not found") + +// componentPostgreSQLStandbyPasswordRepairTransformer repairs drift between +// the standby password used by pods and the password stored in PostgreSQL. +type componentPostgreSQLStandbyPasswordRepairTransformer struct { + restConfig *rest.Config + execRunner podExecRunner +} + +var _ graph.Transformer = &componentPostgreSQLStandbyPasswordRepairTransformer{} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) Transform( + ctx graph.TransformContext, + dag *graph.DAG, +) error { + transCtx, _ := ctx.(*componentTransformContext) + if model.IsObjectDeleting(transCtx.ComponentOrig) { + return nil + } + if !isPostgreSQLComponent(transCtx) { + return nil + } + if transCtx.Component.Status.Phase != appsv1alpha1.RunningClusterCompPhase { + return nil + } + + runningITS, ok := transCtx.RunningWorkload.(*workloads.InstanceSet) + if !ok || runningITS == nil || runningITS.Status.AvailableReplicas == 0 { + return nil + } + + pods, err := t.runningPods(transCtx) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if len(pods) == 0 { + err := fmt.Errorf("postgresql standby password repair: no running pods with pod ip found") + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + runner := t.execRunner + if runner == nil { + runner = newKubePodExecRunner(t.restConfig) + } + skip, err := shouldSkipStandbyPasswordRepair(transCtx.Context, runner, pods) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if skip { + t.markRepairSkipped(transCtx) + return nil + } + + leaderPod, err := t.leaderPod(transCtx, runningITS) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if leaderPod == nil || leaderPod.Status.PodIP == "" { + err := fmt.Errorf( + "postgresql standby password repair: leader pod %q has no pod ip", + leaderMemberPodName(runningITS), + ) + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + expectedPassword, err := consistentStandbyPassword(transCtx.Context, runner, pods) + if err != nil { + if isInconsistentStandbyPasswordError(err) { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonInconsistent, err) + return nil + } + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + repaired, err := ensureLeaderStandbyPassword(transCtx.Context, runner, leaderPod, expectedPassword) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + t.markRepairSucceeded(transCtx, repaired) + if repaired && transCtx.EventRecorder != nil { + transCtx.EventRecorder.Event(transCtx.Component, corev1.EventTypeNormal, + standbyPasswordRepairConditionType, "repaired PostgreSQL standby password drift") + } + return nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) runningPods( + transCtx *componentTransformContext, +) ([]*corev1.Pod, error) { + labels := constant.GetComponentWellKnownLabels(transCtx.Cluster.Name, transCtx.SynthesizeComponent.Name) + pods, err := ctrlcomp.ListPodOwnedByComponent( + transCtx.Context, + transCtx.Client, + transCtx.SynthesizeComponent.Namespace, + labels, + inDataContext4C(), + ) + if err != nil { + return nil, fmt.Errorf("postgresql standby password repair: list component pods: %w", err) + } + + runningPods := make([]*corev1.Pod, 0, len(pods)) + for _, pod := range pods { + if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning || pod.Status.PodIP == "" { + continue + } + runningPods = append(runningPods, pod) + } + return runningPods, nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) leaderPod( + transCtx *componentTransformContext, + runningITS *workloads.InstanceSet, +) (*corev1.Pod, error) { + podName := leaderMemberPodName(runningITS) + if podName == "" { + return nil, fmt.Errorf("postgresql standby password repair: leader pod not found") + } + + pod := &corev1.Pod{} + podKey := types.NamespacedName{Namespace: transCtx.Component.Namespace, Name: podName} + if err := transCtx.Client.Get(transCtx.Context, podKey, pod, inDataContext4C()); err != nil { + return nil, fmt.Errorf("postgresql standby password repair: get leader pod %q: %w", podName, err) + } + return pod, nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) markRepairSucceeded( + transCtx *componentTransformContext, + repaired bool, +) { + message := "PostgreSQL standby password is up to date" + if repaired { + message = "PostgreSQL standby password was repaired" + } + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: standbyPasswordRepairConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: standbyPasswordRepairReasonSucceeded, + Message: message, + }) +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) markRepairSkipped( + transCtx *componentTransformContext, +) { + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: standbyPasswordRepairConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: standbyPasswordRepairReasonSkipped, + Message: "PostgreSQL standby password repair is skipped for standby cluster mode", + }) +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) markRepairFailed( + transCtx *componentTransformContext, + reason string, + err error, +) { + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: standbyPasswordRepairConditionType, + Status: metav1.ConditionFalse, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: err.Error(), + }) +} + +func shouldSkipStandbyPasswordRepair(ctx context.Context, runner podExecRunner, pods []*corev1.Pod) (bool, error) { + for _, pod := range pods { + mode, err := postgreSQLModeFromPod(ctx, runner, pod) + if err != nil { + return false, err + } + // In standby-cluster mode the addon may point standby credentials at a + // remote primary, so they are not safe for local role repair. + if strings.Contains(strings.ToLower(mode), "standby") { + return true, nil + } + } + return false, nil +} + +func postgreSQLModeFromPod(ctx context.Context, runner podExecRunner, pod *corev1.Pod) (string, error) { + stdout, stderr, err := runner.Exec(ctx, pod, []string{"sh", "-c", readPostgreSQLModeEnvCommand}, "") + if err != nil { + return "", fmt.Errorf( + "postgresql standby password repair: read pg mode from pod %q: %w: %s", + pod.Name, + err, + strings.TrimSpace(stderr), + ) + } + return strings.TrimSpace(stdout), nil +} + +func consistentStandbyPassword(ctx context.Context, runner podExecRunner, pods []*corev1.Pod) (string, error) { + expectedPassword := "" + for _, pod := range pods { + password, err := standbyPasswordFromPod(ctx, runner, pod) + if err != nil { + return "", err + } + if expectedPassword == "" { + expectedPassword = password + continue + } + if password != expectedPassword { + return "", inconsistentStandbyPasswordError{} + } + } + return expectedPassword, nil +} + +func standbyPasswordFromPod(ctx context.Context, runner podExecRunner, pod *corev1.Pod) (string, error) { + stdout, stderr, err := runner.Exec(ctx, pod, []string{"cat", standbyPgpassPath}, "") + if err != nil { + return "", fmt.Errorf( + "postgresql standby password repair: read pgpass from pod %q: %w: %s", + pod.Name, + err, + strings.TrimSpace(stderr), + ) + } + password, err := parseStandbyPasswordFromPgpass(stdout) + if err == nil { + return password, nil + } + if !errors.Is(err, errStandbyEntryNotFound) { + return "", fmt.Errorf("postgresql standby password repair: parse pgpass from pod %q: %w", pod.Name, err) + } + + // Some PostgreSQL leaders keep only the superuser entry in pgpass, while the + // replication password is still exposed through the pod environment. + stdout, stderr, err = runner.Exec(ctx, pod, []string{"sh", "-c", readStandbyPasswordEnvCommand}, "") + if err != nil { + return "", fmt.Errorf( + "postgresql standby password repair: read standby env from pod %q: %w: %s", + pod.Name, + err, + strings.TrimSpace(stderr), + ) + } + password = strings.TrimRight(stdout, "\r\n") + if password == "" { + return "", fmt.Errorf( + "postgresql standby password repair: standby password not found in pod %q pgpass or env", + pod.Name, + ) + } + return password, nil +} + +func ensureLeaderStandbyPassword( + ctx context.Context, + runner podExecRunner, + leaderPod *corev1.Pod, + expectedPassword string, +) (bool, error) { + if strings.ContainsAny(expectedPassword, "\r\n") { + return false, fmt.Errorf("postgresql standby password repair: invalid standby password") + } + stdout, _, err := runner.Exec( + ctx, + leaderPod, + []string{"sh", "-c", ensureStandbyPasswordScript}, + expectedPassword, + ) + if err != nil { + return false, fmt.Errorf( + "postgresql standby password repair: ensure standby password on leader pod %q: %w", + leaderPod.Name, + err, + ) + } + switch strings.TrimSpace(stdout) { + case "t": + return false, nil + case "f": + return true, nil + default: + return false, fmt.Errorf( + "postgresql standby password repair: unexpected verification result from leader pod %q", + leaderPod.Name, + ) + } +} + +func parseStandbyPasswordFromPgpass(content string) (string, error) { + for _, rawLine := range strings.Split(content, "\n") { + line := strings.TrimRight(rawLine, "\r") + trimmed := strings.TrimSpace(line) + if trimmed == "" || strings.HasPrefix(trimmed, "#") { + continue + } + fields := splitPgpassLine(line) + if len(fields) < 5 { + continue + } + if fields[3] == standbyUserName { + if fields[4] == "" { + return "", fmt.Errorf("standby password is empty") + } + return fields[4], nil + } + } + return "", errStandbyEntryNotFound +} + +func splitPgpassLine(line string) []string { + fields := make([]string, 0, 5) + var field strings.Builder + escaped := false + for _, r := range line { + switch { + case escaped: + field.WriteRune(r) + escaped = false + case r == '\\': + escaped = true + case r == ':': + fields = append(fields, field.String()) + field.Reset() + default: + field.WriteRune(r) + } + } + if escaped { + field.WriteRune('\\') + } + fields = append(fields, field.String()) + return fields +} + +type inconsistentStandbyPasswordError struct{} + +func (inconsistentStandbyPasswordError) Error() string { + return "postgresql standby password repair: standby passwords differ across running pods" +} + +func isInconsistentStandbyPasswordError(err error) bool { + var target inconsistentStandbyPasswordError + return errors.As(err, &target) +} + +type podExecRunner interface { + Exec(ctx context.Context, pod *corev1.Pod, command []string, stdin string) (stdout string, stderr string, err error) +} + +type kubePodExecRunner struct { + restConfig *rest.Config +} + +func newKubePodExecRunner(restConfig *rest.Config) *kubePodExecRunner { + return &kubePodExecRunner{restConfig: restConfig} +} + +func (r *kubePodExecRunner) Exec( + ctx context.Context, + pod *corev1.Pod, + command []string, + stdin string, +) (string, string, error) { + if pod == nil { + return "", "", fmt.Errorf("pod is nil") + } + containerName := postgreSQLContainerName(pod) + if containerName == "" { + return "", "", fmt.Errorf("pod %q has no container", pod.Name) + } + + restConfig, err := r.config() + if err != nil { + return "", "", err + } + restClient, err := rest.RESTClientFor(restConfig) + if err != nil { + return "", "", fmt.Errorf("create kubernetes rest client: %w", err) + } + req := restClient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec") + req.VersionedParams(&corev1.PodExecOptions{ + Container: containerName, + Command: command, + Stdin: stdin != "", + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL()) + if err != nil { + return "", "", fmt.Errorf("create kubernetes exec executor: %w", err) + } + var stdout, stderr bytes.Buffer + var stdinReader io.Reader + if stdin != "" { + stdinReader = strings.NewReader(stdin) + } + err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: stdinReader, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + return stdout.String(), stderr.String(), err +} + +func (r *kubePodExecRunner) config() (*rest.Config, error) { + var ( + config *rest.Config + err error + ) + if r.restConfig != nil { + config = rest.CopyConfig(r.restConfig) + } else { + config, err = ctrl.GetConfig() + if err != nil { + return nil, fmt.Errorf("get kubernetes rest config: %w", err) + } + } + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + config.APIPath = "/api" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + return config, nil +} + +func postgreSQLContainerName(pod *corev1.Pod) string { + preferredNames := map[string]struct{}{ + "postgresql": {}, + "postgres": {}, + "spilo": {}, + } + for _, container := range pod.Spec.Containers { + if _, ok := preferredNames[strings.ToLower(container.Name)]; ok { + return container.Name + } + } + for _, container := range pod.Spec.Containers { + name := strings.ToLower(container.Name) + if strings.Contains(name, "lorry") || strings.Contains(name, "config") { + continue + } + return container.Name + } + if len(pod.Spec.Containers) == 0 { + return "" + } + return pod.Spec.Containers[0].Name +} + +var _ podExecRunner = &kubePodExecRunner{} diff --git a/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go b/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go new file mode 100644 index 00000000000..78c6c6dfdd3 --- /dev/null +++ b/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go @@ -0,0 +1,521 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package apps + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +func TestParseStandbyPasswordFromPgpass(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + content string + want string + wantErr bool + }{ + { + name: "standby entry", + content: strings.Join([]string{ + "# comment", + "localhost:5432:*:postgres:root", + "leader:5432:*:standby:secret", + }, "\n"), + want: "secret", + }, + { + name: "escaped delimiters", + content: `leader\:0:5432:*:standby:se\:cret\\`, + want: `se:cret\`, + }, + { + name: "preserve password spaces", + content: "leader:5432:*:standby: secret ", + want: " secret ", + }, + { + name: "missing standby", + content: "localhost:5432:*:postgres:root", + wantErr: true, + }, + { + name: "empty standby password", + content: "localhost:5432:*:standby:", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseStandbyPasswordFromPgpass(tt.content) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tt.want, got) + }) + } +} + +func TestConsistentStandbyPassword(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + "postgresql-0": "localhost:5432:*:standby:secret", + "postgresql-1": "localhost:5432:*:standby:secret", + }, + } + + password, err := consistentStandbyPassword(context.Background(), runner, pods) + + require.NoError(t, err) + require.Equal(t, "secret", password) + require.Equal(t, [][]string{ + {"cat", standbyPgpassPath}, + {"cat", standbyPgpassPath}, + }, runner.commands) +} + +func TestConsistentStandbyPasswordFallsBackToEnv(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + "postgresql-0": "localhost:5432:*:postgres:secret", + "postgresql-1": "localhost:5432:*:standby:secret", + }, + standbyEnv: map[string]string{ + "postgresql-0": "secret", + }, + } + + password, err := consistentStandbyPassword(context.Background(), runner, pods) + + require.NoError(t, err) + require.Equal(t, "secret", password) + require.Equal(t, [][]string{ + {"cat", standbyPgpassPath}, + {"sh", "-c", readStandbyPasswordEnvCommand}, + {"cat", standbyPgpassPath}, + }, runner.commands) +} + +func TestConsistentStandbyPasswordInconsistent(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + "postgresql-0": "localhost:5432:*:standby:secret-a", + "postgresql-1": "localhost:5432:*:standby:secret-b", + }, + } + + _, err := consistentStandbyPassword(context.Background(), runner, pods) + + require.Error(t, err) + require.True(t, isInconsistentStandbyPasswordError(err)) +} + +func TestShouldSkipStandbyPasswordRepair(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgMode: map[string]string{ + "postgresql-0": "", + "postgresql-1": "standby", + }, + } + + skip, err := shouldSkipStandbyPasswordRepair(context.Background(), runner, pods) + + require.NoError(t, err) + require.True(t, skip) + require.Equal(t, [][]string{ + {"sh", "-c", readPostgreSQLModeEnvCommand}, + {"sh", "-c", readPostgreSQLModeEnvCommand}, + }, runner.commands) +} + +func TestEnsureLeaderStandbyPassword(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := &fakePodExecRunner{ensureResult: "f"} + + repaired, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "secret") + + require.NoError(t, err) + require.True(t, repaired) + require.Equal(t, "secret", runner.stdin) + require.Equal(t, []string{"sh", "-c", ensureStandbyPasswordScript}, runner.lastCommand) +} + +func TestEnsureLeaderStandbyPasswordNoop(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := &fakePodExecRunner{ensureResult: "t"} + + repaired, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "secret") + + require.NoError(t, err) + require.False(t, repaired) +} + +func TestEnsureLeaderStandbyPasswordRejectsNewline(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := &fakePodExecRunner{ensureResult: "t"} + + _, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "sec\nret") + + require.Error(t, err) + require.Empty(t, runner.lastCommand) +} + +func TestComponentPostgreSQLStandbyPasswordRepairTransformer(t *testing.T) { + t.Parallel() + + const ( + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + transCtx := newPostgreSQLStandbyPasswordRepairTestContext(t, leaderPodName, replicaPod) + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + leaderPodName: "localhost:5432:*:postgres:secret", + replicaPod: "localhost:5432:*:standby:secret", + }, + standbyEnv: map[string]string{ + leaderPodName: "secret", + }, + ensureResult: "f", + } + + transformer := &componentPostgreSQLStandbyPasswordRepairTransformer{execRunner: runner} + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.Equal(t, leaderPodName, runner.ensurePod) + require.Equal(t, "secret", runner.stdin) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) +} + +func TestComponentPostgreSQLStandbyPasswordRepairConditionUsesStatusVertex(t *testing.T) { + t.Parallel() + + const ( + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + transCtx := newPostgreSQLStandbyPasswordRepairTestContext(t, leaderPodName, replicaPod) + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + leaderPodName: "localhost:5432:*:postgres:secret", + replicaPod: "localhost:5432:*:standby:secret", + }, + standbyEnv: map[string]string{ + leaderPodName: "secret", + }, + ensureResult: "f", + } + dag := graph.NewDAG() + + require.NoError(t, (&componentInitTransformer{}).Transform(transCtx, dag)) + require.NoError(t, (&componentStatusTransformer{}).Transform(transCtx, dag)) + require.Equal(t, appsv1alpha1.RunningClusterCompPhase, transCtx.Component.Status.Phase) + require.NoError(t, (&componentPostgreSQLStandbyPasswordRepairTransformer{ + execRunner: runner, + }).Transform(transCtx, dag)) + + require.Equal(t, leaderPodName, runner.ensurePod) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) + + graphClient := transCtx.Client.(model.GraphClient) + vertex := graphClient.FindMatchedVertex(dag, transCtx.Component) + require.NotNil(t, vertex) + objectVertex := vertex.(*model.ObjectVertex) + require.Equal(t, model.STATUS, *objectVertex.Action) + component := objectVertex.Obj.(*appsv1alpha1.Component) + cond = meta.FindStatusCondition(component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) +} + +func TestComponentPostgreSQLStandbyPasswordRepairTransformerSkipsStandbyMode(t *testing.T) { + t.Parallel() + + const ( + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + transCtx := newPostgreSQLStandbyPasswordRepairTestContext(t, leaderPodName, replicaPod) + runner := &fakePodExecRunner{ + pgMode: map[string]string{ + leaderPodName: "standby", + replicaPod: "standby", + }, + } + + transformer := &componentPostgreSQLStandbyPasswordRepairTransformer{execRunner: runner} + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.Empty(t, runner.ensurePod) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) + require.Equal(t, standbyPasswordRepairReasonSkipped, cond.Reason) +} + +func TestComponentPostgreSQLStandbyPasswordRepairTransformerInconsistent(t *testing.T) { + t.Parallel() + + const ( + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + transCtx := newPostgreSQLStandbyPasswordRepairTestContext(t, leaderPodName, replicaPod) + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + leaderPodName: "localhost:5432:*:standby:secret-a", + replicaPod: "localhost:5432:*:standby:secret-b", + }, + } + + transformer := &componentPostgreSQLStandbyPasswordRepairTransformer{execRunner: runner} + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.Empty(t, runner.ensurePod) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionFalse, cond.Status) + require.Equal(t, standbyPasswordRepairReasonInconsistent, cond.Reason) +} + +func newPostgreSQLStandbyPasswordRepairTestContext( + t *testing.T, + leaderPodName string, + replicaPodNames ...string, +) *componentTransformContext { + t.Helper() + + const ( + namespace = "default" + clusterName = "test" + componentName = "postgresql" + compName = "test-postgresql" + ) + + cluster := &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: clusterName, Namespace: namespace}, + } + cluster.Generation = 1 + comp := &appsv1alpha1.Component{ + ObjectMeta: metav1.ObjectMeta{Name: compName, Namespace: namespace}, + Status: appsv1alpha1.ComponentStatus{ + ObservedGeneration: 1, + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + comp.Generation = 1 + compDef := &appsv1alpha1.ComponentDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "postgresql"}, + Spec: appsv1alpha1.ComponentDefinitionSpec{ + ServiceKind: "PostgreSQL", + }, + } + labels := constant.GetComponentWellKnownLabels(clusterName, componentName) + objects := []client.Object{ + cluster, + comp, + compDef, + postgresqlTestPod(namespace, leaderPodName, labels), + } + for _, podName := range replicaPodNames { + objects = append(objects, postgresqlTestPod(namespace, podName, labels)) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(rscheme). + WithObjects(objects...). + Build() + replicas := int32(len(objects) - 3) + membersStatus := []workloads.MemberStatus{{ + PodName: leaderPodName, + ReplicaRole: &workloads.ReplicaRole{ + Name: "leader", + IsLeader: true, + }, + }} + for _, podName := range replicaPodNames { + membersStatus = append(membersStatus, workloads.MemberStatus{ + PodName: podName, + ReplicaRole: &workloads.ReplicaRole{Name: "replica"}, + }) + } + runningITS := &workloads.InstanceSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: compName, + Annotations: map[string]string{ + constant.KubeBlocksGenerationKey: "1", + }, + }, + Spec: workloads.InstanceSetSpec{ + Replicas: &replicas, + Roles: []workloads.ReplicaRole{{ + Name: "leader", + IsLeader: true, + }}, + }, + Status: workloads.InstanceSetStatus{ + ObservedGeneration: 1, + Replicas: replicas, + ReadyReplicas: replicas, + AvailableReplicas: replicas, + UpdatedReplicas: replicas, + InitReplicas: replicas, + ReadyInitReplicas: replicas, + CurrentRevision: "revision-1", + UpdateRevision: "revision-1", + MembersStatus: membersStatus, + }, + } + runningITS.Generation = 1 + return &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(k8sClient), + Logger: log.Log, + Cluster: cluster, + CompDef: compDef, + Component: comp.DeepCopy(), + ComponentOrig: comp.DeepCopy(), + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: namespace, + ClusterName: clusterName, + Name: componentName, + Replicas: replicas, + }, + RunningWorkload: runningITS, + ProtoWorkload: runningITS.DeepCopy(), + } +} + +func postgresqlTestPod(namespace, name string, labels map[string]string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "postgresql", + }}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + PodIP: "10.0.0.10", + }, + } +} + +type fakePodExecRunner struct { + pgpass map[string]string + pgMode map[string]string + standbyEnv map[string]string + ensureResult string + ensurePod string + stdin string + lastCommand []string + commands [][]string +} + +func (r *fakePodExecRunner) Exec( + _ context.Context, + pod *corev1.Pod, + command []string, + stdin string, +) (string, string, error) { + r.commands = append(r.commands, append([]string{}, command...)) + r.lastCommand = append([]string{}, command...) + if len(command) == 2 && command[0] == "cat" && command[1] == standbyPgpassPath { + return r.pgpass[pod.Name], "", nil + } + if len(command) == 3 && command[0] == "sh" && command[1] == "-c" && + command[2] == readStandbyPasswordEnvCommand { + return r.standbyEnv[pod.Name], "", nil + } + if len(command) == 3 && command[0] == "sh" && command[1] == "-c" && + command[2] == readPostgreSQLModeEnvCommand { + return r.pgMode[pod.Name], "", nil + } + r.ensurePod = pod.Name + r.stdin = stdin + return r.ensureResult, "", nil +} + +var _ podExecRunner = &fakePodExecRunner{}