@@ -16,10 +16,12 @@ import (
16
16
"github.com/fly-apps/postgres-flex/internal/privnet"
17
17
"github.com/fly-apps/postgres-flex/internal/utils"
18
18
"github.com/jackc/pgx/v5"
19
+ "golang.org/x/exp/slices"
19
20
)
20
21
21
22
type Node struct {
22
23
AppName string
24
+ MachineID string
23
25
PrivateIP string
24
26
PrimaryRegion string
25
27
DataDir string
@@ -52,6 +54,8 @@ func NewNode() (*Node, error) {
52
54
53
55
node .PrivateIP = ipv6 .String ()
54
56
57
+ node .MachineID = os .Getenv ("FLY_MACHINE_ID" )
58
+
55
59
node .PrimaryRegion = os .Getenv ("PRIMARY_REGION" )
56
60
if node .PrimaryRegion == "" {
57
61
return nil , fmt .Errorf ("PRIMARY_REGION environment variable must be set" )
@@ -88,7 +92,9 @@ func NewNode() (*Node, error) {
88
92
UserConfigPath : "/data/repmgr.user.conf" ,
89
93
PasswordConfigPath : "/data/.pgpass" ,
90
94
DataDir : node .DataDir ,
95
+ HostName : node .Hostname (),
91
96
PrivateIP : node .PrivateIP ,
97
+ MachineID : node .MachineID ,
92
98
Port : 5433 ,
93
99
DatabaseName : "repmgr" ,
94
100
Credentials : node .ReplCredentials ,
@@ -182,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error {
182
188
}
183
189
} else {
184
190
log .Println ("Provisioning standby" )
185
- cloneTarget , err := n .RepMgr .ResolveMemberOverDNS (ctx )
191
+ cloneTarget , err := n .RepMgr .ResolvePrimaryOverDNS (ctx )
186
192
if err != nil {
187
193
return fmt .Errorf ("failed to resolve member over dns: %s" , err )
188
194
}
@@ -265,7 +271,7 @@ func (n *Node) PostInit(ctx context.Context) error {
265
271
return fmt .Errorf ("failed to resolve member role: %s" , err )
266
272
}
267
273
268
- // Restart repmgrd in the event the IP changes for an already registered node.
274
+ // Restart repmgrd in the event the machine ID changes for an already registered node.
269
275
// This can happen if the underlying volume is moved to a different node.
270
276
daemonRestartRequired := n .RepMgr .daemonRestartRequired (member )
271
277
@@ -311,6 +317,10 @@ func (n *Node) PostInit(ctx context.Context) error {
311
317
}
312
318
}
313
319
case StandbyRoleName :
320
+ if err := n .migrateNodeNameIfNeeded (ctx , repConn ); err != nil {
321
+ return fmt .Errorf ("failed to migrate node name: %s" , err )
322
+ }
323
+
314
324
// Register existing standby to apply any configuration changes.
315
325
if err := n .RepMgr .registerStandby (daemonRestartRequired ); err != nil {
316
326
return fmt .Errorf ("failed to register existing standby: %s" , err )
@@ -399,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error {
399
409
return fmt .Errorf ("failed to enable repmgr: %s" , err )
400
410
}
401
411
402
- primary , err := n .RepMgr .ResolveMemberOverDNS (ctx )
412
+ primary , err := n .RepMgr .ResolvePrimaryOverDNS (ctx )
403
413
if err != nil {
404
414
return fmt .Errorf ("failed to resolve primary member: %s" , err )
405
415
}
@@ -527,3 +537,55 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
527
537
528
538
return nil
529
539
}
540
+
541
+ // migrate node name from 6pn to machine ID if needed
542
+ func (n * Node ) migrateNodeNameIfNeeded (ctx context.Context , repConn * pgx.Conn ) error {
543
+ primary , err := n .RepMgr .PrimaryMember (ctx , repConn )
544
+ if err != nil {
545
+ return fmt .Errorf ("failed to resolve primary member when updating standby: %s" , err )
546
+ }
547
+
548
+ primaryConn , err := n .RepMgr .NewRemoteConnection (ctx , primary .Hostname )
549
+ if err != nil {
550
+ return fmt .Errorf ("failed to establish connection to primary: %s" , err )
551
+ }
552
+ defer func () { _ = primaryConn .Close (ctx ) }()
553
+
554
+ rows , err := primaryConn .Query (ctx , "select application_name from pg_stat_replication" )
555
+ if err != nil {
556
+ return fmt .Errorf ("failed to query pg_stat_replication: %s" , err )
557
+ }
558
+ defer rows .Close ()
559
+
560
+ var applicationNames []string
561
+ for rows .Next () {
562
+ var applicationName string
563
+ if err := rows .Scan (& applicationName ); err != nil {
564
+ return fmt .Errorf ("failed to scan application_name: %s" , err )
565
+ }
566
+ applicationNames = append (applicationNames , applicationName )
567
+ }
568
+ if err := rows .Err (); err != nil {
569
+ return fmt .Errorf ("failed to iterate over rows: %s" , err )
570
+ }
571
+
572
+ // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
573
+ if slices .Contains (applicationNames , n .PrivateIP ) {
574
+ log .Printf ("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID..." )
575
+
576
+ if err := n .RepMgr .regenReplicationConf (ctx ); err != nil {
577
+ return fmt .Errorf ("failed to clone standby: %s" , err )
578
+ }
579
+
580
+ if err := admin .ReloadPostgresConfig (ctx , repConn ); err != nil {
581
+ return fmt .Errorf ("failed to reload postgresql: %s" , err )
582
+ }
583
+ }
584
+
585
+ return nil
586
+ }
587
+
588
+ // Hostname returns the hostname of the node.
589
+ func (n * Node ) Hostname () string {
590
+ return fmt .Sprintf ("%s.vm.%s.internal" , n .MachineID , n .AppName )
591
+ }
0 commit comments