From 2fd54e1df606375e2d3354bcc9fcc7f5c436573c Mon Sep 17 00:00:00 2001 From: im0x0ing Date: Tue, 26 May 2026 16:58:13 +0800 Subject: [PATCH 1/3] fix: repair PostgreSQL Patroni pg_hba DCS drift Add a Component reconcile transformer for PostgreSQL Patroni clusters that repairs missing pg_hba rules in Patroni dynamic config after failover. The repair finds the current InstanceSet leader, reads the rendered pg_hba.conf ConfigMap when available, falls back to the required remote client/replication rules, patches Patroni /config, reloads Patroni, and verifies the resulting dynamic config. Also watch Pod changes so leader/failover state changes can retrigger Component reconciliation. Tests: - go test ./controllers/apps -run 'Test(ParsePgHBAContent|MergePgHBARules|RepairPatroniPgHBA|RepairPatroniPgHBANoop|RepairPatroniPgHBAReloadsAfterPreviousFailure|EnsurePgHBARemoteRules|HTTPPatroniConfigClient|ComponentPatroniDCSRepairTransformer|ComponentPatroniDCSRepairTransformerFallbackPgHBA)$' -count=1 - go test ./controllers/apps -run 'Test(ParsePgHBAContent|MergePgHBARules|RepairPatroniPgHBA|RepairPatroniPgHBANoop|RepairPatroniPgHBAReloadsAfterPreviousFailure|EnsurePgHBARemoteRules|HTTPPatroniConfigClient|ComponentPatroniDCSRepairTransformer|ComponentPatroniDCSRepairTransformerFallbackPgHBA)$' -race -count=1 --- controllers/apps/component_controller.go | 4 + ...ransformer_component_patroni_dcs_repair.go | 482 ++++++++++++++++++ ...ormer_component_patroni_dcs_repair_test.go | 388 ++++++++++++++ 3 files changed, 874 insertions(+) create mode 100644 controllers/apps/transformer_component_patroni_dcs_repair.go create mode 100644 controllers/apps/transformer_component_patroni_dcs_repair_test.go diff --git a/controllers/apps/component_controller.go b/controllers/apps/component_controller.go index cae17b2e7aa..992a35ccbfa 100644 --- a/controllers/apps/component_controller.go +++ b/controllers/apps/component_controller.go @@ -185,6 +185,8 @@ func (r *ComponentReconciler) Reconcile(ctx 
context.Context, req ctrl.Request) ( &componentPostProvisionTransformer{}, // update component status &componentStatusTransformer{Client: r.Client}, + // repair PostgreSQL Patroni dynamic config after failover + &componentPatroniDCSRepairTransformer{}, ).Build() // Execute stage @@ -225,6 +227,7 @@ func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error { Owns(&dpv1alpha1.Restore{}). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). Watches(&corev1.PersistentVolumeClaim{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). + Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)). Owns(&batchv1.Job{}). Watches(&appsv1alpha1.Configuration{}, handler.EnqueueRequestsFromMapFunc(r.configurationEventHandler)) @@ -257,6 +260,7 @@ func (r *ComponentReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, mul Watch(b, &corev1.Secret{}, eventHandler). Watch(b, &corev1.ConfigMap{}, eventHandler). Watch(b, &corev1.PersistentVolumeClaim{}, eventHandler). + Watch(b, &corev1.Pod{}, eventHandler). Watch(b, &batchv1.Job{}, eventHandler). Watch(b, &corev1.ServiceAccount{}, eventHandler). Watch(b, &rbacv1.RoleBinding{}, eventHandler). diff --git a/controllers/apps/transformer_component_patroni_dcs_repair.go b/controllers/apps/transformer_component_patroni_dcs_repair.go new file mode 100644 index 00000000000..fde77f2fd10 --- /dev/null +++ b/controllers/apps/transformer_component_patroni_dcs_repair.go @@ -0,0 +1,482 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. 
+ +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package apps + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strconv" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core" + "github.com/apecloud/kubeblocks/pkg/constant" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" +) + +const ( + patroniDCSRepairConditionType = "PatroniDCSRepair" + + patroniDCSRepairReasonSucceeded = "Succeeded" + patroniDCSRepairReasonFailed = "Failed" + + patroniDCSRepairRequeueInterval = time.Minute + patroniHTTPTimeout = 10 * time.Second + patroniMaxResponseBytes = 1 << 20 + patroniDefaultRESTPort = int32(8008) + + pgHBAConfigFile = "pg_hba.conf" +) + +var ( + requiredPatroniPgHBARules = []string{ + "host all all 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + "host replication all 127.0.0.1/32 md5", + } + fallbackPatroniPgHBARules = append([]string{"local all all trust"}, requiredPatroniPgHBARules...) +) + +// componentPatroniDCSRepairTransformer repairs PostgreSQL Patroni dynamic +// config that may miss pg_hba rules after failover.
+type componentPatroniDCSRepairTransformer struct { + patroniClient patroniConfigClient +} + +var _ graph.Transformer = &componentPatroniDCSRepairTransformer{} + +func (t *componentPatroniDCSRepairTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error { + transCtx, _ := ctx.(*componentTransformContext) + if model.IsObjectDeleting(transCtx.ComponentOrig) { + return nil + } + if !isPostgreSQLComponent(transCtx) { + return nil + } + if transCtx.Component.Status.Phase != appsv1alpha1.RunningClusterCompPhase { + return nil + } + + runningITS, ok := transCtx.RunningWorkload.(*workloads.InstanceSet) + if !ok || runningITS == nil || runningITS.Status.AvailableReplicas == 0 { + return nil + } + + leaderPod, err := t.getLeaderPod(transCtx, runningITS) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + if leaderPod == nil || leaderPod.Status.PodIP == "" { + err := fmt.Errorf("postgresql patroni dcs repair: leader pod %q has no pod ip", leaderMemberPodName(runningITS)) + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + expectedPgHBA, err := t.expectedPgHBARules(transCtx) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + patroniClient := t.patroniClient + if patroniClient == nil { + patroniClient = newHTTPPatroniConfigClient() + } + repaired, err := repairPatroniPgHBA( + transCtx.Context, + patroniClient, + patroniRESTURL(leaderPod.Status.PodIP, patroniRESTPort(leaderPod)), + expectedPgHBA, + previousPatroniDCSRepairFailed(transCtx.Component.Status.Conditions), + ) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } + + t.markRepairSucceeded(transCtx, repaired) + if repaired && 
transCtx.EventRecorder != nil { + transCtx.EventRecorder.Event(transCtx.Component, corev1.EventTypeNormal, + patroniDCSRepairConditionType, "repaired PostgreSQL Patroni DCS pg_hba rules") + } + return nil +} + +func isPostgreSQLComponent(transCtx *componentTransformContext) bool { + if transCtx == nil || transCtx.CompDef == nil { + return false + } + serviceKind := strings.ToLower(strings.TrimSpace(transCtx.CompDef.Spec.ServiceKind)) + for _, alias := range constant.GetPostgreSQLAlias() { + if serviceKind == alias { + return true + } + } + compDefName := strings.ToLower(transCtx.CompDef.Name) + return compDefName == constant.ServiceKindPostgreSQL || strings.HasPrefix(compDefName, constant.ServiceKindPostgreSQL+"-") +} + +func (t *componentPatroniDCSRepairTransformer) getLeaderPod( + transCtx *componentTransformContext, + runningITS *workloads.InstanceSet, +) (*corev1.Pod, error) { + podName := leaderMemberPodName(runningITS) + if podName == "" { + return nil, fmt.Errorf("postgresql patroni dcs repair: leader pod not found") + } + + pod := &corev1.Pod{} + podKey := types.NamespacedName{Namespace: transCtx.Component.Namespace, Name: podName} + if err := transCtx.Client.Get(transCtx.Context, podKey, pod, inDataContext4C()); err != nil { + return nil, fmt.Errorf("postgresql patroni dcs repair: get leader pod %q: %w", podName, err) + } + return pod, nil +} + +func leaderMemberPodName(runningITS *workloads.InstanceSet) string { + if runningITS == nil { + return "" + } + for _, member := range runningITS.Status.MembersStatus { + if member.ReplicaRole != nil && member.ReplicaRole.IsLeader { + return member.PodName + } + } + return "" +} + +func (t *componentPatroniDCSRepairTransformer) expectedPgHBARules(transCtx *componentTransformContext) ([]string, error) { + for _, configSpec := range transCtx.SynthesizeComponent.ConfigTemplates { + cmKey := types.NamespacedName{ + Namespace: transCtx.Cluster.Namespace, + Name: cfgcore.GetComponentCfgName( + transCtx.Cluster.Name, + 
transCtx.SynthesizeComponent.Name, + configSpec.Name, + ), + } + cmObj := &corev1.ConfigMap{} + if err := transCtx.Client.Get(transCtx.Context, cmKey, cmObj, inDataContext4C()); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return nil, fmt.Errorf("postgresql patroni dcs repair: get configmap %q: %w", cmKey.Name, err) + } + if content, ok := cmObj.Data[pgHBAConfigFile]; ok { + rules := parsePgHBAContent(content) + if len(rules) > 0 { + return ensurePgHBARemoteRules(rules), nil + } + } + } + return append([]string{}, fallbackPatroniPgHBARules...), nil +} + +func (t *componentPatroniDCSRepairTransformer) markRepairSucceeded( + transCtx *componentTransformContext, + repaired bool, +) { + message := "PostgreSQL Patroni DCS pg_hba rules are up to date" + if repaired { + message = "PostgreSQL Patroni DCS pg_hba rules were repaired" + } + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: patroniDCSRepairConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: patroniDCSRepairReasonSucceeded, + Message: message, + }) +} + +func (t *componentPatroniDCSRepairTransformer) markRepairFailed( + transCtx *componentTransformContext, + err error, +) { + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: patroniDCSRepairConditionType, + Status: metav1.ConditionFalse, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: patroniDCSRepairReasonFailed, + Message: err.Error(), + }) +} + +func previousPatroniDCSRepairFailed(conditions []metav1.Condition) bool { + condition := meta.FindStatusCondition(conditions, patroniDCSRepairConditionType) + return condition != nil && condition.Status == metav1.ConditionFalse +} + +func parsePgHBAContent(content string) []string { + rules := make([]string, 0) + for _, rawLine := range strings.Split(content, "\n") { + line := 
strings.TrimSpace(rawLine) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if commentIndex := strings.Index(line, "#"); commentIndex >= 0 { + line = strings.TrimSpace(line[:commentIndex]) + } + if line == "" { + continue + } + rules = append(rules, strings.Join(strings.Fields(line), " ")) + } + return rules +} + +func ensurePgHBARemoteRules(rules []string) []string { + rules, _ = mergePgHBARules(rules, requiredPatroniPgHBARules) + return rules +} + +func repairPatroniPgHBA( + ctx context.Context, + patroniClient patroniConfigClient, + baseURL string, + expectedRules []string, + reloadIfUnchanged bool, +) (bool, error) { + config, err := patroniClient.GetConfig(ctx, baseURL) + if err != nil { + return false, err + } + patchedRules, changed := mergePgHBARules(config.PostgreSQL.PgHBA, expectedRules) + if !changed { + if reloadIfUnchanged { + if err := patroniClient.Reload(ctx, baseURL); err != nil { + return false, err + } + } + return false, nil + } + + patch := patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: patchedRules, + }, + } + if err := patroniClient.PatchConfig(ctx, baseURL, patch); err != nil { + return false, err + } + if err := patroniClient.Reload(ctx, baseURL); err != nil { + return false, err + } + + config, err = patroniClient.GetConfig(ctx, baseURL) + if err != nil { + return false, err + } + if missing := missingPgHBARules(config.PostgreSQL.PgHBA, expectedRules); len(missing) > 0 { + return false, fmt.Errorf("postgresql patroni dcs repair: pg_hba rules still missing: %s", strings.Join(missing, "; ")) + } + return true, nil +} + +func mergePgHBARules(currentRules, expectedRules []string) ([]string, bool) { + merged := normalizePgHBARules(currentRules) + existing := make(map[string]struct{}, len(merged)) + for _, rule := range merged { + existing[rule] = struct{}{} + } + + changed := false + for _, rule := range normalizePgHBARules(expectedRules) { + if _, ok := existing[rule]; ok { + continue + } + merged = 
append(merged, rule) + existing[rule] = struct{}{} + changed = true + } + return merged, changed +} + +func missingPgHBARules(currentRules, expectedRules []string) []string { + current := normalizePgHBARules(currentRules) + currentSet := make(map[string]struct{}, len(current)) + for _, rule := range current { + currentSet[rule] = struct{}{} + } + + missing := make([]string, 0) + for _, rule := range normalizePgHBARules(expectedRules) { + if _, ok := currentSet[rule]; !ok { + missing = append(missing, rule) + } + } + return missing +} + +func normalizePgHBARules(rules []string) []string { + normalized := make([]string, 0, len(rules)) + seen := map[string]struct{}{} + for _, rule := range rules { + rule = strings.Join(strings.Fields(strings.TrimSpace(rule)), " ") + if rule == "" { + continue + } + if _, ok := seen[rule]; ok { + continue + } + seen[rule] = struct{}{} + normalized = append(normalized, rule) + } + return normalized +} + +func patroniRESTURL(podIP string, port int32) string { + return "http://" + net.JoinHostPort(podIP, strconv.Itoa(int(port))) +} + +func patroniRESTPort(pod *corev1.Pod) int32 { + if pod == nil { + return patroniDefaultRESTPort + } + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + name := strings.ToLower(port.Name) + if name == "patroni" || name == "patroni-rest" || + name == "patroni-restapi" || name == "patroni-api" || name == "restapi" { + return port.ContainerPort + } + } + } + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.ContainerPort == patroniDefaultRESTPort { + return port.ContainerPort + } + } + } + return patroniDefaultRESTPort +} + +type patroniConfigClient interface { + GetConfig(ctx context.Context, baseURL string) (*patroniDynamicConfig, error) + PatchConfig(ctx context.Context, baseURL string, config patroniDynamicConfig) error + Reload(ctx context.Context, baseURL string) error +} + +type patroniDynamicConfig struct { + 
PostgreSQL patroniPostgreSQLConfig `json:"postgresql,omitempty"` +} + +type patroniPostgreSQLConfig struct { + PgHBA []string `json:"pg_hba,omitempty"` +} + +type httpPatroniConfigClient struct { + client *http.Client +} + +func newHTTPPatroniConfigClient() *httpPatroniConfigClient { + return &httpPatroniConfigClient{ + client: &http.Client{Timeout: patroniHTTPTimeout}, + } +} + +func (c *httpPatroniConfigClient) GetConfig(ctx context.Context, baseURL string) (*patroniDynamicConfig, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, patroniURL(baseURL, "config"), nil) + if err != nil { + return nil, fmt.Errorf("postgresql patroni dcs repair: build get config request: %w", err) + } + + var config patroniDynamicConfig + if err := c.doJSON(req, &config); err != nil { + return nil, err + } + return &config, nil +} + +func (c *httpPatroniConfigClient) PatchConfig( + ctx context.Context, + baseURL string, + config patroniDynamicConfig, +) error { + body, err := json.Marshal(config) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: marshal patch config request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, patroniURL(baseURL, "config"), bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: build patch config request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + return c.do(req, nil) +} + +func (c *httpPatroniConfigClient) Reload(ctx context.Context, baseURL string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, patroniURL(baseURL, "reload"), nil) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: build reload request: %w", err) + } + return c.do(req, nil) +} + +func (c *httpPatroniConfigClient) doJSON(req *http.Request, out any) error { + req.Header.Set("Accept", "application/json") + return c.do(req, out) +} + +func (c *httpPatroniConfigClient) do(req *http.Request, out any) error { + resp, err 
:= c.client.Do(req) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: send %s %s: %w", req.Method, req.URL.Path, err) + } + defer resp.Body.Close() + + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + return fmt.Errorf("postgresql patroni dcs repair: %s %s returned %s", req.Method, req.URL.Path, resp.Status) + } + if out == nil { + _, err = io.Copy(io.Discard, io.LimitReader(resp.Body, patroniMaxResponseBytes)) + if err != nil { + return fmt.Errorf("postgresql patroni dcs repair: read %s %s response: %w", req.Method, req.URL.Path, err) + } + return nil + } + if err := json.NewDecoder(io.LimitReader(resp.Body, patroniMaxResponseBytes)).Decode(out); err != nil { + return fmt.Errorf("postgresql patroni dcs repair: decode %s %s response: %w", req.Method, req.URL.Path, err) + } + return nil +} + +func patroniURL(baseURL, path string) string { + return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(path, "/") +} diff --git a/controllers/apps/transformer_component_patroni_dcs_repair_test.go b/controllers/apps/transformer_component_patroni_dcs_repair_test.go new file mode 100644 index 00000000000..7e82ba5cb22 --- /dev/null +++ b/controllers/apps/transformer_component_patroni_dcs_repair_test.go @@ -0,0 +1,388 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/ + +package apps + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +func TestParsePgHBAContent(t *testing.T) { + t.Parallel() + + content := ` +# comment +local all all trust +host all all 127.0.0.1/32 md5 # inline comment + +host replication standby 0.0.0.0/0 md5 +` + require.Equal(t, []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication standby 0.0.0.0/0 md5", + }, parsePgHBAContent(content)) +} + +func TestMergePgHBARules(t *testing.T) { + t.Parallel() + + current := []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + } + expected := []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + } + + merged, changed := mergePgHBARules(current, expected) + require.True(t, changed) + require.Equal(t, []string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication standby 0.0.0.0/0 md5", + }, merged) + require.Empty(t, missingPgHBARules(merged, expected)) +} + +func TestRepairPatroniPgHBA(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), 
client, "http://pod:8008", []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, false) + + require.NoError(t, err) + require.True(t, repaired) + require.True(t, client.patched) + require.True(t, client.reloaded) + require.Equal(t, []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, client.config.PostgreSQL.PgHBA) +} + +func TestRepairPatroniPgHBANoop(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{ + "local all all trust", + "host replication standby 0.0.0.0/0 md5", + }, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), client, "http://pod:8008", []string{ + "host replication standby 0.0.0.0/0 md5", + }, false) + + require.NoError(t, err) + require.False(t, repaired) + require.False(t, client.patched) + require.False(t, client.reloaded) +} + +func TestRepairPatroniPgHBAReloadsAfterPreviousFailure(t *testing.T) { + t.Parallel() + + client := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"host replication standby 0.0.0.0/0 md5"}, + }, + }, + } + repaired, err := repairPatroniPgHBA(context.Background(), client, "http://pod:8008", []string{ + "host replication standby 0.0.0.0/0 md5", + }, true) + + require.NoError(t, err) + require.False(t, repaired) + require.False(t, client.patched) + require.True(t, client.reloaded) +} + +func TestEnsurePgHBARemoteRules(t *testing.T) { + t.Parallel() + + rules := ensurePgHBARemoteRules([]string{ + "local all all trust", + "host all all 127.0.0.1/32 md5", + "host replication all 127.0.0.1/32 md5", + }) + + require.Contains(t, rules, "host all all 0.0.0.0/0 md5") + require.Contains(t, rules, "host replication standby 0.0.0.0/0 md5") + require.Contains(t, rules, "host replication all 127.0.0.1/32 md5") +} + +func TestHTTPPatroniConfigClient(t *testing.T) { + t.Parallel() + 
+ requests := make([]string, 0) + config := patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests = append(requests, r.Method+" "+r.URL.Path) + switch { + case r.Method == http.MethodGet && r.URL.Path == "/config": + require.NoError(t, json.NewEncoder(w).Encode(config)) + case r.Method == http.MethodPatch && r.URL.Path == "/config": + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + require.NoError(t, json.NewDecoder(r.Body).Decode(&config)) + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodPost && r.URL.Path == "/reload": + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + client := &httpPatroniConfigClient{client: server.Client()} + got, err := client.GetConfig(context.Background(), server.URL) + require.NoError(t, err) + require.Equal(t, []string{"local all all trust"}, got.PostgreSQL.PgHBA) + + err = client.PatchConfig(context.Background(), server.URL, patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"host all all 0.0.0.0/0 md5"}, + }, + }) + require.NoError(t, err) + require.NoError(t, client.Reload(context.Background(), server.URL)) + require.Equal(t, []string{ + "GET /config", + "PATCH /config", + "POST /reload", + }, requests) + require.Equal(t, []string{"host all all 0.0.0.0/0 md5"}, config.PostgreSQL.PgHBA) +} + +func TestComponentPatroniDCSRepairTransformer(t *testing.T) { + t.Parallel() + + const ( + namespace = "default" + clusterName = "test" + componentName = "postgresql" + compName = "test-postgresql" + configName = "postgresql-configuration" + leaderPodName = "test-postgresql-1" + ) + + scheme := rscheme + cluster := &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + } + comp := &appsv1alpha1.Component{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: compName, + Namespace: namespace, + }, + Status: appsv1alpha1.ComponentStatus{ + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + compDef := &appsv1alpha1.ComponentDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "postgresql"}, + Spec: appsv1alpha1.ComponentDefinitionSpec{ + ServiceKind: "PostgreSQL", + }, + } + leaderPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: leaderPodName, + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "postgresql", + Ports: []corev1.ContainerPort{{ + Name: "patroni", + ContainerPort: 8010, + }}, + }}, + }, + Status: corev1.PodStatus{PodIP: "10.0.0.10"}, + } + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cfgcore.GetComponentCfgName(clusterName, componentName, configName), + Namespace: namespace, + }, + Data: map[string]string{ + pgHBAConfigFile: strings.Join([]string{ + "local all all trust", + "host all all 0.0.0.0/0 md5", + "host replication standby 0.0.0.0/0 md5", + }, "\n"), + }, + } + runningITS := &workloads.InstanceSet{ + Status: workloads.InstanceSetStatus{ + AvailableReplicas: 2, + MembersStatus: []workloads.MemberStatus{{ + PodName: leaderPodName, + ReplicaRole: &workloads.ReplicaRole{ + Name: "leader", + IsLeader: true, + }, + }}, + }, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(cluster, comp, compDef, leaderPod, configMap). 
+ Build() + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(k8sClient), + Logger: log.Log, + Cluster: cluster, + CompDef: compDef, + Component: comp.DeepCopy(), + ComponentOrig: comp.DeepCopy(), + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: namespace, + ClusterName: clusterName, + Name: componentName, + ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ + ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{ + Name: configName, + }, + }}, + }, + RunningWorkload: runningITS, + } + patroniClient := &fakePatroniConfigClient{ + config: patroniDynamicConfig{ + PostgreSQL: patroniPostgreSQLConfig{ + PgHBA: []string{"local all all trust"}, + }, + }, + } + + transformer := &componentPatroniDCSRepairTransformer{ + patroniClient: patroniClient, + } + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.True(t, patroniClient.patched) + require.True(t, patroniClient.reloaded) + require.Equal(t, "http://10.0.0.10:8010", patroniClient.baseURL) + require.Empty(t, missingPgHBARules(patroniClient.config.PostgreSQL.PgHBA, parsePgHBAContent(configMap.Data[pgHBAConfigFile]))) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, patroniDCSRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) +} + +func TestComponentPatroniDCSRepairTransformerFallbackPgHBA(t *testing.T) { + t.Parallel() + + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(fake.NewClientBuilder().WithScheme(rscheme).Build()), + Cluster: &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: "default", + ClusterName: "test", + Name: "postgresql", + ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{Name: 
"postgresql-configuration"}}}, + }, + } + + rules, err := (&componentPatroniDCSRepairTransformer{}).expectedPgHBARules(transCtx) + + require.NoError(t, err) + require.Equal(t, fallbackPatroniPgHBARules, rules) +} + +type fakePatroniConfigClient struct { + config patroniDynamicConfig + baseURL string + patched bool + reloaded bool +} + +func (c *fakePatroniConfigClient) GetConfig(_ context.Context, baseURL string) (*patroniDynamicConfig, error) { + c.baseURL = baseURL + config := c.config + return &config, nil +} + +func (c *fakePatroniConfigClient) PatchConfig( + _ context.Context, + baseURL string, + config patroniDynamicConfig, +) error { + c.baseURL = baseURL + c.patched = true + c.config = config + return nil +} + +func (c *fakePatroniConfigClient) Reload(_ context.Context, baseURL string) error { + c.baseURL = baseURL + c.reloaded = true + return nil +} + +var _ patroniConfigClient = &fakePatroniConfigClient{} From 90290667885eabb3392794619386a7602eb74132 Mon Sep 17 00:00:00 2001 From: im0x0ing Date: Wed, 27 May 2026 10:32:47 +0800 Subject: [PATCH 2/3] fix: repair PostgreSQL standby password drift Add a Component reconcile transformer for PostgreSQL clusters that repairs drift between the standby password stored in pod pgpass files and the password stored in the PostgreSQL leader after failover. The repair lists running component Pods, reads each Pod's /run/postgresql/pgpass entry for the standby user, stops automatic repair when pod passwords differ, finds the current InstanceSet leader, compares the expected md5 password hash in pg_authid, updates the standby role password when needed, and verifies the result. 
Tests: - go test ./controllers/apps -run 'Test(ParseStandbyPasswordFromPgpass|ConsistentStandbyPassword|ConsistentStandbyPasswordInconsistent|EnsureLeaderStandbyPassword|EnsureLeaderStandbyPasswordNoop|EnsureLeaderStandbyPasswordRejectsNewline|ComponentPostgreSQLStandbyPasswordRepairTransformer|ComponentPostgreSQLStandbyPasswordRepairTransformerInconsistent)$' -count=1 - go test ./controllers/apps -run 'Test(ParseStandbyPasswordFromPgpass|ConsistentStandbyPassword|ConsistentStandbyPasswordInconsistent|EnsureLeaderStandbyPassword|EnsureLeaderStandbyPasswordNoop|EnsureLeaderStandbyPasswordRejectsNewline|ComponentPostgreSQLStandbyPasswordRepairTransformer|ComponentPostgreSQLStandbyPasswordRepairTransformerInconsistent)$' -race -count=1 --- controllers/apps/component_controller.go | 2 + ...nent_postgresql_standby_password_repair.go | 454 ++++++++++++++++++ ...postgresql_standby_password_repair_test.go | 380 +++++++++++++++ 3 files changed, 836 insertions(+) create mode 100644 controllers/apps/transformer_component_postgresql_standby_password_repair.go create mode 100644 controllers/apps/transformer_component_postgresql_standby_password_repair_test.go diff --git a/controllers/apps/component_controller.go b/controllers/apps/component_controller.go index 992a35ccbfa..672c433bf68 100644 --- a/controllers/apps/component_controller.go +++ b/controllers/apps/component_controller.go @@ -187,6 +187,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( &componentStatusTransformer{Client: r.Client}, // repair PostgreSQL Patroni dynamic config after failover &componentPatroniDCSRepairTransformer{}, + // repair PostgreSQL standby password drift after replication breaks + &componentPostgreSQLStandbyPasswordRepairTransformer{}, ).Build() // Execute stage diff --git a/controllers/apps/transformer_component_postgresql_standby_password_repair.go b/controllers/apps/transformer_component_postgresql_standby_password_repair.go new file mode 100644 index 
00000000000..5d887fb43e6 --- /dev/null +++ b/controllers/apps/transformer_component_postgresql_standby_password_repair.go @@ -0,0 +1,454 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package apps + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + ctrl "sigs.k8s.io/controller-runtime" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" +) + +const ( + standbyPasswordRepairConditionType = "StandbyPasswordRepair" + + standbyPasswordRepairReasonSucceeded = "Succeeded" + standbyPasswordRepairReasonFailed = "Failed" + standbyPasswordRepairReasonInconsistent = "StandbyPasswordInconsistent" + +
standbyPasswordRepairRequeueInterval = time.Minute + standbyPgpassPath = "/run/postgresql/pgpass" + standbyUserName = "standby" +) + +const ensureStandbyPasswordScript = ` +set -eu +password="$(cat)" +if [ -z "$password" ]; then + echo "empty standby password" >&2 + exit 1 +fi +escaped_password="$(printf "%s" "$password" | sed "s/'/''/g")" +matches="$(psql -U postgres -v ON_ERROR_STOP=1 -Atq </dev/null +SET standard_conforming_strings = on; +SET password_encryption = 'md5'; +ALTER USER standby PASSWORD '$escaped_password'; +SQL +fi +verified="$(psql -U postgres -v ON_ERROR_STOP=1 -Atq <&2 + exit 1 +fi +printf "%s\n" "$matches" +` + +// componentPostgreSQLStandbyPasswordRepairTransformer repairs drift between +// the standby password used by pods and the password stored in PostgreSQL. +type componentPostgreSQLStandbyPasswordRepairTransformer struct { + restConfig *rest.Config + execRunner podExecRunner +} + +var _ graph.Transformer = &componentPostgreSQLStandbyPasswordRepairTransformer{} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error { + transCtx, _ := ctx.(*componentTransformContext) + if model.IsObjectDeleting(transCtx.ComponentOrig) { + return nil + } + if !isPostgreSQLComponent(transCtx) { + return nil + } + if transCtx.Component.Status.Phase != appsv1alpha1.RunningClusterCompPhase { + return nil + } + + runningITS, ok := transCtx.RunningWorkload.(*workloads.InstanceSet) + if !ok || runningITS == nil || runningITS.Status.AvailableReplicas == 0 { + return nil + } + + pods, err := t.runningPods(transCtx) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if len(pods) == 0 { + err := fmt.Errorf("postgresql standby password repair: no running pods with pod ip found") + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return 
intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + runner := t.execRunner + if runner == nil { + runner = newKubePodExecRunner(t.restConfig) + } + expectedPassword, err := consistentStandbyPassword(transCtx.Context, runner, pods) + if err != nil { + if isInconsistentStandbyPasswordError(err) { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonInconsistent, err) + return nil + } + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if expectedPassword == "" { + err := fmt.Errorf("postgresql standby password repair: standby password not found in pgpass") + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + leaderPod, err := t.leaderPod(transCtx, runningITS) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + if leaderPod == nil || leaderPod.Status.PodIP == "" { + err := fmt.Errorf("postgresql standby password repair: leader pod %q has no pod ip", leaderMemberPodName(runningITS)) + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + repaired, err := ensureLeaderStandbyPassword(transCtx.Context, runner, leaderPod, expectedPassword) + if err != nil { + t.markRepairFailed(transCtx, standbyPasswordRepairReasonFailed, err) + return intctrlutil.NewDelayedRequeueError(standbyPasswordRepairRequeueInterval, err.Error()) + } + + t.markRepairSucceeded(transCtx, repaired) + if repaired && transCtx.EventRecorder != nil { + transCtx.EventRecorder.Event(transCtx.Component, corev1.EventTypeNormal, + standbyPasswordRepairConditionType, "repaired 
PostgreSQL standby password drift") + } + return nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) runningPods( + transCtx *componentTransformContext, +) ([]*corev1.Pod, error) { + labels := constant.GetComponentWellKnownLabels(transCtx.Cluster.Name, transCtx.SynthesizeComponent.Name) + pods, err := ctrlcomp.ListPodOwnedByComponent(transCtx.Context, transCtx.Client, transCtx.SynthesizeComponent.Namespace, labels, inDataContext4C()) + if err != nil { + return nil, fmt.Errorf("postgresql standby password repair: list component pods: %w", err) + } + + runningPods := make([]*corev1.Pod, 0, len(pods)) + for _, pod := range pods { + if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning || pod.Status.PodIP == "" { + continue + } + runningPods = append(runningPods, pod) + } + return runningPods, nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) leaderPod( + transCtx *componentTransformContext, + runningITS *workloads.InstanceSet, +) (*corev1.Pod, error) { + podName := leaderMemberPodName(runningITS) + if podName == "" { + return nil, fmt.Errorf("postgresql standby password repair: leader pod not found") + } + + pod := &corev1.Pod{} + podKey := types.NamespacedName{Namespace: transCtx.Component.Namespace, Name: podName} + if err := transCtx.Client.Get(transCtx.Context, podKey, pod, inDataContext4C()); err != nil { + return nil, fmt.Errorf("postgresql standby password repair: get leader pod %q: %w", podName, err) + } + return pod, nil +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) markRepairSucceeded( + transCtx *componentTransformContext, + repaired bool, +) { + message := "PostgreSQL standby password is up to date" + if repaired { + message = "PostgreSQL standby password was repaired" + } + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: standbyPasswordRepairConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: 
transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: standbyPasswordRepairReasonSucceeded, + Message: message, + }) +} + +func (t *componentPostgreSQLStandbyPasswordRepairTransformer) markRepairFailed( + transCtx *componentTransformContext, + reason string, + err error, +) { + meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{ + Type: standbyPasswordRepairConditionType, + Status: metav1.ConditionFalse, + ObservedGeneration: transCtx.Component.Generation, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: err.Error(), + }) +} + +func consistentStandbyPassword(ctx context.Context, runner podExecRunner, pods []*corev1.Pod) (string, error) { + expectedPassword := "" + for _, pod := range pods { + stdout, stderr, err := runner.Exec(ctx, pod, []string{"cat", standbyPgpassPath}, "") + if err != nil { + return "", fmt.Errorf("postgresql standby password repair: read pgpass from pod %q: %w: %s", pod.Name, err, strings.TrimSpace(stderr)) + } + password, err := parseStandbyPasswordFromPgpass(stdout) + if err != nil { + return "", fmt.Errorf("postgresql standby password repair: parse pgpass from pod %q: %w", pod.Name, err) + } + if expectedPassword == "" { + expectedPassword = password + continue + } + if password != expectedPassword { + return "", inconsistentStandbyPasswordError{} + } + } + return expectedPassword, nil +} + +func ensureLeaderStandbyPassword( + ctx context.Context, + runner podExecRunner, + leaderPod *corev1.Pod, + expectedPassword string, +) (bool, error) { + if strings.ContainsAny(expectedPassword, "\r\n") { + return false, fmt.Errorf("postgresql standby password repair: invalid standby password") + } + stdout, _, err := runner.Exec(ctx, leaderPod, []string{"sh", "-c", ensureStandbyPasswordScript}, expectedPassword) + if err != nil { + return false, fmt.Errorf("postgresql standby password repair: ensure standby password on leader pod %q: %w", leaderPod.Name, err) + } + switch 
strings.TrimSpace(stdout) { + case "t": + return false, nil + case "f": + return true, nil + default: + return false, fmt.Errorf("postgresql standby password repair: unexpected verification result from leader pod %q", leaderPod.Name) + } +} + +func parseStandbyPasswordFromPgpass(content string) (string, error) { + for _, rawLine := range strings.Split(content, "\n") { + line := strings.TrimRight(rawLine, "\r") + trimmed := strings.TrimSpace(line) + if trimmed == "" || strings.HasPrefix(trimmed, "#") { + continue + } + fields := splitPgpassLine(line) + if len(fields) < 5 { + continue + } + if fields[3] == standbyUserName { + if fields[4] == "" { + return "", fmt.Errorf("standby password is empty") + } + return fields[4], nil + } + } + return "", fmt.Errorf("standby entry not found") +} + +func splitPgpassLine(line string) []string { + fields := make([]string, 0, 5) + var field strings.Builder + escaped := false + for _, r := range line { + switch { + case escaped: + field.WriteRune(r) + escaped = false + case r == '\\': + escaped = true + case r == ':': + fields = append(fields, field.String()) + field.Reset() + default: + field.WriteRune(r) + } + } + if escaped { + field.WriteRune('\\') + } + fields = append(fields, field.String()) + return fields +} + +type inconsistentStandbyPasswordError struct{} + +func (inconsistentStandbyPasswordError) Error() string { + return "postgresql standby password repair: standby passwords in pgpass differ across running pods" +} + +func isInconsistentStandbyPasswordError(err error) bool { + var target inconsistentStandbyPasswordError + return errors.As(err, &target) +} + +type podExecRunner interface { + Exec(ctx context.Context, pod *corev1.Pod, command []string, stdin string) (stdout string, stderr string, err error) +} + +type kubePodExecRunner struct { + restConfig *rest.Config +} + +func newKubePodExecRunner(restConfig *rest.Config) *kubePodExecRunner { + return &kubePodExecRunner{restConfig: restConfig} +} + +func (r 
*kubePodExecRunner) Exec( + ctx context.Context, + pod *corev1.Pod, + command []string, + stdin string, +) (string, string, error) { + if pod == nil { + return "", "", fmt.Errorf("pod is nil") + } + containerName := postgreSQLContainerName(pod) + if containerName == "" { + return "", "", fmt.Errorf("pod %q has no container", pod.Name) + } + + restConfig, err := r.config() + if err != nil { + return "", "", err + } + restClient, err := rest.RESTClientFor(restConfig) + if err != nil { + return "", "", fmt.Errorf("create kubernetes rest client: %w", err) + } + req := restClient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec") + req.VersionedParams(&corev1.PodExecOptions{ + Container: containerName, + Command: command, + Stdin: stdin != "", + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL()) + if err != nil { + return "", "", fmt.Errorf("create kubernetes exec executor: %w", err) + } + var stdout, stderr bytes.Buffer + var stdinReader io.Reader + if stdin != "" { + stdinReader = strings.NewReader(stdin) + } + err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: stdinReader, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + return stdout.String(), stderr.String(), err +} + +func (r *kubePodExecRunner) config() (*rest.Config, error) { + var ( + config *rest.Config + err error + ) + if r.restConfig != nil { + config = rest.CopyConfig(r.restConfig) + } else { + config, err = ctrl.GetConfig() + if err != nil { + return nil, fmt.Errorf("get kubernetes rest config: %w", err) + } + } + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + config.APIPath = "/api" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + return config, nil +} + +func postgreSQLContainerName(pod *corev1.Pod) string { + preferredNames := map[string]struct{}{ + "postgresql": {}, + "postgres": 
{}, + "spilo": {}, + } + for _, container := range pod.Spec.Containers { + if _, ok := preferredNames[strings.ToLower(container.Name)]; ok { + return container.Name + } + } + for _, container := range pod.Spec.Containers { + name := strings.ToLower(container.Name) + if strings.Contains(name, "lorry") || strings.Contains(name, "config") { + continue + } + return container.Name + } + if len(pod.Spec.Containers) == 0 { + return "" + } + return pod.Spec.Containers[0].Name +} + +var _ podExecRunner = &kubePodExecRunner{} diff --git a/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go b/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go new file mode 100644 index 00000000000..7d6071abd25 --- /dev/null +++ b/controllers/apps/transformer_component_postgresql_standby_password_repair_test.go @@ -0,0 +1,380 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/ + +package apps + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/graph" + "github.com/apecloud/kubeblocks/pkg/controller/model" +) + +func TestParseStandbyPasswordFromPgpass(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + content string + want string + wantErr bool + }{ + { + name: "standby entry", + content: strings.Join([]string{ + "# comment", + "localhost:5432:*:postgres:root", + "leader:5432:*:standby:secret", + }, "\n"), + want: "secret", + }, + { + name: "escaped delimiters", + content: `leader\:0:5432:*:standby:se\:cret\\`, + want: `se:cret\`, + }, + { + name: "preserve password spaces", + content: "leader:5432:*:standby: secret ", + want: " secret ", + }, + { + name: "missing standby", + content: "localhost:5432:*:postgres:root", + wantErr: true, + }, + { + name: "empty standby password", + content: "localhost:5432:*:standby:", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseStandbyPasswordFromPgpass(tt.content) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tt.want, got) + }) + } +} + +func TestConsistentStandbyPassword(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + 
"postgresql-0": "localhost:5432:*:standby:secret", + "postgresql-1": "localhost:5432:*:standby:secret", + }, + } + + password, err := consistentStandbyPassword(context.Background(), runner, pods) + + require.NoError(t, err) + require.Equal(t, "secret", password) + require.Equal(t, [][]string{ + {"cat", standbyPgpassPath}, + {"cat", standbyPgpassPath}, + }, runner.commands) +} + +func TestConsistentStandbyPasswordInconsistent(t *testing.T) { + t.Parallel() + + pods := []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}}, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + "postgresql-0": "localhost:5432:*:standby:secret-a", + "postgresql-1": "localhost:5432:*:standby:secret-b", + }, + } + + _, err := consistentStandbyPassword(context.Background(), runner, pods) + + require.Error(t, err) + require.True(t, isInconsistentStandbyPasswordError(err)) +} + +func TestEnsureLeaderStandbyPassword(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := &fakePodExecRunner{ensureResult: "f"} + + repaired, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "secret") + + require.NoError(t, err) + require.True(t, repaired) + require.Equal(t, "secret", runner.stdin) + require.Equal(t, []string{"sh", "-c", ensureStandbyPasswordScript}, runner.lastCommand) +} + +func TestEnsureLeaderStandbyPasswordNoop(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := &fakePodExecRunner{ensureResult: "t"} + + repaired, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "secret") + + require.NoError(t, err) + require.False(t, repaired) +} + +func TestEnsureLeaderStandbyPasswordRejectsNewline(t *testing.T) { + t.Parallel() + + leaderPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "postgresql-1"}} + runner := 
&fakePodExecRunner{ensureResult: "t"} + + _, err := ensureLeaderStandbyPassword(context.Background(), runner, leaderPod, "sec\nret") + + require.Error(t, err) + require.Empty(t, runner.lastCommand) +} + +func TestComponentPostgreSQLStandbyPasswordRepairTransformer(t *testing.T) { + t.Parallel() + + const ( + namespace = "default" + clusterName = "test" + componentName = "postgresql" + compName = "test-postgresql" + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + cluster := &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: clusterName, Namespace: namespace}, + } + comp := &appsv1alpha1.Component{ + ObjectMeta: metav1.ObjectMeta{Name: compName, Namespace: namespace}, + Status: appsv1alpha1.ComponentStatus{ + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + compDef := &appsv1alpha1.ComponentDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "postgresql"}, + Spec: appsv1alpha1.ComponentDefinitionSpec{ + ServiceKind: "PostgreSQL", + }, + } + labels := constant.GetComponentWellKnownLabels(clusterName, componentName) + leaderPod := postgresqlTestPod(namespace, leaderPodName, labels) + replica := postgresqlTestPod(namespace, replicaPod, labels) + runningITS := &workloads.InstanceSet{ + Status: workloads.InstanceSetStatus{ + AvailableReplicas: 2, + MembersStatus: []workloads.MemberStatus{{ + PodName: leaderPodName, + ReplicaRole: &workloads.ReplicaRole{ + Name: "leader", + IsLeader: true, + }, + }}, + }, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(rscheme). + WithObjects(cluster, comp, compDef, leaderPod, replica). 
+ Build() + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(k8sClient), + Logger: log.Log, + Cluster: cluster, + CompDef: compDef, + Component: comp.DeepCopy(), + ComponentOrig: comp.DeepCopy(), + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: namespace, + ClusterName: clusterName, + Name: componentName, + }, + RunningWorkload: runningITS, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + leaderPodName: "localhost:5432:*:standby:secret", + replicaPod: "localhost:5432:*:standby:secret", + }, + ensureResult: "f", + } + + transformer := &componentPostgreSQLStandbyPasswordRepairTransformer{execRunner: runner} + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.Equal(t, leaderPodName, runner.ensurePod) + require.Equal(t, "secret", runner.stdin) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionTrue, cond.Status) +} + +func TestComponentPostgreSQLStandbyPasswordRepairTransformerInconsistent(t *testing.T) { + t.Parallel() + + const ( + namespace = "default" + clusterName = "test" + componentName = "postgresql" + compName = "test-postgresql" + leaderPodName = "test-postgresql-1" + replicaPod = "test-postgresql-0" + ) + + cluster := &appsv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: clusterName, Namespace: namespace}, + } + comp := &appsv1alpha1.Component{ + ObjectMeta: metav1.ObjectMeta{Name: compName, Namespace: namespace}, + Status: appsv1alpha1.ComponentStatus{ + Phase: appsv1alpha1.RunningClusterCompPhase, + }, + } + compDef := &appsv1alpha1.ComponentDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "postgresql"}, + Spec: appsv1alpha1.ComponentDefinitionSpec{ + ServiceKind: "PostgreSQL", + }, + } + labels := constant.GetComponentWellKnownLabels(clusterName, componentName) + leaderPod := 
postgresqlTestPod(namespace, leaderPodName, labels) + replica := postgresqlTestPod(namespace, replicaPod, labels) + runningITS := &workloads.InstanceSet{ + Status: workloads.InstanceSetStatus{ + AvailableReplicas: 2, + MembersStatus: []workloads.MemberStatus{{ + PodName: leaderPodName, + ReplicaRole: &workloads.ReplicaRole{ + Name: "leader", + IsLeader: true, + }, + }}, + }, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(rscheme). + WithObjects(cluster, comp, compDef, leaderPod, replica). + Build() + transCtx := &componentTransformContext{ + Context: context.Background(), + Client: model.NewGraphClient(k8sClient), + Logger: log.Log, + Cluster: cluster, + CompDef: compDef, + Component: comp.DeepCopy(), + ComponentOrig: comp.DeepCopy(), + SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ + Namespace: namespace, + ClusterName: clusterName, + Name: componentName, + }, + RunningWorkload: runningITS, + } + runner := &fakePodExecRunner{ + pgpass: map[string]string{ + leaderPodName: "localhost:5432:*:standby:secret-a", + replicaPod: "localhost:5432:*:standby:secret-b", + }, + } + + transformer := &componentPostgreSQLStandbyPasswordRepairTransformer{execRunner: runner} + err := transformer.Transform(transCtx, graph.NewDAG()) + + require.NoError(t, err) + require.Empty(t, runner.ensurePod) + cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, standbyPasswordRepairConditionType) + require.NotNil(t, cond) + require.Equal(t, metav1.ConditionFalse, cond.Status) + require.Equal(t, standbyPasswordRepairReasonInconsistent, cond.Reason) +} + +func postgresqlTestPod(namespace, name string, labels map[string]string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "postgresql", + }}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + PodIP: "10.0.0.10", + }, + } +} + +type fakePodExecRunner 
struct { + pgpass map[string]string + ensureResult string + ensurePod string + stdin string + lastCommand []string + commands [][]string +} + +func (r *fakePodExecRunner) Exec( + _ context.Context, + pod *corev1.Pod, + command []string, + stdin string, +) (string, string, error) { + r.commands = append(r.commands, append([]string{}, command...)) + r.lastCommand = append([]string{}, command...) + if len(command) == 2 && command[0] == "cat" && command[1] == standbyPgpassPath { + return r.pgpass[pod.Name], "", nil + } + r.ensurePod = pod.Name + r.stdin = stdin + return r.ensureResult, "", nil +} + +var _ podExecRunner = &fakePodExecRunner{} From a2983d58928100cf3bdc6a898406e99d09fbdcdc Mon Sep 17 00:00:00 2001 From: im0x0ing Date: Wed, 27 May 2026 16:12:24 +0800 Subject: [PATCH 3/3] fix: align PostgreSQL standby repair with runtime config Handle PostgreSQL leaders that expose the standby credential through PGPASSWORD_STANDBY instead of a standby entry in /run/postgresql/pgpass, and skip automatic password repair for standby-cluster mode where the addon may use remote primary credentials. 
--- ...ransformer_component_patroni_dcs_repair.go | 21 +- ...ormer_component_patroni_dcs_repair_test.go | 32 +- ...nent_postgresql_standby_password_repair.go | 169 ++++++++-- ...postgresql_standby_password_repair_test.go | 315 +++++++++++++----- 4 files changed, 415 insertions(+), 122 deletions(-) diff --git a/controllers/apps/transformer_component_patroni_dcs_repair.go b/controllers/apps/transformer_component_patroni_dcs_repair.go index fde77f2fd10..3b27a312be6 100644 --- a/controllers/apps/transformer_component_patroni_dcs_repair.go +++ b/controllers/apps/transformer_component_patroni_dcs_repair.go @@ -105,6 +105,11 @@ func (t *componentPatroniDCSRepairTransformer) Transform(ctx graph.TransformCont return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) } + patroniURL, err := patroniRESTURL(leaderPod.Status.PodIP, patroniRESTPort(leaderPod)) + if err != nil { + t.markRepairFailed(transCtx, err) + return intctrlutil.NewDelayedRequeueError(patroniDCSRepairRequeueInterval, err.Error()) + } expectedPgHBA, err := t.expectedPgHBARules(transCtx) if err != nil { t.markRepairFailed(transCtx, err) @@ -118,7 +123,7 @@ func (t *componentPatroniDCSRepairTransformer) Transform(ctx graph.TransformCont repaired, err := repairPatroniPgHBA( transCtx.Context, patroniClient, - patroniRESTURL(leaderPod.Status.PodIP, patroniRESTPort(leaderPod)), + patroniURL, expectedPgHBA, previousPatroniDCSRepairFailed(transCtx.Component.Status.Conditions), ) @@ -146,7 +151,8 @@ func isPostgreSQLComponent(transCtx *componentTransformContext) bool { } } compDefName := strings.ToLower(transCtx.CompDef.Name) - return compDefName == constant.ServiceKindPostgreSQL || strings.HasPrefix(compDefName, constant.ServiceKindPostgreSQL+"-") + return compDefName == constant.ServiceKindPostgreSQL || + strings.HasPrefix(compDefName, constant.ServiceKindPostgreSQL+"-") } func (t *componentPatroniDCSRepairTransformer) getLeaderPod( @@ -178,7 +184,9 @@ func 
leaderMemberPodName(runningITS *workloads.InstanceSet) string { return "" } -func (t *componentPatroniDCSRepairTransformer) expectedPgHBARules(transCtx *componentTransformContext) ([]string, error) { +func (t *componentPatroniDCSRepairTransformer) expectedPgHBARules( + transCtx *componentTransformContext, +) ([]string, error) { for _, configSpec := range transCtx.SynthesizeComponent.ConfigTemplates { cmKey := types.NamespacedName{ Namespace: transCtx.Cluster.Namespace, @@ -360,8 +368,11 @@ func normalizePgHBARules(rules []string) []string { return normalized } -func patroniRESTURL(podIP string, port int32) string { - return "http://" + net.JoinHostPort(podIP, strconv.Itoa(int(port))) +func patroniRESTURL(podIP string, port int32) (string, error) { + if net.ParseIP(podIP) == nil { + return "", fmt.Errorf("postgresql patroni dcs repair: invalid patroni pod ip %q", podIP) + } + return "http://" + net.JoinHostPort(podIP, strconv.Itoa(int(port))), nil } func patroniRESTPort(pod *corev1.Pod) int32 { diff --git a/controllers/apps/transformer_component_patroni_dcs_repair_test.go b/controllers/apps/transformer_component_patroni_dcs_repair_test.go index 7e82ba5cb22..84acfe6760b 100644 --- a/controllers/apps/transformer_component_patroni_dcs_repair_test.go +++ b/controllers/apps/transformer_component_patroni_dcs_repair_test.go @@ -164,6 +164,21 @@ func TestEnsurePgHBARemoteRules(t *testing.T) { require.Contains(t, rules, "host replication all 127.0.0.1/32 md5") } +func TestPatroniRESTURL(t *testing.T) { + t.Parallel() + + url, err := patroniRESTURL("10.0.0.10", 8008) + require.NoError(t, err) + require.Equal(t, "http://10.0.0.10:8008", url) + + url, err = patroniRESTURL("2001:db8::10", 8008) + require.NoError(t, err) + require.Equal(t, "http://[2001:db8::10]:8008", url) + + _, err = patroniRESTURL("not-an-ip", 8008) + require.Error(t, err) +} + func TestHTTPPatroniConfigClient(t *testing.T) { t.Parallel() @@ -326,7 +341,10 @@ func TestComponentPatroniDCSRepairTransformer(t 
*testing.T) { require.True(t, patroniClient.patched) require.True(t, patroniClient.reloaded) require.Equal(t, "http://10.0.0.10:8010", patroniClient.baseURL) - require.Empty(t, missingPgHBARules(patroniClient.config.PostgreSQL.PgHBA, parsePgHBAContent(configMap.Data[pgHBAConfigFile]))) + require.Empty(t, missingPgHBARules( + patroniClient.config.PostgreSQL.PgHBA, + parsePgHBAContent(configMap.Data[pgHBAConfigFile]), + )) cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, patroniDCSRepairConditionType) require.NotNil(t, cond) require.Equal(t, metav1.ConditionTrue, cond.Status) @@ -342,10 +360,14 @@ func TestComponentPatroniDCSRepairTransformerFallbackPgHBA(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, }, SynthesizeComponent: &ctrlcomp.SynthesizedComponent{ - Namespace: "default", - ClusterName: "test", - Name: "postgresql", - ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{Name: "postgresql-configuration"}}}, + Namespace: "default", + ClusterName: "test", + Name: "postgresql", + ConfigTemplates: []appsv1alpha1.ComponentConfigSpec{{ + ComponentTemplateSpec: appsv1alpha1.ComponentTemplateSpec{ + Name: "postgresql-configuration", + }, + }}, }, } diff --git a/controllers/apps/transformer_component_postgresql_standby_password_repair.go b/controllers/apps/transformer_component_postgresql_standby_password_repair.go index 5d887fb43e6..995c7d624cd 100644 --- a/controllers/apps/transformer_component_postgresql_standby_password_repair.go +++ b/controllers/apps/transformer_component_postgresql_standby_password_repair.go @@ -53,10 +53,13 @@ const ( standbyPasswordRepairReasonSucceeded = "Succeeded" standbyPasswordRepairReasonFailed = "Failed" standbyPasswordRepairReasonInconsistent = "StandbyPasswordInconsistent" + standbyPasswordRepairReasonSkipped = "Skipped" standbyPasswordRepairRequeueInterval = time.Minute standbyPgpassPath = "/run/postgresql/pgpass" 
standbyUserName = "standby" + readStandbyPasswordEnvCommand = `printf "%s" "${PGPASSWORD_STANDBY:-}"` + readPostgreSQLModeEnvCommand = `printf "%s" "${PG_MODE:-}"` ) const ensureStandbyPasswordScript = ` @@ -69,7 +72,11 @@ fi escaped_password="$(printf "%s" "$password" | sed "s/'/''/g")" matches="$(psql -U postgres -v ON_ERROR_STOP=1 -Atq <