diff --git a/internal/flypg/node.go b/internal/flypg/node.go index fe25232..332ae0a 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -218,7 +218,7 @@ func (n *Node) Init(ctx context.Context) error { return fmt.Errorf("failed to initialize fly config: %s", err) } - if err := n.PGConfig.initialize(store); err != nil { + if err := n.PGConfig.initialize(ctx, store); err != nil { return fmt.Errorf("failed to initialize pg config: %s", err) } @@ -527,7 +527,7 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro } // Set restore configuration - if err := n.PGConfig.initialize(store); err != nil { + if err := n.PGConfig.initialize(ctx, store); err != nil { return fmt.Errorf("failed to initialize pg config: %s", err) } diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index 94a294a..335de8c 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -112,7 +112,7 @@ func (c *PGConfig) Print(w io.Writer) error { return e.Encode(cfg) } -func (c *PGConfig) SetDefaults(store *state.Store) error { +func (c *PGConfig) SetDefaults(ctx context.Context, store *state.Store) error { // The default wal_segment_size in mb const walSegmentSize = 16 @@ -184,6 +184,42 @@ func (c *PGConfig) SetDefaults(store *state.Store) error { return fmt.Errorf("failed to set recovery target config: %s", err) } + // Evaluate pg_control settings to determine if any internal settings need to be updated + if err := c.setPGControlOverrides(ctx); err != nil { + return fmt.Errorf("failed to set pg_control overrides: %s", err) + } + + return nil +} + +func (c *PGConfig) setPGControlOverrides(ctx context.Context) error { + pgControlMap, err := pgControlSettings(ctx) + if err != nil { + return fmt.Errorf("failed to fetch pg_control settings: %s", err) + } + + if pgControlMap != nil { + for k, v := range pgControlMap { + // Skip any settings that are not already specified by the internal config + if _, ok := c.internalConfig[k]; !ok { + continue + } + + if k == "max_connections" { + maxConns, err := strconv.Atoi(v) + if err != nil { + log.Printf("[WARN] Failed to parse max_connections from pg_control: %v", err) + continue + } + if maxConns > c.internalConfig[k].(int) { + // If the max_connections from pg_control is greater than the current default, update it. + log.Printf("[WARN] pg_control specifies a max_connections value of %d, which is greater than our default of %d. Updating.", maxConns, c.internalConfig[k].(int)) + c.internalConfig[k] = maxConns + } + } + } + } + return nil } @@ -269,7 +305,7 @@ func (c *PGConfig) isInitialized() bool { // initialize will ensure the required configuration files are stubbed and the parent // postgresql.conf file includes them. -func (c *PGConfig) initialize(store *state.Store) error { +func (c *PGConfig) initialize(ctx context.Context, store *state.Store) error { if err := c.setDefaultHBA(); err != nil { return fmt.Errorf("failed updating pg_hba.conf: %s", err) } @@ -294,7 +330,7 @@ func (c *PGConfig) initialize(store *state.Store) error { } } - if err := c.SetDefaults(store); err != nil { + if err := c.SetDefaults(context.TODO(), store); err != nil { return fmt.Errorf("failed to set pg defaults: %s", err) } diff --git a/internal/flypg/pg_control.go b/internal/flypg/pg_control.go new file mode 100644 index 0000000..95e63ce --- /dev/null +++ b/internal/flypg/pg_control.go @@ -0,0 +1,62 @@ +package flypg + +import ( + "bufio" + "context" + "fmt" + "strings" + + "log" + + "github.com/fly-apps/postgres-flex/internal/utils" +) + +const ( + pathToPGControl = "/data/postgresql/global/pg_control" +) + +func pgControlSettings(ctx context.Context) (map[string]string, error) { + // Short-circuit if the pg_control file doesn't exist. + if !utils.FileExists(pathToPGControl) { + log.Println("[WARN] pg_control file does not exist. Skipping evaluation.") + return nil, nil + } + + result, err := utils.RunCmd(ctx, "root", "pg_controldata") + if err != nil { + return nil, fmt.Errorf("failed to run pg_controldata: %s", err) + } + + return parsePGControlData(string(result)) +} + +func parsePGControlData(pgControlData string) (map[string]string, error) { + settings := make(map[string]string) + + scanner := bufio.NewScanner(strings.NewReader(pgControlData)) + for scanner.Scan() { + line := scanner.Text() + + // Filter out lines that don't contain the word "setting". + if !strings.Contains(line, "setting:") { + continue + } + + parts := strings.SplitN(line, "setting:", 2) + if len(parts) != 2 { + continue + } + + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + + settings[key] = value + } + + // Check for any scanner errors. + if err := scanner.Err(); err != nil { + return nil, err + } + + return settings, nil +} diff --git a/internal/flypg/pg_control_test.go b/internal/flypg/pg_control_test.go new file mode 100644 index 0000000..e15285c --- /dev/null +++ b/internal/flypg/pg_control_test.go @@ -0,0 +1,60 @@ +package flypg + +import ( + "testing" +) + +func TestParseSettingsFromFile(t *testing.T) { + // Sample input that includes some lines with "setting:" and some without. + input := `pg_control version number: 1300 +Catalog version number: 202307071 +Database system identifier: 7420479024646529412 +Database cluster state: in archive recovery +pg_control last modified: Tue 04 Feb 2025 10:04:52 PM UTC +Latest checkpoint location: 2/40000060 +Latest checkpoint's REDO location: 2/40000028 +Latest checkpoint's REDO WAL file: 000000020000000200000040 +Latest checkpoint's TimeLineID: 2 +Latest checkpoint's PrevTimeLineID: 2 +Latest checkpoint's full_page_writes: on +Latest checkpoint's NextXID: 0:34 +wal_level setting: replica +wal_log_hints setting: on +max_connections setting: 500 +max_worker_processes setting: 8 +Some other line without the keyword +Blocks per segment of large relation: 131072 +WAL block size: 8192 +Bytes per WAL segment: 16777216 +Maximum length of identifiers: 64 +Maximum columns in an index: 32 +Maximum size of a TOAST chunk: 1996 +Size of a large-object chunk: 2048` + + settings, err := parsePGControlData(input) + if err != nil { + t.Fatalf("parsePGControlData returned an error: %v", err) + } + + // Define the expected key/value pairs. + expected := map[string]string{ + "wal_level": "replica", + "wal_log_hints": "on", + "max_connections": "500", + "max_worker_processes": "8", + } + + if len(settings) != len(expected) { + t.Errorf("expected %d settings, got %d", len(expected), len(settings)) + } + + // Verify that the expected key/value pairs are present in the settings map. + for key, want := range expected { + got, ok := settings[key] + if !ok { + t.Errorf("expected key %q not found in settings", key) + } else if got != want { + t.Errorf("for key %q, expected value %q, got %q", key, want, got) + } + } +} diff --git a/internal/flypg/pg_test.go b/internal/flypg/pg_test.go index a5efe7f..7656f39 100644 --- a/internal/flypg/pg_test.go +++ b/internal/flypg/pg_test.go @@ -1,6 +1,7 @@ package flypg import ( + "context" "fmt" "os" "strings" @@ -25,6 +26,8 @@ func TestPGConfigInitialization(t *testing.T) { } defer cleanup() + ctx := context.TODO() + pgConf := &PGConfig{ DataDir: pgTestDirectory, Port: 5433, @@ -41,7 +44,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Run("initialize", func(t *testing.T) { store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } }) @@ -98,7 +101,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("TIMESCALEDB_ENABLED", "true") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -129,7 +132,7 @@ func TestPGConfigInitialization(t *testing.T) { } t.Run("defaults", func(t *testing.T) { - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -160,7 +163,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Fatal(err) } - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -181,7 +184,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Fatal(err) } - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -202,7 +205,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Fatal(err) } - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -223,7 +226,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Fatal(err) } - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -242,7 +245,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -257,7 +260,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_CONFIG", "") - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -275,7 +278,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -293,7 +296,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetName=20240626T172443") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -311,7 +314,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?target=immediate") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -329,7 +332,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00Z&targetInclusive=false") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -351,7 +354,7 @@ func TestPGConfigInitialization(t *testing.T) { t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00&targetTimeline=2") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(ctx, store); err != nil { t.Fatal(err) } @@ -391,7 +394,7 @@ func TestPGUserConfigOverride(t *testing.T) { } store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(context.TODO(), store); err != nil { t.Error(err) } @@ -542,7 +545,7 @@ func TestValidateCompatibility(t *testing.T) { } store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { + if err := pgConf.initialize(context.TODO(), store); err != nil { t.Fatal(err) } t.Run("SharedPreloadLibraries", func(t *testing.T) {