Skip to content

Commit f252432

Browse files
committed
wip
1 parent 4cea22d commit f252432

File tree

9 files changed

+102
-135
lines changed

9 files changed

+102
-135
lines changed

internal/controller/postgrescluster/instance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,11 @@ func newObservedInstances(
262262
return &observed
263263
}
264264

265-
// writablePod looks at observedInstances and finds an instance that matches
265+
// WritablePod looks at observedInstances and finds an instance that matches
266266
// a few conditions. The instance should be non-terminating, running, and
267267
// writable i.e. the instance with the primary. If such an instance exists, it
268268
// is returned along with the instance pod.
269-
func (observed *observedInstances) writablePod(container string) (*corev1.Pod, *Instance) {
269+
func (observed *observedInstances) WritablePod(container string) (*corev1.Pod, *Instance) {
270270
if observed == nil {
271271
return nil, nil
272272
}

internal/controller/postgrescluster/instance_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ func TestWritablePod(t *testing.T) {
381381
t.Run("empty observed", func(t *testing.T) {
382382
observed := &observedInstances{}
383383

384-
pod, instance := observed.writablePod("container")
384+
pod, instance := observed.WritablePod("container")
385385
assert.Assert(t, pod == nil)
386386
assert.Assert(t, instance == nil)
387387
})
@@ -415,7 +415,7 @@ func TestWritablePod(t *testing.T) {
415415
terminating, known := observed.forCluster[0].IsTerminating()
416416
assert.Assert(t, terminating && known)
417417

418-
pod, instance := observed.writablePod("container")
418+
pod, instance := observed.WritablePod("container")
419419
assert.Assert(t, pod == nil)
420420
assert.Assert(t, instance == nil)
421421
})
@@ -447,7 +447,7 @@ func TestWritablePod(t *testing.T) {
447447
running, known := observed.forCluster[0].IsRunning(container)
448448
assert.Check(t, !running && known)
449449

450-
pod, instance := observed.writablePod("container")
450+
pod, instance := observed.WritablePod("container")
451451
assert.Assert(t, pod == nil)
452452
assert.Assert(t, instance == nil)
453453
})
@@ -480,7 +480,7 @@ func TestWritablePod(t *testing.T) {
480480
writable, known := observed.forCluster[0].IsWritable()
481481
assert.Check(t, !writable && known)
482482

483-
pod, instance := observed.writablePod("container")
483+
pod, instance := observed.WritablePod("container")
484484
assert.Assert(t, pod == nil)
485485
assert.Assert(t, instance == nil)
486486
})
@@ -517,7 +517,7 @@ func TestWritablePod(t *testing.T) {
517517
running, known := observed.forCluster[0].IsRunning(container)
518518
assert.Check(t, running && known)
519519

520-
pod, instance := observed.writablePod("container")
520+
pod, instance := observed.WritablePod("container")
521521
assert.Assert(t, pod != nil)
522522
assert.Assert(t, instance != nil)
523523
})

internal/controller/postgrescluster/pgbouncer.go

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ func (r *Reconciler) reconcilePGBouncer(
5555
err = r.reconcilePGBouncerInPostgreSQL(ctx, cluster, instances, secret)
5656
}
5757
if err == nil {
58-
// Trigger RECONNECT if primary has changed to force new server connections.
58+
// Send SIGTERM to PgBouncer if primary has changed, triggering graceful
59+
// shutdown and container restart. The new process will do a fresh DNS lookup.
5960
// This prevents stale connections from routing traffic to a demoted replica.
6061
err = r.reconcilePGBouncerReconnect(ctx, cluster, instances)
6162
}
@@ -116,18 +117,9 @@ func (r *Reconciler) reconcilePGBouncerInPostgreSQL(
116117
) error {
117118
log := logging.FromContext(ctx)
118119

119-
var pod *corev1.Pod
120-
121120
// Find the PostgreSQL instance that can execute SQL that writes to every
122121
// database. When there is none, return early.
123-
124-
for _, instance := range instances.forCluster {
125-
writable, known := instance.IsWritable()
126-
if writable && known && len(instance.Pods) > 0 {
127-
pod = instance.Pods[0]
128-
break
129-
}
130-
}
122+
pod, _ := instances.WritablePod(naming.ContainerDatabase)
131123
if pod == nil {
132124
return nil
133125
}
@@ -590,15 +582,32 @@ func (r *Reconciler) reconcilePGBouncerPodDisruptionBudget(
590582
return err
591583
}
592584

593-
// reconcilePGBouncerReconnect triggers a RECONNECT command on all PgBouncer
594-
// pods when the primary has changed. This forces PgBouncer to establish new
585+
// pgbouncerPods returns a list of PgBouncer pods for the given cluster.
586+
func (r *Reconciler) pgbouncerPods(ctx context.Context, cluster *v1beta1.PostgresCluster) (*corev1.PodList, error) {
587+
pgbouncerPods := &corev1.PodList{}
588+
selector, err := naming.AsSelector(naming.ClusterPGBouncerSelector(cluster))
589+
if err != nil {
590+
return nil, errors.WithStack(err)
591+
}
592+
593+
if err := r.Client.List(ctx, pgbouncerPods,
594+
client.InNamespace(cluster.Namespace),
595+
client.MatchingLabelsSelector{Selector: selector}); err != nil {
596+
return nil, errors.WithStack(err)
597+
}
598+
return pgbouncerPods, nil
599+
}
600+
601+
// reconcilePGBouncerReconnect is a sub-reconciler that signals PgBouncer pods
602+
// when the primary has changed. This forces PgBouncer to establish new
595603
// server connections to the correct primary, preventing stale connections
596604
// from routing traffic to a demoted replica after failover.
597605
//
598606
// Note: the RECONNECT command was replaced because it only closes server
599607
// connections when they are "released" according to the pool mode. In
600608
// session mode that happens only when the client disconnects, so persistent
601609
// clients could keep hitting the old primary until they reconnected.
610+
// It returns an error for integration with the parent reconciler's error-handling chain.
602611
func (r *Reconciler) reconcilePGBouncerReconnect(
603612
ctx context.Context, cluster *v1beta1.PostgresCluster,
604613
instances *observedInstances,
@@ -610,79 +619,69 @@ func (r *Reconciler) reconcilePGBouncerReconnect(
610619
return nil
611620
}
612621

613-
var primaryPod *corev1.Pod
614-
for _, instance := range instances.forCluster {
615-
// Same condition as writablePod fn
616-
if writable, known := instance.IsWritable(); writable && known && len(instance.Pods) > 0 {
617-
primaryPod = instance.Pods[0]
618-
break
619-
}
620-
}
621-
622+
primaryPod, _ := instances.WritablePod(naming.ContainerDatabase)
622623
if primaryPod == nil {
623624
// We will retry later.
624-
log.V(1).Info("No writable instance found, skipping PgBouncer RECONNECT")
625+
log.V(1).Info("No writable instance found, skipping PgBouncer failover signal")
625626
return nil
626627
}
627628

628629
currentPrimaryUID := string(primaryPod.UID)
629-
lastReconnectUID := cluster.Status.Proxy.PGBouncer.LastReconnectPrimaryUID
630+
lastFailoverUID := cluster.Status.Proxy.PGBouncer.LastFailoverPrimaryUID
630631

631-
if currentPrimaryUID == lastReconnectUID {
632-
// Primary hasn't changed, no need to Reconnect.
632+
if currentPrimaryUID == lastFailoverUID {
633+
// Primary hasn't changed, no need to trigger failover.
633634
return nil
634635
}
635636

636-
log.Info("Primary changed, triggering PgBouncer RECONNECT",
637-
"previousPrimaryUID", lastReconnectUID,
637+
log.Info("Primary changed, triggering PgBouncer failover signal (SIGTERM)",
638+
"previousPrimaryUID", lastFailoverUID,
638639
"currentPrimaryUID", currentPrimaryUID,
639640
"currentPrimaryName", primaryPod.Name)
640641

641-
pgbouncerPods := &corev1.PodList{}
642-
selector, err := naming.AsSelector(naming.ClusterPGBouncerSelector(cluster))
642+
pgbouncerPods, err := r.pgbouncerPods(ctx, cluster)
643643
if err != nil {
644-
return errors.WithStack(err)
645-
}
646-
647-
if err := r.Client.List(ctx, pgbouncerPods,
648-
client.InNamespace(cluster.Namespace),
649-
client.MatchingLabelsSelector{Selector: selector}); err != nil {
650-
return errors.WithStack(err)
644+
return err
651645
}
652646

653-
// Send RECONNECT to each running PgBouncer pod
654-
var reconnectErr error
647+
// Send SIGTERM to each running PgBouncer pod to trigger graceful shutdown
648+
// and container restart. New PgBouncer process will do fresh DNS lookup.
649+
var failoverErrs []error
655650
successCount := 0
656651

657652
for i := range pgbouncerPods.Items {
658-
pod := &pgbouncerPods.Items[i]
653+
pod := pgbouncerPods.Items[i] // Copy value to avoid closure reference issues
659654
if pod.Status.Phase != corev1.PodRunning {
660655
continue
661656
}
662657

663-
exec := func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
658+
if err := pgbouncer.SignalFailover(ctx, func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
664659
return r.PodExec(ctx, pod.Namespace, pod.Name, naming.ContainerPGBouncer, stdin, stdout, stderr, command...)
665-
}
666-
667-
if err := pgbouncer.Reconnect(ctx, exec); err != nil {
668-
log.Error(err, "PgBouncer RECONNECT: failed to issue command to pod.", "pod", pod.Name)
669-
reconnectErr = err
660+
}); err != nil {
661+
log.Error(err, "PgBouncer failover signal: failed to send SIGTERM to pod", "pod", pod.Name)
662+
failoverErrs = append(failoverErrs, fmt.Errorf("pod %s: %w", pod.Name, err))
670663
} else {
671664
successCount++
672665
}
673666
}
674667

675-
// If we can't send a RECONNECT command to one of the pods, we won't update the LastReconnectPrimaryUID.
676-
// This means this will run again in the next reconciliation loop.
677-
if reconnectErr == nil {
678-
cluster.Status.Proxy.PGBouncer.LastReconnectPrimaryUID = currentPrimaryUID
668+
// Update status only if all pods were successfully signaled.
669+
// Partial failures will be retried in the next reconciliation loop.
670+
if len(failoverErrs) == 0 {
671+
cluster.Status.Proxy.PGBouncer.LastFailoverPrimaryUID = currentPrimaryUID
679672
}
680673

681-
log.Info("PgBouncer RECONNECT: done",
682-
"failed", reconnectErr != nil,
674+
log.Info("PgBouncer failover signal: done",
675+
"failed", len(failoverErrs) > 0,
683676
"successCount", successCount,
677+
"failureCount", len(failoverErrs),
684678
"totalPods", len(pgbouncerPods.Items),
685679
)
686680

687-
return reconnectErr
681+
// Return aggregated errors if any pods failed
682+
if len(failoverErrs) > 0 {
683+
return fmt.Errorf("failed to signal %d of %d pgbouncer pods: %w",
684+
len(failoverErrs), len(pgbouncerPods.Items), failoverErrs[0])
685+
}
686+
return nil
688687
}

internal/controller/postgrescluster/pgmonitor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (r *Reconciler) reconcilePGMonitorExporter(ctx context.Context,
5858

5959
// Find the PostgreSQL instance that can execute SQL that writes to every
6060
// database. When there is none, return early.
61-
writablePod, writableInstance = instances.writablePod(naming.ContainerDatabase)
61+
writablePod, writableInstance = instances.WritablePod(naming.ContainerDatabase)
6262
if writableInstance == nil || writablePod == nil {
6363
return nil
6464
}

internal/controller/postgrescluster/postgres.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (r *Reconciler) reconcilePostgresDatabases(
204204

205205
// Find the PostgreSQL instance that can execute SQL that writes system
206206
// catalogs. When there is none, return early.
207-
pod, _ := instances.writablePod(container)
207+
pod, _ := instances.WritablePod(container)
208208
if pod == nil {
209209
return nil
210210
}
@@ -1047,7 +1047,7 @@ func (r *Reconciler) reconcileDatabaseInitSQL(ctx context.Context,
10471047
// Now that we have the data provided by the user. We can check for a
10481048
// writable pod and get the podExecutor for the pod's database container
10491049
var podExecutor postgres.Executor
1050-
pod, _ := instances.writablePod(naming.ContainerDatabase)
1050+
pod, _ := instances.WritablePod(naming.ContainerDatabase)
10511051
if pod == nil {
10521052
log.V(1).Info("Could not find a pod with a writable database container.")
10531053
return nil

internal/pgbouncer/config.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,6 @@ func clusterINI(cluster *v1beta1.PostgresCluster) string {
120120
"server_tls_sslmode": "verify-full",
121121
"server_tls_ca_file": certBackendAuthorityAbsolutePath,
122122

123-
// Enable Unix socket for admin console access. The special user
124-
// "pgbouncer" can connect without a password when using a Unix socket
125-
// from the same UID as the running process. This allows the operator
126-
// to send admin commands like RECONNECT after failover.
127-
// Ref.: https://www.pgbouncer.org/usage.html#admin-console
128-
"unix_socket_dir": "/tmp/pgbouncer",
129-
130-
// Allow the "pgbouncer" user to run admin commands (PAUSE, RESUME,
131-
// RECONNECT, etc.) on the admin console. Combined with unix_socket_dir,
132-
// this enables password-free admin access from within the container.
133-
"admin_users": "pgbouncer",
134123
}
135124

136125
// Override the above with any specified settings.

internal/pgbouncer/reconnect.go

Lines changed: 0 additions & 57 deletions
This file was deleted.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package pgbouncer
2+
3+
import (
4+
"context"
5+
6+
"github.com/percona/percona-postgresql-operator/internal/logging"
7+
"github.com/percona/percona-postgresql-operator/internal/postgres"
8+
)
9+
10+
// SignalFailover sends a SIGTERM to PgBouncer to trigger SHUTDOWN WAIT_FOR_CLIENTS [1]
11+
// mode, waiting for clients to gracefully disconnect [2]. This approach was
12+
// suggested by a PgBouncer maintainer [3] to deal with failovers in Kubernetes.
13+
//
14+
// What happens:
15+
// 1. Operator sends SIGTERM [2] to PgBouncer process (PID 1 in container)
16+
// 2. PgBouncer enters SHUTDOWN WAIT_FOR_CLIENTS mode [1].
17+
// 3. After Kubernetes grace period (default 30s), SIGKILL is sent if process still hasn't exited
18+
// 4. Container is terminated and restarted by the kubelet per the pod's restart policy.
19+
// 5. New PgBouncer process does fresh DNS lookup → connects to current primary.
20+
//
21+
// This approach is more effective than the RECONNECT command for session mode with persistent
22+
// clients (MPG clusters) because RECONNECT waits for clients to disconnect, which never happens
23+
// for persistent clients. SIGTERM guarantees termination and restart after the grace period.
24+
//
25+
// [1] https://www.pgbouncer.org/usage.html#shutdown
26+
// [2] https://www.pgbouncer.org/usage.html#signals
27+
// [3] https://github.com/pgbouncer/pgbouncer/issues/1361
28+
func SignalFailover(ctx context.Context, exec postgres.Executor) error {
29+
log := logging.FromContext(ctx)
30+
log.Info("SignalFailover: sending SIGTERM to force container restart")
31+
32+
err := exec(ctx, nil, nil, nil, "kill", "-TERM", "1")
33+
34+
log.Info("SignalFailover: SIGTERM sent.", "failed", err != nil)
35+
return err
36+
}

pkg/apis/postgres-operator.crunchydata.com/v1beta1/pgbouncer_types.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,9 @@ type PGBouncerPodStatus struct {
174174
// Total number of non-terminated pods.
175175
Replicas int32 `json:"replicas,omitempty"`
176176

177-
// Identifies the primary pod UID when RECONNECT was last triggered.
178-
// Used to detect failovers and force PgBouncer to establish new
179-
// server connections to the correct primary.
177+
// Identifies the primary pod UID when failover signal (SIGTERM) was last triggered.
178+
// Used to detect failovers and trigger PgBouncer container restart for fresh
179+
// connection pool and DNS lookup to the correct primary.
180180
// +optional
181-
LastReconnectPrimaryUID string `json:"lastReconnectPrimaryUID,omitempty"`
181+
LastFailoverPrimaryUID string `json:"lastFailoverPrimaryUID,omitempty"`
182182
}

0 commit comments

Comments
 (0)