Collaborator

@yottahmd yottahmd commented Jan 16, 2026

Summary by CodeRabbit

  • New Features

    • Worker-level PostgreSQL connection pool configuration to manage connections across multiple DAGs in a single worker process.
    • Centralized connection pool management for shared-nothing worker mode with reference-counted pool lifecycle.
  • Improvements

    • Executor resource cleanup to ensure proper connection and resource release after execution.
    • Fixed connection pool defaults for improved stability and consistency.

✏️ Tip: You can customize this high-level summary in your review settings.


coderabbitai bot commented Jan 16, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR introduces a centralized PostgreSQL connection pool manager for shared-nothing worker mode, enabling multiple DAGs in a single worker process to share pool resources across different DSNs. Pool configuration moves from per-step SQL executors to worker-level settings, with a new GlobalPoolManager coordinating reference-counted pools and enforcing global connection limits.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Worker & Config Pool Setup<br/>internal/cmn/config/config.go, internal/cmn/config/definition.go, internal/cmn/config/loader.go, internal/cmn/config/loader_test.go | Adds PostgresPoolConfig struct and WorkerDef.PostgresPool field to support worker-level pool configuration; loader binds environment variables (WORKER_POSTGRES_POOL_*) and applies defaults (MaxOpenConns=25, MaxIdleConns=5, ConnMaxLifetime=300, ConnMaxIdleTime=60). |
| Global Pool Manager<br/>internal/runtime/builtin/sql/global_pool.go | Introduces GlobalPoolManager with reference-counted multi-DSN pooling, DSN hashing, read/write-lock fast-path for pool lookup, per-pool connection limit distribution, and lifecycle management (Stats/Close methods). |
| SQL Executor Pool Refactoring<br/>internal/runtime/builtin/sql/config.go, internal/runtime/builtin/sql/sql.go | Removes MaxOpenConns, MaxIdleConns, ConnMaxLifetime from SQL Config struct; updates sqlExecutor to detect and attach global pool manager when available, adds Close() method for resource cleanup, implements io.Closer interface. |
| SQL Connection Defaults<br/>internal/runtime/builtin/sql/connection.go | Replaces dynamic pool configuration with fixed defaults (defaultMaxOpenConns, defaultMaxIdleConns, defaultConnMaxLifetime) during connection setup; adds comments distinguishing non-worker vs worker mode pooling. |
| Executor Resource Cleanup<br/>internal/runtime/executor/executor.go, internal/runtime/node.go | Adds CloseExecutor() utility function to safely close executors implementing io.Closer; Node.Execute defers cleanup with warning logging on failure. |
| Worker Lifecycle Integration<br/>internal/service/worker/worker.go | Extends Worker to accept config, initializes GlobalPoolManager in Start when in shared-nothing mode, closes manager in Stop, adds isSharedNothingMode() helper. |
| Test Updates<br/>internal/runtime/builtin/sql/sql_test.go | Removes TestParseConfig_ConnectionPool; renames TestConnectionManager_ConnectionPoolSettings to TestConnectionManager_FixedConnectionPoolDefaults to validate fixed-default behavior instead of per-step config. |
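
To make the worker-level defaults in the table concrete, here is a minimal sketch of how such a config could fall back to them. The field names and default values come from the table; the `int` field types, the seconds unit, and the `withDefaults` helper are assumptions for illustration and may not match the PR's actual loader code.

```go
package main

import (
	"fmt"
	"time"
)

// PostgresPoolConfig uses the field names from the Changes table.
// The int types and the seconds unit are assumptions, not confirmed by the PR.
type PostgresPoolConfig struct {
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime int // assumed to be seconds
	ConnMaxIdleTime int // assumed to be seconds
}

// withDefaults is a hypothetical helper: zero-valued fields fall back to the
// defaults quoted in the table, non-zero values are kept as overrides.
func withDefaults(c PostgresPoolConfig) PostgresPoolConfig {
	if c.MaxOpenConns == 0 {
		c.MaxOpenConns = 25
	}
	if c.MaxIdleConns == 0 {
		c.MaxIdleConns = 5
	}
	if c.ConnMaxLifetime == 0 {
		c.ConnMaxLifetime = 300
	}
	if c.ConnMaxIdleTime == 0 {
		c.ConnMaxIdleTime = 60
	}
	return c
}

func main() {
	// Only MaxOpenConns is overridden; the rest come from the defaults.
	cfg := withDefaults(PostgresPoolConfig{MaxOpenConns: 10})
	lifetime := time.Duration(cfg.ConnMaxLifetime) * time.Second
	fmt.Println(cfg.MaxOpenConns, cfg.MaxIdleConns, lifetime)
}
```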

Sequence Diagram

sequenceDiagram
    participant Worker
    participant GlobalPoolManager
    participant DSN1 as Pool (DSN 1)
    participant DSN2 as Pool (DSN 2)
    participant Database

    Worker->>GlobalPoolManager: NewGlobalPoolManager(cfg)
    Note over GlobalPoolManager: Initialize with max limits<br/>empty pool map

    Worker->>GlobalPoolManager: GetOrCreatePool(ctx, driver, cfg) [DSN1]
    GlobalPoolManager->>GlobalPoolManager: Read-lock attempt (miss)
    GlobalPoolManager->>GlobalPoolManager: Acquire write-lock
    GlobalPoolManager->>GlobalPoolManager: Compute DSN hash
    GlobalPoolManager->>Database: driver.Connect(ctx, cfg)
    Database-->>DSN1: sql.DB connection
    GlobalPoolManager->>DSN1: SetMaxOpenConns (distribute global limit)
    GlobalPoolManager->>DSN1: SetMaxIdleConns, ConnMaxLifetime, ConnMaxIdleTime
    GlobalPoolManager->>DSN1: Ping (validate)
    DSN1-->>GlobalPoolManager: OK
    GlobalPoolManager->>GlobalPoolManager: Store poolEntry (refCount=1)
    GlobalPoolManager-->>Worker: sql.DB

    Worker->>GlobalPoolManager: GetOrCreatePool(ctx, driver, cfg) [DSN2]
    GlobalPoolManager->>GlobalPoolManager: Read-lock fast-path (miss)
    GlobalPoolManager->>GlobalPoolManager: Acquire write-lock
    GlobalPoolManager->>Database: driver.Connect(ctx, cfg)
    Database-->>DSN2: sql.DB connection
    GlobalPoolManager->>DSN2: SetMaxOpenConns (re-distribute limits across both)
    GlobalPoolManager->>DSN2: SetMaxIdleConns, ConnMaxLifetime, ConnMaxIdleTime
    GlobalPoolManager->>DSN2: Ping (validate)
    DSN2-->>GlobalPoolManager: OK
    GlobalPoolManager-->>Worker: sql.DB

    Worker->>GlobalPoolManager: ReleasePool(DSN1)
    GlobalPoolManager->>GlobalPoolManager: Decrement refCount (DSN1)

    Worker->>GlobalPoolManager: Close()
    GlobalPoolManager->>DSN1: Close() + cleanup
    GlobalPoolManager->>DSN2: Close() + cleanup
    GlobalPoolManager->>GlobalPoolManager: Clear pools, aggregate errors
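
The lookup and lifecycle flow in the diagram can be sketched roughly as follows. This is not the PR's GlobalPoolManager: the `manager`, `entry`, `fakePool`, and `countingOpener` names are hypothetical, the pooled resource is reduced to an `io.Closer` so the example runs without a database, and the choice to keep a pool open at refCount zero until `close` is an assumption read off the diagram.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// entry pairs a pooled resource with its reference count.
type entry struct {
	res      io.Closer
	refCount atomic.Int32
}

// manager is a hypothetical stand-in for a reference-counted pool manager.
type manager struct {
	mu    sync.RWMutex
	pools map[string]*entry
}

func newManager() *manager { return &manager{pools: make(map[string]*entry)} }

// getOrCreate tries a read-locked fast path first, then falls back to the
// write lock and re-checks before opening a new resource.
func (m *manager) getOrCreate(key string, open func() (io.Closer, error)) (io.Closer, error) {
	m.mu.RLock()
	if e, ok := m.pools[key]; ok {
		e.refCount.Add(1)
		m.mu.RUnlock()
		return e.res, nil
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()
	// Another goroutine may have created the entry between the two locks.
	if e, ok := m.pools[key]; ok {
		e.refCount.Add(1)
		return e.res, nil
	}
	res, err := open()
	if err != nil {
		return nil, err
	}
	e := &entry{res: res}
	e.refCount.Store(1)
	m.pools[key] = e
	return res, nil
}

// release only decrements the count; resources are closed in close().
func (m *manager) release(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if e, ok := m.pools[key]; ok && e.refCount.Load() > 0 {
		e.refCount.Add(-1)
	}
}

// close shuts every pool, clears the map, and aggregates any errors.
func (m *manager) close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	var errs []error
	for k, e := range m.pools {
		if err := e.res.Close(); err != nil {
			errs = append(errs, fmt.Errorf("close %s: %w", k, err))
		}
		delete(m.pools, k)
	}
	return errors.Join(errs...)
}

// fakePool stands in for a real database pool.
type fakePool struct{ closed bool }

func (f *fakePool) Close() error { f.closed = true; return nil }

// countingOpener returns an opener that records how often it was called.
func countingOpener(n *int) func() (io.Closer, error) {
	return func() (io.Closer, error) { *n++; return &fakePool{}, nil }
}

func main() {
	m := newManager()
	opens := 0
	open := countingOpener(&opens)
	a, _ := m.getOrCreate("dsn1", open)
	b, _ := m.getOrCreate("dsn1", open) // served from the fast path
	fmt.Println(a == b, opens)
	m.release("dsn1")
	fmt.Println(m.close())
}
```

The re-check under the write lock is what keeps two concurrent callers from opening the same DSN twice.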

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • feat: shared-nothing worker #1564: Directly related shared-nothing worker architecture and pool management pathway activation in worker initialization and lifecycle.
  • feat: postgres sql executor #1562: Related SQL executor connection pooling and driver infrastructure that this PR refactors from per-executor to centralized GlobalPoolManager pattern.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 68.75% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: shared connection pool for pg' directly and accurately summarizes the main change: introducing a shared PostgreSQL connection pool for worker environments. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@internal/runtime/builtin/sql/global_pool.go`:
- Around line 71-141: GetOrCreatePool currently only applies the recalculated
per-DSN limit to the newly created DB, so total connections can exceed
m.config.MaxOpenConns; after computing perDSNLimit (and treating
m.config.MaxOpenConns <= 0 as "no limit" so you should skip enforcing limits),
iterate m.pools and for each poolEntry call
entry.db.SetMaxOpenConns(perDSNLimit) to reapply the new limit (ensure this runs
while the write lock in GetOrCreatePool is held), then set the same on the newly
created db before storing the entry; use identifiers GetOrCreatePool, m.pools,
poolEntry, perDSNLimit, and db.SetMaxOpenConns.
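
A rough sketch of the suggested fix, under stated assumptions: the even split (`globalMax / poolCount`, floored at 1) is a guess at how perDSNLimit is computed, and `limitSetter`, `rebalance`, and `fakeDB` are hypothetical names. The only real API relied on is `(*sql.DB).SetMaxOpenConns`, which the compile-time assertion checks against the interface.

```go
package main

import (
	"database/sql"
	"fmt"
)

// limitSetter is the one method of *sql.DB this sketch needs.
type limitSetter interface {
	SetMaxOpenConns(n int)
}

// Compile-time proof that a real *sql.DB could be used here.
var _ limitSetter = (*sql.DB)(nil)

// perDSNLimit splits the global limit evenly (an assumed policy).
// A result of 0 means "no limit enforced".
func perDSNLimit(globalMax, poolCount int) int {
	if globalMax <= 0 || poolCount <= 0 {
		return 0
	}
	limit := globalMax / poolCount
	if limit < 1 {
		limit = 1 // every pool needs at least one connection
	}
	return limit
}

// rebalance reapplies the recalculated limit to every pool, existing and new.
// The caller is expected to hold the manager's write lock.
func rebalance(globalMax int, pools map[string]limitSetter) {
	limit := perDSNLimit(globalMax, len(pools))
	if limit == 0 {
		return // MaxOpenConns <= 0 is treated as unlimited
	}
	for _, p := range pools {
		p.SetMaxOpenConns(limit)
	}
}

// fakeDB records the last limit it was given, standing in for *sql.DB.
type fakeDB struct{ maxOpen int }

func (f *fakeDB) SetMaxOpenConns(n int) { f.maxOpen = n }

func main() {
	a, b := &fakeDB{}, &fakeDB{}
	pools := map[string]limitSetter{"dsn1": a}
	rebalance(25, pools)
	fmt.Println(a.maxOpen)

	pools["dsn2"] = b // a second DSN arrives; both pools get the new limit
	rebalance(25, pools)
	fmt.Println(a.maxOpen, b.maxOpen)
}
```

With the floor of 1, the total can still exceed the global limit once there are more DSNs than allowed connections; a stricter policy would have to reject new pools at that point.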
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bd787e9 and a2a9f5d.

📒 Files selected for processing (12)
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/cmn/config/loader.go
  • internal/cmn/config/loader_test.go
  • internal/runtime/builtin/sql/config.go
  • internal/runtime/builtin/sql/connection.go
  • internal/runtime/builtin/sql/global_pool.go
  • internal/runtime/builtin/sql/sql.go
  • internal/runtime/builtin/sql/sql_test.go
  • internal/runtime/executor/executor.go
  • internal/runtime/node.go
  • internal/service/worker/worker.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/runtime/builtin/sql/config.go
  • internal/cmn/config/loader_test.go
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/runtime/builtin/sql/connection.go
  • internal/runtime/executor/executor.go
  • internal/runtime/builtin/sql/global_pool.go
  • internal/cmn/config/loader.go
  • internal/runtime/builtin/sql/sql_test.go
  • internal/runtime/node.go
  • internal/service/worker/worker.go
  • internal/runtime/builtin/sql/sql.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/cmn/config/loader_test.go
  • internal/runtime/builtin/sql/sql_test.go
🧠 Learnings (2)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Applied to files:

  • internal/runtime/builtin/sql/sql_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/service/worker/worker.go
🧬 Code graph analysis (6)
internal/cmn/config/loader_test.go (1)
internal/cmn/config/config.go (1)
  • PostgresPoolConfig (369-386)
internal/runtime/executor/executor.go (1)
internal/runtime/builtin/chat/executor.go (1)
  • Executor (29-38)
internal/runtime/builtin/sql/global_pool.go (2)
internal/runtime/builtin/sql/driver.go (1)
  • Driver (12-47)
internal/runtime/builtin/sql/config.go (1)
  • Config (13-48)
internal/cmn/config/loader.go (2)
internal/cmn/config/config.go (1)
  • Worker (336-345)
internal/service/worker/worker.go (1)
  • Worker (23-45)
internal/runtime/builtin/sql/sql_test.go (1)
internal/runtime/builtin/sql/config.go (1)
  • ParseConfig (93-145)
internal/runtime/node.go (2)
internal/runtime/executor/executor.go (1)
  • CloseExecutor (18-26)
internal/cmn/logger/tag/tag.go (1)
  • Name (276-278)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (33)
internal/runtime/executor/executor.go (1)

15-26: Nice safe-close helper for executors.
Clean and safe: nil guard and io.Closer detection are handled well.

internal/runtime/node.go (1)

182-189: Good defensive cleanup after execution.
Deferring CloseExecutor with a warning on failure is solid lifecycle hygiene.
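
As an illustration of the two comments above, a safe-close helper plus a deferred cleanup could look something like this. The `Executor` interface, the lower-case `closeExecutor` and `execute` functions, and the example executor types are all hypothetical; the PR's actual CloseExecutor signature and logging helpers may differ.

```go
package main

import (
	"errors"
	"io"
	"log"
)

// Executor is a hypothetical minimal executor interface for this sketch.
type Executor interface {
	Run() error
}

// closeExecutor closes exec only if it is non-nil and implements io.Closer.
func closeExecutor(exec Executor) error {
	if exec == nil {
		return nil
	}
	if c, ok := exec.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// execute runs the executor and always attempts cleanup afterwards,
// logging a warning instead of failing the run when Close fails.
func execute(exec Executor) error {
	defer func() {
		if err := closeExecutor(exec); err != nil {
			log.Printf("warning: failed to close executor: %v", err)
		}
	}()
	return exec.Run()
}

// plainExec has no Close method, so cleanup is a no-op.
type plainExec struct{}

func (plainExec) Run() error { return nil }

// closableExec records whether Close was called.
type closableExec struct{ closed bool }

func (c *closableExec) Run() error   { return nil }
func (c *closableExec) Close() error { c.closed = true; return nil }

// failingCloseExec always fails on Close, to exercise the warning path.
type failingCloseExec struct{}

func (failingCloseExec) Run() error   { return nil }
func (failingCloseExec) Close() error { return errors.New("close failed") }

func main() {
	_ = execute(&closableExec{})
	_ = execute(plainExec{})
	_ = execute(failingCloseExec{}) // logs a warning, run result is unaffected
}
```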

internal/runtime/builtin/sql/connection.go (2)

22-35: Defaults are clearly documented and conservative.
The comments and constants make the non-worker pooling intent explicit.


66-71: No action needed. The fixed connection pool defaults are appropriately scoped to non-worker mode. In shared-nothing worker mode with PostgreSQL, the code uses GlobalPoolManager (initialized with Worker.PostgresPool settings) instead of NewConnectionManager, so the fixed defaults never override worker-level pooling. The conditional logic in sql.go lines 75–97 ensures these are mutually exclusive code paths.

Likely an incorrect or invalid review comment.

internal/cmn/config/definition.go (2)

389-391: Worker-level pool hook looks good.
Field and tag align with the worker configuration flow.


412-430: Clear pool definition and defaults documentation.
The struct is well-scoped and comments communicate the intended limits.

internal/runtime/builtin/sql/config.go (3)

86-88: Defaults remain minimal and sensible.
No concerns with the simplified defaults here.


258-258: Schema update looks consistent.
DSN property remains clear and aligned with the current config surface.


284-284: Schema update looks consistent.
SQLite DSN entry matches the updated schema expectations.

internal/cmn/config/config.go (2)

335-345: Worker-level pool config placement is clear.

Scoping the shared-nothing pool settings under Worker keeps the configuration model intuitive.


366-386: PostgresPoolConfig documentation is solid.

Defaults and semantics are clearly stated, which will help downstream users and maintainers.

internal/cmn/config/loader.go (3)

673-703: Worker pool settings load/default flow is well-handled.

Non-zero overrides plus deterministic defaults make the worker config stable.


1110-1114: Viper defaults match pool config expectations.

Keeps default values consistent with the config docs.


1252-1256: Env bindings cover all new pool knobs.

Nice to expose the full set of settings for runtime tuning.

internal/cmn/config/loader_test.go (2)

236-243: Env-load expectations updated appropriately.

Good to assert worker pool defaults in the env-backed config test.


487-498: YAML expectations now include pool defaults.

Keeps test fixtures aligned with the new worker pool configuration.

internal/runtime/builtin/sql/sql_test.go (1)

2017-2039: Fixed-default pool behavior is covered well.

The test aligns with the new pooling model and validates the non-worker default.

internal/service/worker/worker.go (6)

3-20: Imports updated appropriately for pool manager integration.

Needed dependencies are in place for structured logging and pool lifecycle.


22-45: Worker state extensions are clean and cohesive.

Adding cfg and poolManager keeps lifecycle state well-organized.


52-72: Constructor wiring for config is straightforward.

Storing cfg here keeps later initialization logic simple.


87-100: Shared-nothing pool manager initialization looks solid.

Context wiring and logs make the pool manager lifecycle clear.


162-180: Pool manager cleanup integrates well with shutdown.

Error handling and logging are consistent with the existing stop flow.


340-345: Shared-nothing mode detection is clear and documented.

The helper keeps Start focused and readable.

internal/runtime/builtin/sql/sql.go (4)

25-28: Good interface conformance guard.

Keeps the Executor and io.Closer conformance checks at compile time.


31-43: Lifecycle state fields look appropriate.

The added pool and closed state make ownership and cleanup clearer.


70-108: Global pool selection path is clear.

The branching keeps Postgres pooled while other drivers stay per-step.


142-175: Idempotent Close flow looks solid.

Mutex + closed guard prevents double-close and clears references.
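
The compile-time guard and the idempotent Close pattern praised in these comments can be sketched together as below. The struct layout is an assumption: `conn` is a stand-in for whatever connection or pool handle the real sqlExecutor holds, and `countingCloser` exists only to show that the second Close does nothing.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// sqlExecutor is a simplified, assumed shape of an executor with cleanup state.
type sqlExecutor struct {
	mu     sync.Mutex
	closed bool
	conn   io.Closer // stand-in for the connection or pool handle
}

// Compile-time guard: the build fails if *sqlExecutor stops satisfying io.Closer.
var _ io.Closer = (*sqlExecutor)(nil)

// Close releases the underlying resource once; later calls are no-ops.
func (e *sqlExecutor) Close() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.closed {
		return nil
	}
	e.closed = true
	var err error
	if e.conn != nil {
		err = e.conn.Close()
		e.conn = nil // drop the reference so it cannot be reused
	}
	return err
}

// countingCloser counts how many times Close is invoked.
type countingCloser struct{ calls int }

func (c *countingCloser) Close() error { c.calls++; return nil }

func main() {
	cc := &countingCloser{}
	e := &sqlExecutor{conn: cc}
	first := e.Close()
	second := e.Close()
	fmt.Println(first, second, cc.calls)
}
```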

internal/runtime/builtin/sql/global_pool.go (6)

14-57: Solid pool manager scaffolding.


59-69: Context helpers are straightforward.


143-155: ReleasePool is simple and clear.


157-186: Stats output is helpful for observability.


188-218: Graceful shutdown logic looks good.


220-225: DSN hashing avoids credential exposure in map keys.
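
For readers unfamiliar with the idea, here is one way a DSN can be turned into a map key that does not expose credentials. The review does not say which hash the PR uses; SHA-256 with hex encoding is an assumption, and `hashDSN` is a hypothetical name.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashDSN derives a stable, non-reversible key from a DSN so that pool maps,
// logs, and stats never carry the raw connection string.
func hashDSN(dsn string) string {
	sum := sha256.Sum256([]byte(dsn))
	return hex.EncodeToString(sum[:])
}

func main() {
	pools := map[string]string{}
	dsn := "postgres://user:secret@localhost:5432/app" // example DSN only
	pools[hashDSN(dsn)] = "pool for app db"
	for key := range pools {
		fmt.Println(key) // a hex digest, not the DSN
	}
}
```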

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@yottahmd yottahmd merged commit 621444e into main Jan 17, 2026
5 checks passed
@yottahmd yottahmd deleted the pg-conn branch January 17, 2026 04:17
codecov bot commented Jan 17, 2026

Codecov Report

❌ Patch coverage is 19.32367% with 167 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.76%. Comparing base (ce036b8) to head (5e9da67).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/runtime/builtin/sql/global_pool.go | 3.00% | 97 Missing ⚠️ |
| internal/runtime/builtin/sql/sql.go | 30.95% | 27 Missing and 2 partials ⚠️ |
| internal/service/worker/worker.go | 11.53% | 21 Missing and 2 partials ⚠️ |
| internal/cmn/config/loader.go | 63.63% | 4 Missing and 4 partials ⚠️ |
| internal/runtime/executor/executor.go | 0.00% | 6 Missing ⚠️ |
| internal/runtime/node.go | 20.00% | 3 Missing and 1 partial ⚠️ |

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1584      +/-   ##
==========================================
- Coverage   65.09%   64.76%   -0.34%     
==========================================
  Files         258      259       +1     
  Lines       28645    28833     +188     
==========================================
+ Hits        18647    18674      +27     
- Misses       8327     8484     +157     
- Partials     1671     1675       +4     

| Files with missing lines | Coverage Δ |
|---|---|
| internal/cmn/config/config.go | 85.71% <ø> (ø) |
| internal/runtime/builtin/sql/config.go | 87.65% <100.00%> (+7.21%) ⬆️ |
| internal/runtime/builtin/sql/connection.go | 73.21% <100.00%> (-1.79%) ⬇️ |
| internal/runtime/node.go | 76.07% <20.00%> (-0.56%) ⬇️ |
| internal/runtime/executor/executor.go | 0.00% <0.00%> (ø) |
| internal/cmn/config/loader.go | 78.56% <63.63%> (-0.54%) ⬇️ |
| internal/service/worker/worker.go | 74.71% <11.53%> (-10.25%) ⬇️ |
| internal/runtime/builtin/sql/sql.go | 62.76% <30.95%> (-4.20%) ⬇️ |
| internal/runtime/builtin/sql/global_pool.go | 3.00% <3.00%> (ø) |

... and 7 files with indirect coverage changes


Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ce036b8...5e9da67. Read the comment docs.
