Skip to content

Commit 19ddcdc

Browse files
committed
Create experimental ability to provision Postgres helper functions
Towards NSL-5018
1 parent 65a9992 commit 19ddcdc

File tree

4 files changed

+116
-24
lines changed

4 files changed

+116
-24
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
-- Collection of optional helper functions
2+
-- To provision these functions add
3+
-- provision_helper_functions: true
4+
-- to the database intent.
5+
6+
-- `CREATE TABLE IF NOT EXISTS` and `ALTER TABLE … ADD COLUMN IF NOT EXISTS`
7+
-- both require exclusive locks with Postgres, even if the table/column already exists.
8+
-- The functions below provide ensure semantics while only acquiring exclusive locks on mutations.
9+
10+
-- fn_ensure_table is a lock-friendly replacement for `CREATE TABLE IF NOT EXISTS`.
11+
--
12+
-- Example usage:
13+
--
14+
-- SELECT fn_ensure_table('testtable', $$
15+
-- UserID TEXT NOT NULL,
16+
-- PRIMARY KEY(UserID)
17+
-- $$);
18+
CREATE OR REPLACE FUNCTION fn_ensure_table(tname TEXT, def TEXT)
19+
RETURNS void
20+
LANGUAGE plpgsql AS
21+
$func$
22+
BEGIN
23+
IF NOT EXISTS (
24+
SELECT 1 FROM pg_tables
25+
WHERE schemaname = 'public' AND tablename = LOWER(tname)
26+
) THEN
27+
EXECUTE 'CREATE TABLE IF NOT EXISTS ' || tname || ' (' || def || ');';
28+
END IF;
29+
END
30+
$func$;
31+
32+
-- fn_ensure_column is a lock-friendly replacement for `ALTER TABLE ... ADD COLUMN IF NOT EXISTS`.
33+
--
34+
-- Example usage:
35+
--
36+
-- SELECT fn_ensure_column('testtable', 'CreatedAt', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP');
37+
CREATE OR REPLACE FUNCTION fn_ensure_column(tname TEXT, cname TEXT, def TEXT)
38+
RETURNS void
39+
LANGUAGE plpgsql AS
40+
$func$
41+
BEGIN
42+
IF NOT EXISTS (
43+
SELECT 1 FROM information_schema.columns
44+
WHERE table_name = LOWER(tname) AND column_name = LOWER(cname)
45+
) THEN
46+
EXECUTE 'ALTER TABLE ' || tname || ' ADD COLUMN IF NOT EXISTS ' || cname || ' ' || def;
47+
END IF;
48+
END
49+
$func$;

library/oss/postgres/prepare/database/main.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package main
66

77
import (
88
"context"
9+
"embed"
910
"fmt"
11+
"io/fs"
1012
"log"
1113
"math/rand"
1214
"os"
@@ -27,32 +29,44 @@ const (
2729
connIdleTimeout = 15 * time.Minute
2830
connTimeout = 5 * time.Minute
2931

30-
caCertPath = "/tmp/ca.pem"
32+
caCertPath = "/tmp/ca.pem"
33+
helperFunctionsPath = "functions.sql"
34+
)
35+
36+
var (
37+
//go:embed *.sql
38+
data embed.FS
3139
)
3240

3341
func main() {
3442
ctx, p := provider.MustPrepare[*postgres.DatabaseIntent]()
3543

44+
if err := run(ctx, p); err != nil {
45+
log.Fatal(err)
46+
}
47+
}
48+
49+
func run(ctx context.Context, p *provider.Provider[*postgres.DatabaseIntent]) error {
3650
cluster := &postgresclass.ClusterInstance{}
3751
if err := p.Resources.Unmarshal(fmt.Sprintf("%s:cluster", providerPkg), cluster); err != nil {
38-
log.Fatalf("unable to read required resource \"cluster\": %v", err)
52+
return fmt.Errorf("unable to read required resource \"cluster\": %w", err)
3953
}
4054

4155
// TODO inject file as secret ref and propagate secret ref to server, too.
4256
if cluster.CaCert != "" {
4357
if err := os.WriteFile(caCertPath, []byte(cluster.CaCert), 0644); err != nil {
44-
log.Fatalf("failed to write %q: %v", caCertPath, err)
58+
return fmt.Errorf("failed to write %q: %w", caCertPath, err)
4559
}
4660

4761
if err := os.Setenv("PGSSLROOTCERT", caCertPath); err != nil {
48-
log.Fatalf("failed to set PGSSLROOTCERT: %v", err)
62+
return fmt.Errorf("failed to set PGSSLROOTCERT: %w", err)
4963
}
5064

5165
}
5266

5367
exists, err := ensureDatabase(ctx, cluster, p.Intent.Name)
5468
if err != nil {
55-
log.Fatalf("unable to create database %q: %v", p.Intent.Name, err)
69+
return fmt.Errorf("unable to create database %q: %w", p.Intent.Name, err)
5670
}
5771

5872
instance := &postgresclass.DatabaseInstance{
@@ -72,29 +86,45 @@ func main() {
7286
MaxConnIdleTime: connIdleTimeout,
7387
})
7488
if err != nil {
75-
log.Fatalf("unable to open connection: %v", err)
89+
return fmt.Errorf("unable to open connection: %w", err)
7690
}
7791
defer func() {
7892
if err := db.Close(); err != nil {
7993
log.Printf("unable to close database connection: %v", err)
8094
}
8195
}()
8296

97+
if p.Intent.ProvisionHelperFunctions {
98+
content, err := fs.ReadFile(data, helperFunctionsPath)
99+
if err != nil {
100+
return fmt.Errorf("failed to read %s: %w", helperFunctionsPath, err)
101+
}
102+
103+
if err := applyWithRetry(ctx, db, string(content)); err != nil {
104+
return fmt.Errorf("unable to apply helper functions: %w", err)
105+
}
106+
}
107+
83108
for _, schema := range p.Intent.Schema {
84-
if err := backoff.Retry(func() error {
85-
_, err := db.Exec(ctx, string(schema.Contents))
86-
return err
87-
}, backOff{
88-
interval: 100 * time.Millisecond,
89-
deadline: time.Now().Add(15 * time.Second),
90-
jitter: 100 * time.Millisecond,
91-
}); err != nil {
92-
log.Fatalf("unable to apply schema %q: %v", schema.Path, err)
109+
if err := applyWithRetry(ctx, db, string(schema.Contents)); err != nil {
110+
return fmt.Errorf("unable to apply schema %q: %w", schema.Path, err)
93111
}
94112
}
95113
}
96114

97115
p.EmitResult(instance)
116+
return nil
117+
}
118+
119+
func applyWithRetry(ctx context.Context, db *universepg.DB, sql string) error {
120+
return backoff.Retry(func() error {
121+
_, err := db.Exec(ctx, sql)
122+
return err
123+
}, backOff{
124+
interval: 100 * time.Millisecond,
125+
deadline: time.Now().Add(15 * time.Second),
126+
jitter: 100 * time.Millisecond,
127+
})
98128
}
99129

100130
func ensureDatabase(ctx context.Context, cluster *postgresclass.ClusterInstance, name string) (bool, error) {

library/oss/postgres/types.pb.go

Lines changed: 18 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

library/oss/postgres/types.proto

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ message ClusterIntent {
2020

2121
message DatabaseIntent {
2222
// The database name is applied as is (e.g. it is case-sensitive).
23-
string name = 1;
24-
repeated foundation.schema.FileContents schema = 2;
25-
bool skip_schema_initialization_if_exists = 3;
23+
string name = 1;
24+
repeated foundation.schema.FileContents schema = 2;
25+
bool skip_schema_initialization_if_exists = 3;
26+
bool provision_helper_functions = 4;
2627
}

0 commit comments

Comments
 (0)