Skip to content

Commit 274faff

Browse files
committed
postgres: add connection limit
1 parent b0fa19e commit 274faff

File tree

8 files changed

+130
-7
lines changed

8 files changed

+130
-7
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ ifeq ($(dbbackend),postgres)
194194

195195
# Start a fresh postgres instance. Allow a maximum of 500 connections.
196196
# This is required for the async benchmark to pass.
197-
docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine -N 500
197+
docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine
198198
docker logs -f lnd-postgres &
199199

200200
# Wait for the instance to be started.

docs/release-notes/release-notes-0.14.0.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,8 @@ messages directly. There is no routing/path finding involved.
632632

633633
* [Fixes a bug that would cause pruned nodes to stall out](https://github.com/lightningnetwork/lnd/pull/5970)
634634

635+
* [Add Postgres connection limit](https://github.com/lightningnetwork/lnd/pull/5992)
636+
635637
## Documentation
636638

637639
The [code contribution guidelines have been updated to mention the new

kvdb/postgres/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import "time"
44

55
// Config holds postgres configuration data.
66
type Config struct {
7-
Dsn string `long:"dsn" description:"Database connection string."`
8-
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
7+
Dsn string `long:"dsn" description:"Database connection string."`
8+
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
9+
MaxConnections int `long:"maxconnections" description:"The maximum number of open connections to the database. Set to zero for unlimited."`
910
}

kvdb/postgres/db.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"time"
1414

1515
"github.com/btcsuite/btcwallet/walletdb"
16-
_ "github.com/jackc/pgx/v4/stdlib"
1716
)
1817

1918
const (
@@ -58,6 +57,14 @@ type db struct {
5857
// Enforce db implements the walletdb.DB interface.
5958
var _ walletdb.DB = (*db)(nil)
6059

60+
// Global set of database connections.
61+
var dbConns *dbConnSet
62+
63+
// Init initializes the global set of database connections.
64+
func Init(maxConnections int) {
65+
dbConns = newDbConnSet(maxConnections)
66+
}
67+
6168
// newPostgresBackend returns a db object initialized with the passed backend
6269
// config. If postgres connection cannot be estabished, then returns error.
6370
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
@@ -67,7 +74,11 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
6774
return nil, errors.New("empty postgres prefix")
6875
}
6976

70-
dbConn, err := sql.Open("pgx", config.Dsn)
77+
if dbConns == nil {
78+
return nil, errors.New("db connection set not initialized")
79+
}
80+
81+
dbConn, err := dbConns.Open(config.Dsn)
7182
if err != nil {
7283
return nil, err
7384
}
@@ -245,5 +256,5 @@ func (db *db) Copy(w io.Writer) error {
245256
// Close cleanly shuts down the database and syncs all data.
246257
// This function is part of the walletdb.Db interface implementation.
247258
func (db *db) Close() error {
248-
return db.db.Close()
259+
return dbConns.Close(db.cfg.Dsn)
249260
}

kvdb/postgres/db_conn_set.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package postgres
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"sync"
7+
8+
_ "github.com/jackc/pgx/v4/stdlib"
9+
)
10+
11+
// dbConn stores the actual connection and a user count.
12+
type dbConn struct {
13+
db *sql.DB
14+
count int
15+
}
16+
17+
// dbConnSet stores a set of connections.
18+
type dbConnSet struct {
19+
dbConn map[string]*dbConn
20+
maxConnections int
21+
22+
sync.Mutex
23+
}
24+
25+
// newDbConnSet initializes a new set of connections.
26+
func newDbConnSet(maxConnections int) *dbConnSet {
27+
return &dbConnSet{
28+
dbConn: make(map[string]*dbConn),
29+
maxConnections: maxConnections,
30+
}
31+
}
32+
33+
// Open opens a new database connection. If a connection already exists for the
34+
// given dsn, the existing connection is returned.
35+
func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
36+
d.Lock()
37+
defer d.Unlock()
38+
39+
if dbConn, ok := d.dbConn[dsn]; ok {
40+
dbConn.count++
41+
42+
return dbConn.db, nil
43+
}
44+
45+
db, err := sql.Open("pgx", dsn)
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
// Limit maximum number of open connections. This is useful to prevent
51+
// the server from running out of connections and returning an error.
52+
// With this client-side limit in place, lnd will wait for a connection
53+
// to become available.
54+
if d.maxConnections != 0 {
55+
db.SetMaxOpenConns(d.maxConnections)
56+
}
57+
58+
d.dbConn[dsn] = &dbConn{
59+
db: db,
60+
count: 1,
61+
}
62+
63+
return db, nil
64+
}
65+
66+
// Close closes the connection with the given dsn. If there are still other
67+
// users of the same connection, this function does nothing.
68+
func (d *dbConnSet) Close(dsn string) error {
69+
d.Lock()
70+
defer d.Unlock()
71+
72+
dbConn, ok := d.dbConn[dsn]
73+
if !ok {
74+
return fmt.Errorf("connection not found: %v", dsn)
75+
}
76+
77+
// Reduce user count.
78+
dbConn.count--
79+
80+
// Do not close if there are other users.
81+
if dbConn.count > 0 {
82+
return nil
83+
}
84+
85+
// Close connection.
86+
delete(d.dbConn, dsn)
87+
88+
return dbConn.db.Close()
89+
}

kvdb/postgres/no_db.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//go:build !kvdb_postgres
2+
// +build !kvdb_postgres
3+
4+
package postgres
5+
6+
func Init(maxConnections int) {}

lncfg/db.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const (
2323
PostgresBackend = "postgres"
2424
DefaultBatchCommitInterval = 500 * time.Millisecond
2525

26+
defaultPostgresMaxConnections = 50
27+
2628
// NSChannelDB is the namespace name that we use for the combined graph
2729
// and channel state DB.
2830
NSChannelDB = "channeldb"
@@ -71,6 +73,9 @@ func DefaultDB() *DB {
7173
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
7274
DBTimeout: kvdb.DefaultDBTimeout,
7375
},
76+
Postgres: &postgres.Config{
77+
MaxConnections: defaultPostgresMaxConnections,
78+
},
7479
}
7580
}
7681

@@ -113,7 +118,8 @@ func (db *DB) Validate() error {
113118
// on configuration.
114119
func (db *DB) Init(ctx context.Context, dbPath string) error {
115120
// Start embedded etcd server if requested.
116-
if db.Backend == EtcdBackend && db.Etcd.Embedded {
121+
switch {
122+
case db.Backend == EtcdBackend && db.Etcd.Embedded:
117123
cfg, _, err := kvdb.StartEtcdTestBackend(
118124
dbPath, db.Etcd.EmbeddedClientPort,
119125
db.Etcd.EmbeddedPeerPort, db.Etcd.EmbeddedLogFile,
@@ -125,6 +131,9 @@ func (db *DB) Init(ctx context.Context, dbPath string) error {
125131
// Override the original config with the config for
126132
// the embedded instance.
127133
db.Etcd = cfg
134+
135+
case db.Backend == PostgresBackend:
136+
postgres.Init(db.Postgres.MaxConnections)
128137
}
129138

130139
return nil

sample-lnd.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,11 @@ litecoin.node=ltcd
11771177
; disable.
11781178
; db.postgres.timeout=
11791179

1180+
; Postgres maximum number of connections. Set to zero for unlimited. It is
1181+
; recommended to set a limit that is below the server connection limit.
1182+
; Otherwise errors may occur in lnd under high-load conditions.
1183+
; db.postgres.maxconnections=
1184+
11801185
[bolt]
11811186

11821187
; If true, prevents the database from syncing its freelist to disk.

0 commit comments

Comments
 (0)