Skip to content

Commit 621444e

Browse files
authored
feat: shared connection pool for pg (#1584)
* **New Features** * Worker-level PostgreSQL connection pool configuration to manage connections across multiple DAGs in a single worker process. * Centralized connection pool management for shared-nothing worker mode with reference-counted pool lifecycle. * **Improvements** * Executor resource cleanup to ensure proper connection and resource release after execution. * Fixed connection pool defaults for improved stability and consistency.
1 parent bd787e9 commit 621444e

File tree

12 files changed

+507
-75
lines changed

12 files changed

+507
-75
lines changed

internal/cmn/config/config.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,10 @@ type Worker struct {
338338
MaxActiveRuns int // Maximum number of active runs (default: 100)
339339
Labels map[string]string // Worker labels for capability matching
340340
Coordinators []string // Coordinator addresses for static discovery (host:port)
341+
342+
// PostgresPool holds connection pool settings for shared-nothing mode.
343+
// When multiple DAGs run concurrently in a worker, they share this pool.
344+
PostgresPool PostgresPoolConfig
341345
}
342346

343347
// Scheduler represents the scheduler configuration
@@ -359,6 +363,28 @@ type Scheduler struct {
359363
ZombieDetectionInterval time.Duration
360364
}
361365

366+
// PostgresPoolConfig holds PostgreSQL connection pool settings for workers.
367+
// Used in shared-nothing worker mode to prevent connection exhaustion
368+
// when multiple DAGs run concurrently in a single worker process.
369+
type PostgresPoolConfig struct {
370+
// MaxOpenConns is the maximum total open connections across ALL PostgreSQL DSNs.
371+
// This is the hard limit shared across all database connections.
372+
// Default: 25
373+
MaxOpenConns int
374+
375+
// MaxIdleConns is the maximum number of idle connections per DSN.
376+
// Default: 5
377+
MaxIdleConns int
378+
379+
// ConnMaxLifetime is the maximum lifetime of a connection in seconds.
380+
// Default: 300 (5 minutes)
381+
ConnMaxLifetime int
382+
383+
// ConnMaxIdleTime is the maximum idle time for a connection in seconds.
384+
// Default: 60 (1 minute)
385+
ConnMaxIdleTime int
386+
}
387+
362388
// Peer holds the certificate and TLS configuration for peer connections over gRPC.
363389
type Peer struct {
364390
// CertFile is the path to the server's TLS certificate file.

internal/cmn/config/definition.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,9 @@ type WorkerDef struct {
386386
// When specified, the worker will connect directly to these coordinators
387387
// instead of using the file-based service registry.
388388
Coordinators interface{} `mapstructure:"coordinators"`
389+
390+
// PostgresPool holds connection pool settings for shared-nothing mode.
391+
PostgresPool *PostgresPoolDef `mapstructure:"postgresPool"`
389392
}
390393

391394
// SchedulerDef holds the configuration for the scheduler.
@@ -405,3 +408,23 @@ type SchedulerDef struct {
405408
// Default is 45 seconds. Set to 0 to disable.
406409
ZombieDetectionInterval string `mapstructure:"zombieDetectionInterval"`
407410
}
411+
412+
// PostgresPoolDef holds the definition for PostgreSQL connection pool configuration.
413+
// Used in shared-nothing worker mode to prevent connection exhaustion.
414+
type PostgresPoolDef struct {
415+
// MaxOpenConns is the maximum total open connections across ALL PostgreSQL DSNs.
416+
// Default: 25
417+
MaxOpenConns int `mapstructure:"maxOpenConns"`
418+
419+
// MaxIdleConns is the maximum number of idle connections per DSN.
420+
// Default: 5
421+
MaxIdleConns int `mapstructure:"maxIdleConns"`
422+
423+
// ConnMaxLifetime is the maximum lifetime of a connection in seconds.
424+
// Default: 300 (5 minutes)
425+
ConnMaxLifetime int `mapstructure:"connMaxLifetime"`
426+
427+
// ConnMaxIdleTime is the maximum idle time for a connection in seconds.
428+
// Default: 60 (1 minute)
429+
ConnMaxIdleTime int `mapstructure:"connMaxIdleTime"`
430+
}

internal/cmn/config/loader.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,37 @@ func (l *ConfigLoader) loadWorkerConfig(cfg *Config, def Definition) {
669669
cfg.Worker.Coordinators = addresses
670670
l.warnings = append(l.warnings, addrWarnings...)
671671
}
672+
673+
// Load PostgresPool config
674+
if def.Worker.PostgresPool != nil {
675+
pp := def.Worker.PostgresPool
676+
if pp.MaxOpenConns > 0 {
677+
cfg.Worker.PostgresPool.MaxOpenConns = pp.MaxOpenConns
678+
}
679+
if pp.MaxIdleConns > 0 {
680+
cfg.Worker.PostgresPool.MaxIdleConns = pp.MaxIdleConns
681+
}
682+
if pp.ConnMaxLifetime > 0 {
683+
cfg.Worker.PostgresPool.ConnMaxLifetime = pp.ConnMaxLifetime
684+
}
685+
if pp.ConnMaxIdleTime > 0 {
686+
cfg.Worker.PostgresPool.ConnMaxIdleTime = pp.ConnMaxIdleTime
687+
}
688+
}
689+
}
690+
691+
// Set PostgresPool defaults if not configured
692+
if cfg.Worker.PostgresPool.MaxOpenConns == 0 {
693+
cfg.Worker.PostgresPool.MaxOpenConns = 25
694+
}
695+
if cfg.Worker.PostgresPool.MaxIdleConns == 0 {
696+
cfg.Worker.PostgresPool.MaxIdleConns = 5
697+
}
698+
if cfg.Worker.PostgresPool.ConnMaxLifetime == 0 {
699+
cfg.Worker.PostgresPool.ConnMaxLifetime = 300
700+
}
701+
if cfg.Worker.PostgresPool.ConnMaxIdleTime == 0 {
702+
cfg.Worker.PostgresPool.ConnMaxIdleTime = 60
672703
}
673704
}
674705

@@ -1075,6 +1106,12 @@ func (l *ConfigLoader) setViperDefaultValues(paths Paths) {
10751106
// Monitoring settings
10761107
l.v.SetDefault("monitoring.retention", "24h")
10771108
l.v.SetDefault("monitoring.interval", "5s")
1109+
1110+
// Worker PostgreSQL pool settings (for shared-nothing mode)
1111+
l.v.SetDefault("worker.postgresPool.maxOpenConns", 25)
1112+
l.v.SetDefault("worker.postgresPool.maxIdleConns", 5)
1113+
l.v.SetDefault("worker.postgresPool.connMaxLifetime", 300)
1114+
l.v.SetDefault("worker.postgresPool.connMaxIdleTime", 60)
10781115
}
10791116

10801117
// envBinding defines a mapping between a config key and its environment variable.
@@ -1211,6 +1248,12 @@ var envBindings = []envBinding{
12111248
// Monitoring configuration
12121249
{key: "monitoring.retention", env: "MONITORING_RETENTION"},
12131250
{key: "monitoring.interval", env: "MONITORING_INTERVAL"},
1251+
1252+
// Worker PostgreSQL pool configuration (for shared-nothing mode)
1253+
{key: "worker.postgresPool.maxOpenConns", env: "WORKER_POSTGRES_POOL_MAX_OPEN_CONNS"},
1254+
{key: "worker.postgresPool.maxIdleConns", env: "WORKER_POSTGRES_POOL_MAX_IDLE_CONNS"},
1255+
{key: "worker.postgresPool.connMaxLifetime", env: "WORKER_POSTGRES_POOL_CONN_MAX_LIFETIME"},
1256+
{key: "worker.postgresPool.connMaxIdleTime", env: "WORKER_POSTGRES_POOL_CONN_MAX_IDLE_TIME"},
12141257
}
12151258

12161259
// bindEnvironmentVariables binds configuration keys to environment variables using the loader's viper instance.

internal/cmn/config/loader_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@ func TestLoad_Env(t *testing.T) {
235235
Worker: Worker{
236236
ID: "test-worker-123",
237237
MaxActiveRuns: 200,
238+
PostgresPool: PostgresPoolConfig{
239+
MaxOpenConns: 25,
240+
MaxIdleConns: 5,
241+
ConnMaxLifetime: 300,
242+
ConnMaxIdleTime: 60,
243+
},
238244
},
239245
Scheduler: Scheduler{
240246
Port: 9999,
@@ -484,6 +490,12 @@ scheduler:
484490
"env": "production",
485491
"region": "us-west-2",
486492
},
493+
PostgresPool: PostgresPoolConfig{
494+
MaxOpenConns: 25,
495+
MaxIdleConns: 5,
496+
ConnMaxLifetime: 300,
497+
ConnMaxIdleTime: 60,
498+
},
487499
},
488500
Scheduler: Scheduler{
489501
Port: 7890,

internal/runtime/builtin/sql/config.go

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ type Config struct {
1717
// - SQLite: "file:./data.db?mode=rw" or ":memory:"
1818
DSN string `mapstructure:"dsn"`
1919

20-
// Connection pool settings
21-
MaxOpenConns int `mapstructure:"maxOpenConns"` // Maximum open connections (default: 5)
22-
MaxIdleConns int `mapstructure:"maxIdleConns"` // Maximum idle connections (default: 2)
23-
ConnMaxLifetime int `mapstructure:"connMaxLifetime"` // Connection max lifetime in seconds (default: 300)
24-
2520
// Parameterized queries (SQL injection prevention)
2621
// Can be map[string]any for named params or []any for positional params
2722
Params any `mapstructure:"params"`
@@ -86,15 +81,11 @@ type ImportConfig struct {
8681
}
8782

8883
// DefaultConfig returns a Config with default values.
89-
// These defaults match the JSON schema documentation.
9084
func DefaultConfig() *Config {
9185
return &Config{
92-
MaxOpenConns: 5, // Match schema default
93-
MaxIdleConns: 2, // Match schema default
94-
ConnMaxLifetime: 300, // Match schema default (seconds)
95-
Timeout: 60,
96-
OutputFormat: "jsonl",
97-
NullString: "null",
86+
Timeout: 60,
87+
OutputFormat: "jsonl",
88+
NullString: "null",
9889
}
9990
}
10091

@@ -138,19 +129,6 @@ func ParseConfig(_ context.Context, mapCfg map[string]any) (*Config, error) {
138129
}
139130
}
140131

141-
// Validate connection pool settings
142-
if cfg.MaxOpenConns < 0 {
143-
return nil, fmt.Errorf("maxOpenConns must be non-negative")
144-
}
145-
if cfg.MaxIdleConns < 0 {
146-
return nil, fmt.Errorf("maxIdleConns must be non-negative")
147-
}
148-
if cfg.MaxIdleConns > cfg.MaxOpenConns && cfg.MaxOpenConns > 0 {
149-
return nil, fmt.Errorf("maxIdleConns (%d) cannot exceed maxOpenConns (%d)", cfg.MaxIdleConns, cfg.MaxOpenConns)
150-
}
151-
if cfg.ConnMaxLifetime < 0 {
152-
return nil, fmt.Errorf("connMaxLifetime must be non-negative")
153-
}
154132
if cfg.Timeout < 0 {
155133
return nil, fmt.Errorf("timeout must be non-negative")
156134
}
@@ -277,10 +255,7 @@ var importConfigSchema = &jsonschema.Schema{
277255
var postgresConfigSchema = &jsonschema.Schema{
278256
Type: "object",
279257
Properties: map[string]*jsonschema.Schema{
280-
"dsn": {Type: "string", Description: "PostgreSQL connection string (DSN)"},
281-
"maxOpenConns": {Type: "integer", Description: "Maximum open connections"},
282-
"maxIdleConns": {Type: "integer", Description: "Maximum idle connections"},
283-
"connMaxLifetime": {Type: "integer", Description: "Connection max lifetime in seconds"},
258+
"dsn": {Type: "string", Description: "PostgreSQL connection string (DSN)"},
284259
"params": {
285260
Description: "Query parameters (map for named, array for positional)",
286261
OneOf: []*jsonschema.Schema{
@@ -306,9 +281,7 @@ var postgresConfigSchema = &jsonschema.Schema{
306281
var sqliteConfigSchema = &jsonschema.Schema{
307282
Type: "object",
308283
Properties: map[string]*jsonschema.Schema{
309-
"dsn": {Type: "string", Description: "SQLite connection string (file path or :memory:)"},
310-
"maxOpenConns": {Type: "integer", Description: "Maximum open connections"},
311-
"maxIdleConns": {Type: "integer", Description: "Maximum idle connections"},
284+
"dsn": {Type: "string", Description: "SQLite connection string (file path or :memory:)"},
312285
"params": {
313286
Description: "Query parameters (map for named, array for positional)",
314287
OneOf: []*jsonschema.Schema{

internal/runtime/builtin/sql/connection.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@ type ConnectionManager struct {
1919
closed bool
2020
}
2121

22-
// Connection retry settings
23-
// These are generous to handle slow container startup in CI environments
22+
// Connection settings
2423
const (
24+
// Retry settings - generous to handle slow container startup in CI environments
2525
maxConnRetries = 30
2626
initialRetryDelay = 500 * time.Millisecond
2727
maxRetryDelay = 2 * time.Second
2828
pingTimeout = 5 * time.Second
29+
30+
// Default connection pool settings for non-worker mode
31+
// These are conservative values suitable for single-step execution.
32+
// In worker mode, the global pool manager handles pooling.
33+
defaultMaxOpenConns = 1 // Single connection per step
34+
defaultMaxIdleConns = 1 // Keep the connection ready
35+
defaultConnMaxLifetime = 5 * time.Minute
2936
)
3037

3138
// NewConnectionManager creates a new connection manager.
@@ -56,10 +63,12 @@ func NewConnectionManager(ctx context.Context, driver Driver, cfg *Config) (*Con
5663
return nil, fmt.Errorf("failed to connect to database after %d attempts: %w", attempt, lastErr)
5764
}
5865

59-
// Configure connection pool
60-
db.SetMaxOpenConns(cfg.MaxOpenConns)
61-
db.SetMaxIdleConns(cfg.MaxIdleConns)
62-
db.SetConnMaxLifetime(time.Duration(cfg.ConnMaxLifetime) * time.Second)
66+
// Configure connection pool with fixed defaults
67+
// In non-worker mode, each step has its own isolated connection.
68+
// In worker mode, the global pool manager handles pooling.
69+
db.SetMaxOpenConns(defaultMaxOpenConns)
70+
db.SetMaxIdleConns(defaultMaxIdleConns)
71+
db.SetConnMaxLifetime(defaultConnMaxLifetime)
6372

6473
// Verify connection with ping
6574
pingCtx, cancel := context.WithTimeout(ctx, pingTimeout)

0 commit comments

Comments
 (0)