Skip to content

Commit 4cea22d

Browse files
committed
wip
1 parent 308fe85 commit 4cea22d

File tree

4 files changed

+176
-2
lines changed

4 files changed

+176
-2
lines changed

internal/controller/postgrescluster/pgbouncer.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ func (r *Reconciler) reconcilePGBouncer(
5454
if err == nil {
5555
err = r.reconcilePGBouncerInPostgreSQL(ctx, cluster, instances, secret)
5656
}
57+
if err == nil {
58+
// Trigger RECONNECT if primary has changed to force new server connections.
59+
// This prevents stale connections from routing traffic to a demoted replica.
60+
err = r.reconcilePGBouncerReconnect(ctx, cluster, instances)
61+
}
5762
return err
5863
}
5964

@@ -584,3 +589,100 @@ func (r *Reconciler) reconcilePGBouncerPodDisruptionBudget(
584589
}
585590
return err
586591
}
592+
593+
// reconcilePGBouncerReconnect triggers a RECONNECT command on all PgBouncer
594+
// pods when the primary has changed. This forces PgBouncer to establish new
595+
// server connections to the correct primary, preventing stale connections
596+
// from routing traffic to a demoted replica after failover.
597+
//
598+
// Note: RECONNECT closes server connections when they are "released" according
599+
// to the pool mode. In transaction mode, this happens after each transaction.
600+
// In session mode, this happens when the client disconnects - so persistent
601+
// clients may continue hitting the old primary until they reconnect.
602+
func (r *Reconciler) reconcilePGBouncerReconnect(
603+
ctx context.Context, cluster *v1beta1.PostgresCluster,
604+
instances *observedInstances,
605+
) error {
606+
log := logging.FromContext(ctx)
607+
608+
// Skip if PgBouncer is disabled
609+
if cluster.Spec.Proxy == nil || cluster.Spec.Proxy.PGBouncer == nil {
610+
return nil
611+
}
612+
613+
var primaryPod *corev1.Pod
614+
for _, instance := range instances.forCluster {
615+
// Same condition as writablePod fn
616+
if writable, known := instance.IsWritable(); writable && known && len(instance.Pods) > 0 {
617+
primaryPod = instance.Pods[0]
618+
break
619+
}
620+
}
621+
622+
if primaryPod == nil {
623+
// We will retry later.
624+
log.V(1).Info("No writable instance found, skipping PgBouncer RECONNECT")
625+
return nil
626+
}
627+
628+
currentPrimaryUID := string(primaryPod.UID)
629+
lastReconnectUID := cluster.Status.Proxy.PGBouncer.LastReconnectPrimaryUID
630+
631+
if currentPrimaryUID == lastReconnectUID {
632+
// Primary hasn't changed, no need to Reconnect.
633+
return nil
634+
}
635+
636+
log.Info("Primary changed, triggering PgBouncer RECONNECT",
637+
"previousPrimaryUID", lastReconnectUID,
638+
"currentPrimaryUID", currentPrimaryUID,
639+
"currentPrimaryName", primaryPod.Name)
640+
641+
pgbouncerPods := &corev1.PodList{}
642+
selector, err := naming.AsSelector(naming.ClusterPGBouncerSelector(cluster))
643+
if err != nil {
644+
return errors.WithStack(err)
645+
}
646+
647+
if err := r.Client.List(ctx, pgbouncerPods,
648+
client.InNamespace(cluster.Namespace),
649+
client.MatchingLabelsSelector{Selector: selector}); err != nil {
650+
return errors.WithStack(err)
651+
}
652+
653+
// Send RECONNECT to each running PgBouncer pod
654+
var reconnectErr error
655+
successCount := 0
656+
657+
for i := range pgbouncerPods.Items {
658+
pod := &pgbouncerPods.Items[i]
659+
if pod.Status.Phase != corev1.PodRunning {
660+
continue
661+
}
662+
663+
exec := func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
664+
return r.PodExec(ctx, pod.Namespace, pod.Name, naming.ContainerPGBouncer, stdin, stdout, stderr, command...)
665+
}
666+
667+
if err := pgbouncer.Reconnect(ctx, exec); err != nil {
668+
log.Error(err, "PgBouncer RECONNECT: failed to issue command to pod.", "pod", pod.Name)
669+
reconnectErr = err
670+
} else {
671+
successCount++
672+
}
673+
}
674+
675+
// If we can't send a RECONNECT command to one of the pods, we won't update the LastReconnectPrimaryUID.
676+
// This means this will run again in the next reconciliation loop.
677+
if reconnectErr == nil {
678+
cluster.Status.Proxy.PGBouncer.LastReconnectPrimaryUID = currentPrimaryUID
679+
}
680+
681+
log.Info("PgBouncer RECONNECT: done",
682+
"failed", reconnectErr != nil,
683+
"successCount", successCount,
684+
"totalPods", len(pgbouncerPods.Items),
685+
)
686+
687+
return reconnectErr
688+
}

internal/pgbouncer/config.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,17 @@ func clusterINI(cluster *v1beta1.PostgresCluster) string {
120120
"server_tls_sslmode": "verify-full",
121121
"server_tls_ca_file": certBackendAuthorityAbsolutePath,
122122

123-
// Disable Unix sockets to keep the filesystem read-only.
124-
"unix_socket_dir": "",
123+
// Enable Unix socket for admin console access. The special user
124+
// "pgbouncer" can connect without a password when using a Unix socket
125+
// from the same UID as the running process. This allows the operator
126+
// to send admin commands like RECONNECT after failover.
127+
// Ref.: https://www.pgbouncer.org/usage.html#admin-console
128+
"unix_socket_dir": "/tmp/pgbouncer",
129+
130+
// Allow the "pgbouncer" user to run admin commands (PAUSE, RESUME,
131+
// RECONNECT, etc.) on the admin console. Combined with unix_socket_dir,
132+
// this enables password-free admin access from within the container.
133+
"admin_users": "pgbouncer",
125134
}
126135

127136
// Override the above with any specified settings.

internal/pgbouncer/reconnect.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package pgbouncer
2+
3+
import (
4+
"bytes"
5+
"context"
6+
7+
"github.com/percona/percona-postgresql-operator/internal/logging"
8+
"github.com/percona/percona-postgresql-operator/internal/postgres"
9+
)
10+
11+
// Reconnect sends the RECONNECT command to PgBouncer's admin console.
12+
// This closes all server connections at the next opportunity, forcing
13+
// PgBouncer to establish new connections to the current primary.
14+
//
15+
// From PgBouncer docs: "Close each open server connection for the given
16+
// database, or all databases, at the next opportunity."
17+
//
18+
// This is non-disruptive: PgBouncer waits for the connection to be
19+
// "released" before closing it. In transaction pooling mode, this means
20+
// waiting for the current transaction to complete. In session pooling
21+
// mode, this means waiting for the client to disconnect.
22+
//
23+
// The command connects via Unix socket as the special "pgbouncer" user,
24+
// which is allowed without password when the client has the same UID
25+
// as the running PgBouncer process.
26+
//
27+
// Ref.: https://www.pgbouncer.org/usage.html#admin-console
28+
func Reconnect(ctx context.Context, exec postgres.Executor) error {
29+
log := logging.FromContext(ctx)
30+
log.Info("Triggering PgBouncer RECONNECT to force new server connections")
31+
32+
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
33+
34+
// Connect to PgBouncer admin console via Unix socket.
35+
// The "pgbouncer" user can connect without password from same UID.
36+
// The "pgbouncer" database is the virtual admin console.
37+
// Ref.:
38+
err := exec(ctx, nil, stdout, stderr,
39+
"psql",
40+
"-h", "/tmp/pgbouncer",
41+
"-p", "6432",
42+
"-U", "pgbouncer",
43+
"-d", "pgbouncer",
44+
"-c", "RECONNECT",
45+
)
46+
47+
if err != nil {
48+
log.Error(err, "RECONNECT failed",
49+
"stdout", stdout.String(),
50+
"stderr", stderr.String())
51+
} else {
52+
log.V(1).Info("RECONNECT succeeded",
53+
"stdout", stdout.String())
54+
}
55+
56+
return err
57+
}

pkg/apis/postgres-operator.crunchydata.com/v1beta1/pgbouncer_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,10 @@ type PGBouncerPodStatus struct {
173173

174174
// Total number of non-terminated pods.
175175
Replicas int32 `json:"replicas,omitempty"`
176+
177+
// Identifies the primary pod UID when RECONNECT was last triggered.
178+
// Used to detect failovers and force PgBouncer to establish new
179+
// server connections to the correct primary.
180+
// +optional
181+
LastReconnectPrimaryUID string `json:"lastReconnectPrimaryUID,omitempty"`
176182
}

0 commit comments

Comments
 (0)