Skip to content

Node name fix #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -36,7 +36,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -52,7 +52,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -68,7 +68,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand Down
9 changes: 8 additions & 1 deletion bin/restart-repmgrd
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
#!/bin/bash

kill `cat /tmp/repmgrd.pid`
if [ -f /tmp/repmgrd.pid ]; then
PID=$(cat /tmp/repmgrd.pid)

# Check if the process is running
if ps -p $PID > /dev/null 2>&1; then
kill $PID
fi
fi
49 changes: 40 additions & 9 deletions cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func main() {

func processUnregistration(ctx context.Context) error {
encodedArg := os.Args[1]
hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg)
machineBytes, err := base64.StdEncoding.DecodeString(encodedArg)
if err != nil {
return fmt.Errorf("failed to decode hostname: %v", err)
return fmt.Errorf("failed to decode machine: %v", err)
}

node, err := flypg.NewNode()
Expand All @@ -43,19 +43,50 @@ func processUnregistration(ctx context.Context) error {
}
defer func() { _ = conn.Close(ctx) }()

member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
machineID := string(machineBytes)

if len(machineID) != 14 {
return fmt.Errorf("invalid machine id: %s", machineID)
}

member, err := node.RepMgr.MemberByNodeName(ctx, conn, machineID)
if err != nil {
return fmt.Errorf("failed to resolve member: %s", err)
// If no rows are found, the member was either already unregistered or never registered
if err != pgx.ErrNoRows {
return fmt.Errorf("failed to resolve member using %s: %s", machineID, err)
}
}

// If the member exists unregister it and remove the replication slot
if member != nil {
if err := node.RepMgr.UnregisterMember(*member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
}

slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
return fmt.Errorf("failed to remove replication slot: %v", err)
}
}

if err := node.RepMgr.UnregisterMember(*member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
// Redirect logs to /dev/null temporarily so we don't pollute the response data.
devnull, err := os.Open(os.DevNull)
if err != nil {
return fmt.Errorf("failed to open /dev/null: %v", err)
}
defer func() { _ = devnull.Close() }()

// Save the original log output
originalLogOutput := log.Writer()

// Redirect logs to /dev/null
log.SetOutput(devnull)

slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
return err
if err := flypg.EvaluateClusterState(ctx, conn, node); err != nil {
return fmt.Errorf("failed to evaluate cluster state: %v", err)
}
// Restore the original log output
log.SetOutput(originalLogOutput)

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module github.com/fly-apps/postgres-flex

go 1.20
go 1.23

require (
github.com/go-chi/chi/v5 v5.0.8
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/consul/api v1.18.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.5.4
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/pkg/term v1.1.0
github.com/spf13/cobra v1.8.1
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/sync v0.1.0
Expand All @@ -36,8 +38,6 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXO
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc=
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag=
Expand All @@ -78,6 +79,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8=
github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -139,6 +141,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 h1:mZNiJSrmbQA/3+Vy8GLL/Q9qdHxPzjcxKv+E14GZLFs=
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2/go.mod h1:BbqpB4y6Z/cijQqKWJ3i8LMsAoC29gzX6vsSD3Qq7uw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -154,6 +157,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
Expand Down
90 changes: 76 additions & 14 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -88,7 +92,9 @@ func NewNode() (*Node, error) {
UserConfigPath: "/data/repmgr.user.conf",
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
HostName: node.Hostname(),
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -182,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error {
}
} else {
log.Println("Provisioning standby")
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
cloneTarget, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
if err != nil {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}
Expand Down Expand Up @@ -225,14 +231,6 @@ func (n *Node) Init(ctx context.Context) error {

// PostInit are operations that need to be executed against a running Postgres on boot.
func (n *Node) PostInit(ctx context.Context) error {
if ZombieLockExists() {
Copy link
Contributor Author

@davissp14 davissp14 Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning, allow it to be re-screened as there are certain failure conditions that can be resolved automatically. If not, it will enter a crash loop until the machines max restarts are hit.

log.Println("[ERROR] Manual intervention required.")
log.Println("[ERROR] If a new primary has been established, consider adding a new replica with `fly machines clone <primary-machine-id>` and then remove this member.")
log.Println("[ERROR] Sleeping for 5 minutes.")
time.Sleep(5 * time.Minute)
return fmt.Errorf("unrecoverable zombie")
}

// Use the Postgres user on boot, since our internal user may not have been created yet.
conn, err := n.NewLocalConnection(ctx, "postgres", n.OperatorCredentials)
if err != nil {
Expand Down Expand Up @@ -265,7 +263,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve member role: %s", err)
}

// Restart repmgrd in the event the IP changes for an already registered node.
// Restart repmgrd in the event the machine ID changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)

Expand All @@ -291,14 +289,22 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
}

// This should never happen
if primary != n.PrivateIP {
// This should never happen, but check anyways for correctness
if primary != n.Hostname() {
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
primary,
n.PrivateIP,
n.Hostname(),
)
}

// Clear the zombie lock if it exists.
if ZombieLockExists() {
log.Println("[INFO] Clearing zombie lock and re-enabling read/write")
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}
}

// Re-register primary to apply any configuration changes.
if err := n.RepMgr.registerPrimary(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to re-register existing primary: %s", err)
Expand All @@ -311,6 +317,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down Expand Up @@ -399,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to enable repmgr: %s", err)
}

primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
primary, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
if err != nil {
return fmt.Errorf("failed to resolve primary member: %s", err)
}
Expand Down Expand Up @@ -527,3 +537,55 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrate node name from 6pn to machine ID if needed
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}

return nil
}

// Hostname returns the hostname of the node.
func (n *Node) Hostname() string {
return fmt.Sprintf("%s.vm.%s.internal", n.MachineID, n.AppName)
}
4 changes: 2 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
Expand All @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
Expand Down
Loading
Loading