Skip to content

Commit 3ce2a72

Browse files
committed
feat(persistence): allow setting optional schema for postgres and mysql database
Signed-off-by: Jonathan Pollert <jona.pollert@gmail.com>
1 parent 1c25b3c commit 3ce2a72

13 files changed

Lines changed: 177 additions & 19 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Description: You can now configure a specific postgres database schema for Argo Persistence in the controller config map.
2+
Authors: [Jonathan Pollert](https://github.com/jnt0r)
3+
Component: General
4+
Issues: 2452
5+
6+
Added support for custom database schemas to improve data isolation and security in shared environments. This allows Argo to operate within a designated logical schema rather than the default. Note: This feature is not applicable to MySQL, as it does not support logical schemas; for MySQL users, database names should continue to be used for application separation.

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ func (c DatabaseConfig) GetHostname() string {
389389
// PostgreSQLConfig contains PostgreSQL-specific database configuration
390390
type PostgreSQLConfig struct {
391391
DatabaseConfig
392+
// Schema is the name of the schema to use in the database. If not set, the default schema of the database will be used
393+
Schema string `json:"schema"`
392394
// SSL enables SSL connection to the database
393395
SSL bool `json:"ssl,omitempty"`
394396
// SSLMode specifies the SSL mode (disable, require, verify-ca, verify-full)

docs/workflow-archive.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,13 @@ Example:
9797

9898
persistence:
9999
archive: false
100+
101+
## Customizing PostgreSQL Database Schema
102+
103+
To change the schema for the tables in PostgreSQL, set `schema:` to the desired schema in the `persistence` section of [your configuration](workflow-controller-configmap.yaml).
104+
Only available on PostgreSQL.
105+
106+
Example:
107+
108+
persistence:
109+
schema: argo

docs/workflow-controller-configmap.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,19 @@ PostgreSQLConfig contains PostgreSQL-specific database configuration
197197

198198
### Fields
199199

200-
| Field Name | Field Type | Description |
201-
|------------------|-----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------|
202-
| `Host` | `string` | Host is the database server hostname |
203-
| `Port` | `int` | Port is the database server port |
204-
| `Database` | `string` | Database is the name of the database to connect to |
205-
| `TableName` | `string` | TableName is the name of the table to use, must be set |
206-
| `UsernameSecret` | [`apiv1.SecretKeySelector`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#secretkeyselector-v1-core) | UsernameSecret references a secret containing the database username |
207-
| `PasswordSecret` | [`apiv1.SecretKeySelector`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#secretkeyselector-v1-core) | PasswordSecret references a secret containing the database password |
208-
| `SSL` | `bool` | SSL enables SSL connection to the database |
209-
| `SSLMode` | `string` | SSLMode specifies the SSL mode (disable, require, verify-ca, verify-full) |
210-
| `AzureToken` | [`AzureTokenConfig`](#azuretokenconfig) | AzureToken specifies if the password should be fetched as an Azure token |
211-
| `AWSRDSToken` | [`AWSRDSTokenConfig`](#awsrdstokenconfig) | AWSRDSToken specifies if the password should be fetched as an AWS RDS IAM auth token |
200+
| Field Name | Field Type | Description |
201+
|------------------|-----------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------|
202+
| `Host` | `string` | Host is the database server hostname |
203+
| `Port` | `int` | Port is the database server port |
204+
| `Database` | `string` | Database is the name of the database to connect to |
205+
| `TableName` | `string` | TableName is the name of the table to use, must be set |
206+
| `UsernameSecret` | [`apiv1.SecretKeySelector`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#secretkeyselector-v1-core) | UsernameSecret references a secret containing the database username |
207+
| `PasswordSecret` | [`apiv1.SecretKeySelector`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#secretkeyselector-v1-core) | PasswordSecret references a secret containing the database password |
208+
| `Schema` | `string` | Schema is the name of the schema to use in the database. If not set, the default schema of the database will be used |
209+
| `SSL` | `bool` | SSL enables SSL connection to the database |
210+
| `SSLMode` | `string` | SSLMode specifies the SSL mode (disable, require, verify-ca, verify-full) |
211+
| `AzureToken` | [`AzureTokenConfig`](#azuretokenconfig) | AzureToken specifies if the password should be fetched as an Azure token |
212+
| `AWSRDSToken` | [`AWSRDSTokenConfig`](#awsrdstokenconfig) | AWSRDSToken specifies if the password should be fetched as an AWS RDS IAM auth token |
212213

213214
## AzureTokenConfig
214215

docs/workflow-controller-configmap.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ data:
292292
host: localhost
293293
port: 5432
294294
database: postgres
295+
schema: argo
295296
tableName: argo_workflows
296297
# the database secrets must be in the same namespace of the controller
297298
userNameSecret:
@@ -367,6 +368,7 @@ data:
367368
host: localhost
368369
port: 5432
369370
database: postgres # Can be the same database as persistence
371+
schema: argo
370372
# the database secrets must be in the same namespace as the controller
371373
userNameSecret:
372374
name: argo-postgres-config

hack/db/main.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
"net/url"
56
"os"
67
"strings"
78
"time"
@@ -24,10 +25,10 @@ import (
2425
var (
2526
session db.Session
2627
dbType sqldb.DBType
28+
dsn string
2729
)
2830

2931
func main() {
30-
var dsn string
3132
rootCmd := &cobra.Command{
3233
Use: "db",
3334
Short: "CLI for developers to use when working on the DB locally",
@@ -51,7 +52,11 @@ func NewMigrateCommand() *cobra.Command {
5152
Use: "migrate",
5253
Short: "Force DB migration for given cluster/table",
5354
RunE: func(cmd *cobra.Command, args []string) error {
54-
return persistsqldb.Migrate(cmd.Context(), session, cluster, table, dbType)
55+
schema, err := schemaFromDSN(dsn)
56+
if err != nil {
57+
return err
58+
}
59+
return persistsqldb.Migrate(cmd.Context(), session, cluster, schema, table, dbType)
5560
},
5661
}
5762
migrationCmd.Flags().StringVar(&cluster, "cluster", "default", "Cluster name")
@@ -120,6 +125,14 @@ func createDBSession(dsn string) (db.Session, sqldb.DBType, error) {
120125
}
121126
}
122127

128+
func schemaFromDSN(dsn string) (string, error) {
129+
u, err := url.Parse(dsn)
130+
if err != nil {
131+
return "", err
132+
}
133+
return strings.TrimSpace(u.Query().Get("search_path")), nil
134+
}
135+
123136
func randomPhase() wfv1.WorkflowPhase {
124137
phases := []wfv1.WorkflowPhase{
125138
wfv1.WorkflowSucceeded,

persist/sqldb/migrate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ const (
1212
versionTable = "schema_history"
1313
)
1414

15-
func Migrate(ctx context.Context, session db.Session, clusterName, tableName string, dbType sqldb.DBType) (err error) {
16-
return sqldb.Migrate(ctx, session, dbType, versionTable, []sqldb.Change{
15+
func Migrate(ctx context.Context, session db.Session, clusterName, schema string, tableName string, dbType sqldb.DBType) (err error) {
16+
return sqldb.Migrate(ctx, session, dbType, schema, versionTable, []sqldb.Change{
1717
sqldb.AnsiSQLChange(`create table if not exists ` + tableName + ` (
1818
id varchar(128) ,
1919
name varchar(256),

persist/sqldb/sqldb.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,11 @@ func GetTableName(persistConfig *config.PersistConfig) (string, error) {
1717
}
1818
return tableName, nil
1919
}
20+
21+
func GetSchema(persistConfig *config.PersistConfig) string {
22+
var schemaName string
23+
if persistConfig.PostgreSQL != nil {
24+
schemaName = persistConfig.PostgreSQL.Schema
25+
}
26+
return schemaName
27+
}

util/sqldb/migrate.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@ func ByType(dbType DBType, changes TypedChanges) Change {
2222
return nil
2323
}
2424

25-
func Migrate(ctx context.Context, session db.Session, dbType DBType, versionTableName string, changes []Change) error {
25+
func Migrate(ctx context.Context, session db.Session, dbType DBType, schema string, versionTableName string, changes []Change) error {
2626
ctx, logger := logging.RequireLoggerFromContext(ctx).WithField("dbType", dbType).InContext(ctx)
2727
logger.Info(ctx, "Migrating database schema")
2828

2929
{
30+
// Create the schema in the postgres database. MySQL does not have a concept of schema separate from database, so we skip this step for MySQL
31+
if dbType == Postgres && schema != "" {
32+
_, err := session.SQL().Exec(fmt.Sprintf("create schema if not exists %s", schema))
33+
34+
if err != nil {
35+
return err
36+
}
37+
}
38+
3039
// poor mans SQL migration
3140
_, err := session.SQL().Exec(fmt.Sprintf("create table if not exists %s(schema_version int not null, primary key(schema_version))", versionTableName))
3241
if err != nil {

util/sqldb/session_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package sqldb
22

33
import (
44
"context"
5+
"database/sql"
6+
"fmt"
57
"net/netip"
68
"runtime"
9+
"strconv"
710
"testing"
811
"time"
912

@@ -12,6 +15,7 @@ import (
1215
"github.com/stretchr/testify/assert"
1316
"github.com/stretchr/testify/require"
1417
"github.com/testcontainers/testcontainers-go"
18+
testmysql "github.com/testcontainers/testcontainers-go/modules/mysql"
1519
testpostgres "github.com/testcontainers/testcontainers-go/modules/postgres"
1620
"github.com/testcontainers/testcontainers-go/wait"
1721
"github.com/upper/db/v4"
@@ -85,6 +89,43 @@ func setupPostgresContainer(ctx context.Context, t *testing.T) (config.DBConfig,
8589
return dbConfig, termContainerFn, nil
8690
}
8791

92+
func setupMySQLContainerWithDatabase(ctx context.Context, t *testing.T, database string) (config.DBConfig, func(), error) {
93+
mysqlContainer, err := testmysql.Run(ctx,
94+
"mysql:8.4.5",
95+
testmysql.WithDatabase(database),
96+
testmysql.WithUsername(userName),
97+
testmysql.WithPassword(password),
98+
)
99+
if err != nil {
100+
return config.DBConfig{}, nil, err
101+
}
102+
103+
host, err := mysqlContainer.Host(ctx)
104+
require.NoError(t, err)
105+
portS, err := mysqlContainer.MappedPort(ctx, "3306/tcp")
106+
require.NoError(t, err)
107+
port, err := strconv.Atoi(portS.Port())
108+
require.NoError(t, err)
109+
110+
cfg := config.DBConfig{
111+
MySQL: &config.MySQLConfig{
112+
DatabaseConfig: config.DatabaseConfig{
113+
Database: database,
114+
Host: host,
115+
Port: port,
116+
},
117+
},
118+
}
119+
120+
termContainerFn := func() {
121+
if err := testcontainers.TerminateContainer(mysqlContainer); err != nil {
122+
t.Logf("failed to terminate container: %s", err)
123+
}
124+
}
125+
126+
return cfg, termContainerFn, nil
127+
}
128+
88129
func TestSessionReconnect(t *testing.T) {
89130
if runtime.GOOS == "windows" {
90131
t.Skip("This test uses the Linux container image and therefore cannot be performed on the Windows platform")
@@ -130,3 +171,57 @@ func TestSessionReconnect(t *testing.T) {
130171
<-doneChan
131172
cancel()
132173
}
174+
175+
func TestPostgresSessionUsesSchema(t *testing.T) {
176+
if runtime.GOOS == "windows" {
177+
t.Skip("This test uses the Linux container image and therefore cannot be performed on the Windows platform")
178+
}
179+
180+
ctx := logging.TestContext(t.Context())
181+
cfg, cleanup, err := setupPostgresContainer(ctx, t)
182+
require.NoError(t, err)
183+
defer cleanup()
184+
185+
createSQLDB, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", cfg.PostgreSQL.Host, cfg.PostgreSQL.Port, userName, password, cfg.PostgreSQL.Database))
186+
require.NoError(t, err)
187+
defer createSQLDB.Close()
188+
189+
_, err = createSQLDB.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS myschema")
190+
require.NoError(t, err)
191+
192+
cfg.PostgreSQL.Schema = "myschema"
193+
session, dbType, err := CreateDBSessionWithCreds(cfg, userName, password)
194+
require.NoError(t, err)
195+
require.Equal(t, Postgres, dbType)
196+
defer session.Close()
197+
198+
var currentSchema string
199+
row, err := session.SQL().QueryRow("SELECT current_schema()")
200+
require.NoError(t, err)
201+
err = row.Scan(&currentSchema)
202+
require.NoError(t, err)
203+
require.Equal(t, "myschema", currentSchema)
204+
}
205+
206+
func TestMySQLSessionUsesSchema(t *testing.T) {
207+
if runtime.GOOS == "windows" {
208+
t.Skip("This test uses the Linux container image and therefore cannot be performed on the Windows platform")
209+
}
210+
211+
ctx := logging.TestContext(t.Context())
212+
cfg, cleanup, err := setupMySQLContainerWithDatabase(ctx, t, dbName)
213+
require.NoError(t, err)
214+
defer cleanup()
215+
216+
session, dbType, err := CreateDBSessionWithCreds(cfg, userName, password)
217+
require.NoError(t, err)
218+
require.Equal(t, MySQL, dbType)
219+
defer session.Close()
220+
221+
var currentDatabase string
222+
row, err := session.SQL().QueryRow("SELECT DATABASE()")
223+
require.NoError(t, err)
224+
err = row.Scan(&currentDatabase)
225+
require.NoError(t, err)
226+
require.Equal(t, dbName, currentDatabase)
227+
}

0 commit comments

Comments
 (0)