Skip to content

Parse the pgcontrol file on boot to ensure compatibility #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to initialize fly config: %s", err)
}

if err := n.PGConfig.initialize(store); err != nil {
if err := n.PGConfig.initialize(ctx, store); err != nil {
return fmt.Errorf("failed to initialize pg config: %s", err)
}

Expand Down Expand Up @@ -527,7 +527,7 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
}

// Set restore configuration
if err := n.PGConfig.initialize(store); err != nil {
if err := n.PGConfig.initialize(ctx, store); err != nil {
return fmt.Errorf("failed to initialize pg config: %s", err)
}

Expand Down
42 changes: 39 additions & 3 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
return e.Encode(cfg)
}

func (c *PGConfig) SetDefaults(store *state.Store) error {
func (c *PGConfig) SetDefaults(ctx context.Context, store *state.Store) error {
// The default wal_segment_size in mb
const walSegmentSize = 16

Expand Down Expand Up @@ -184,6 +184,42 @@
return fmt.Errorf("failed to set recovery target config: %s", err)
}

// Evaluate pg_control settings to determine if any internal settings need to be updated
if err := c.setPGControlOverrides(ctx); err != nil {
return fmt.Errorf("failed to set pg_control overrides: %s", err)
}

return nil
}

func (c *PGConfig) setPGControlOverrides(ctx context.Context) error {
pgControlMap, err := pgControlSettings(ctx)
if err != nil {
return fmt.Errorf("failed to fetch pg_control settings: %s", err)
}

if pgControlMap != nil {

Check failure on line 201 in internal/flypg/pg.go

View workflow job for this annotation

GitHub Actions / Staticcheck

unnecessary nil check around range (S1031)
for k, v := range pgControlMap {
// Skip any settings that are not already specified by the internal config
if _, ok := c.internalConfig[k]; !ok {
continue
}

if k == "max_connections" {
maxConns, err := strconv.Atoi(v)
if err != nil {
log.Printf("[WARN] Failed to parse max_connections from pg_control: %v", err)
continue
}
if maxConns > c.internalConfig[k].(int) {
// If the max_connections from pg_control is greater than the current default, update it.
log.Printf("[WARN] pg_control specifies a max_connections value of %d, which is greater than our default of %d. Updating.", maxConns, c.internalConfig[k].(int))
c.internalConfig[k] = maxConns
}
}
}
}

return nil
}

Expand Down Expand Up @@ -269,7 +305,7 @@

// initialize will ensure the required configuration files are stubbed and the parent
// postgresql.conf file includes them.
func (c *PGConfig) initialize(store *state.Store) error {
func (c *PGConfig) initialize(ctx context.Context, store *state.Store) error {
if err := c.setDefaultHBA(); err != nil {
return fmt.Errorf("failed updating pg_hba.conf: %s", err)
}
Expand All @@ -294,7 +330,7 @@
}
}

if err := c.SetDefaults(store); err != nil {
if err := c.SetDefaults(context.TODO(), store); err != nil {
return fmt.Errorf("failed to set pg defaults: %s", err)
}

Expand Down
62 changes: 62 additions & 0 deletions internal/flypg/pg_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package flypg

import (
"bufio"
"context"
"fmt"
"strings"

"log"

"github.com/fly-apps/postgres-flex/internal/utils"
)

const (
pathToPGControl = "/data/postgresql/global/pg_control"
)

func pgControlSettings(ctx context.Context) (map[string]string, error) {
// Short-circuit if the pg_control file doesn't exist.
if !utils.FileExists(pathToPGControl) {
log.Println("[WARN] pg_control file does not exist. Skipping evaluation.")
return nil, nil
}

result, err := utils.RunCmd(ctx, "root", "pg_controldata")
if err != nil {
return nil, fmt.Errorf("failed to run pg_controldata: %s", err)
}

return parsePGControlData(string(result))
}

func parsePGControlData(pgControlData string) (map[string]string, error) {
settings := make(map[string]string)

scanner := bufio.NewScanner(strings.NewReader(pgControlData))
for scanner.Scan() {
line := scanner.Text()

// Filter out lines that don't contain the word "setting".
if !strings.Contains(line, "setting:") {
continue
}

parts := strings.SplitN(line, "setting:", 2)
if len(parts) != 2 {
continue
}

key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])

settings[key] = value
}

// Check for any scanner errors.
if err := scanner.Err(); err != nil {
return nil, err
}

return settings, nil
}
60 changes: 60 additions & 0 deletions internal/flypg/pg_control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package flypg

import (
"testing"
)

func TestParseSettingsFromFile(t *testing.T) {
// Sample input that includes some lines with "setting:" and some without.
input := `pg_control version number: 1300
Catalog version number: 202307071
Database system identifier: 7420479024646529412
Database cluster state: in archive recovery
pg_control last modified: Tue 04 Feb 2025 10:04:52 PM UTC
Latest checkpoint location: 2/40000060
Latest checkpoint's REDO location: 2/40000028
Latest checkpoint's REDO WAL file: 000000020000000200000040
Latest checkpoint's TimeLineID: 2
Latest checkpoint's PrevTimeLineID: 2
Latest checkpoint's full_page_writes: on
Latest checkpoint's NextXID: 0:34
wal_level setting: replica
wal_log_hints setting: on
max_connections setting: 500
max_worker_processes setting: 8
Some other line without the keyword
Blocks per segment of large relation: 131072
WAL block size: 8192
Bytes per WAL segment: 16777216
Maximum length of identifiers: 64
Maximum columns in an index: 32
Maximum size of a TOAST chunk: 1996
Size of a large-object chunk: 2048`

settings, err := parsePGControlData(input)
if err != nil {
t.Fatalf("parsePGControlData returned an error: %v", err)
}

// Define the expected key/value pairs.
expected := map[string]string{
"wal_level": "replica",
"wal_log_hints": "on",
"max_connections": "500",
"max_worker_processes": "8",
}

if len(settings) != len(expected) {
t.Errorf("expected %d settings, got %d", len(expected), len(settings))
}

// Verify that the expected key/value pairs are present in the settings map.
for key, want := range expected {
got, ok := settings[key]
if !ok {
t.Errorf("expected key %q not found in settings", key)
} else if got != want {
t.Errorf("for key %q, expected value %q, got %q", key, want, got)
}
}
}
35 changes: 19 additions & 16 deletions internal/flypg/pg_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flypg

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -25,6 +26,8 @@ func TestPGConfigInitialization(t *testing.T) {
}
defer cleanup()

ctx := context.TODO()

pgConf := &PGConfig{
DataDir: pgTestDirectory,
Port: 5433,
Expand All @@ -41,7 +44,7 @@ func TestPGConfigInitialization(t *testing.T) {

t.Run("initialize", func(t *testing.T) {
store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}
})
Expand Down Expand Up @@ -98,7 +101,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("TIMESCALEDB_ENABLED", "true")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -129,7 +132,7 @@ func TestPGConfigInitialization(t *testing.T) {
}

t.Run("defaults", func(t *testing.T) {
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -160,7 +163,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -181,7 +184,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -202,7 +205,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -223,7 +226,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -242,7 +245,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -257,7 +260,7 @@ func TestPGConfigInitialization(t *testing.T) {

t.Setenv("S3_ARCHIVE_CONFIG", "")

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -275,7 +278,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -293,7 +296,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetName=20240626T172443")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -311,7 +314,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?target=immediate")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -329,7 +332,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00Z&targetInclusive=false")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -351,7 +354,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00&targetTimeline=2")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -391,7 +394,7 @@ func TestPGUserConfigOverride(t *testing.T) {
}

store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(context.TODO(), store); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -542,7 +545,7 @@ func TestValidateCompatibility(t *testing.T) {
}

store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(context.TODO(), store); err != nil {
t.Fatal(err)
}
t.Run("SharedPreloadLibraries", func(t *testing.T) {
Expand Down
Loading