Skip to content
This repository was archived by the owner on Aug 9, 2025. It is now read-only.

Commit f0f9dc1

Browse files
authored
Merge pull request #28 from interline-io/db-mw
Improve DB and River init
2 parents ba04240 + 851c51e commit f0f9dc1

2 files changed

Lines changed: 43 additions & 23 deletions

File tree

dbutil/db.go

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbutil
22

33
import (
44
"context"
5+
"database/sql"
56
"regexp"
67
"strings"
78
"time"
@@ -11,7 +12,6 @@ import (
1112
"github.com/interline-io/log"
1213
"github.com/jackc/pgx/v5/pgxpool"
1314
"github.com/jackc/pgx/v5/stdlib"
14-
_ "github.com/jackc/pgx/v5/stdlib"
1515
"github.com/jmoiron/sqlx"
1616
"github.com/jmoiron/sqlx/reflectx"
1717
)
@@ -25,38 +25,51 @@ func toSnakeCase(str string) string {
2525
return strings.ToLower(snake)
2626
}
2727

28-
func OpenDBPool(ctx context.Context, url string) (*pgxpool.Pool, *sqlx.DB, error) {
29-
pool, err := pgxpool.New(ctx, url)
30-
if err != nil {
31-
return nil, nil, err
32-
}
33-
db := sqlx.NewDb(stdlib.OpenDBFromPool(pool), "pgx")
28+
// ConfigureDB sets up common database configuration
29+
func ConfigureDB(sqlDb *sql.DB) (*sqlx.DB, error) {
30+
db := sqlx.NewDb(sqlDb, "pgx")
3431
db.SetMaxOpenConns(10)
3532
db.SetMaxIdleConns(10)
3633
db.SetConnMaxLifetime(time.Hour)
3734
if err := db.Ping(); err != nil {
3835
log.Error().Err(err).Msgf("could not connect to database")
39-
return nil, nil, err
36+
return nil, err
4037
}
4138
db.Mapper = reflectx.NewMapperFunc("db", toSnakeCase)
42-
return pool, db.Unsafe(), nil
39+
return db.Unsafe(), nil
4340
}
4441

45-
func OpenDB(url string) (*sqlx.DB, error) {
46-
db, err := sqlx.Open("pgx", url)
42+
func OpenPool(ctx context.Context, url string) (*pgxpool.Pool, error) {
43+
pool, err := pgxpool.New(ctx, url)
4744
if err != nil {
48-
log.Error().Err(err).Msg("could not open database")
45+
log.Error().Err(err).Msg("could not open database pool")
4946
return nil, err
5047
}
51-
db.SetMaxOpenConns(10)
52-
db.SetMaxIdleConns(10)
53-
db.SetConnMaxLifetime(time.Hour)
54-
if err := db.Ping(); err != nil {
55-
log.Error().Err(err).Msgf("could not connect to database")
48+
if err := pool.Ping(ctx); err != nil {
49+
log.Error().Err(err).Msg("could not ping database pool")
5650
return nil, err
5751
}
58-
db.Mapper = reflectx.NewMapperFunc("db", toSnakeCase)
59-
return db.Unsafe(), nil
52+
return pool, nil
53+
}
54+
55+
func OpenDBPool(ctx context.Context, url string) (*pgxpool.Pool, *sqlx.DB, error) {
56+
pool, err := pgxpool.New(ctx, url)
57+
if err != nil {
58+
return nil, nil, err
59+
}
60+
db, err := ConfigureDB(stdlib.OpenDBFromPool(pool))
61+
if err != nil {
62+
return nil, nil, err
63+
}
64+
return pool, db, nil
65+
}
66+
67+
func OpenDB(url string) (*sqlx.DB, error) {
68+
db, err := sqlx.Open("pgx", url)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return ConfigureDB(db.DB)
6073
}
6174

6275
// Select runs a query and reads results into dest.

jobs/river/river_jobs.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (r riverJobArgs) ToJob() jobs.Job {
4343
}
4444
}
4545

46-
func newRiverJobArgsFrmoJob(job jobs.Job) riverJobArgs {
46+
func newRiverJobArgsFromJob(job jobs.Job) riverJobArgs {
4747
return riverJobArgs{
4848
Queue: job.Queue,
4949
JobType: job.JobType,
@@ -67,28 +67,34 @@ type RiverJobs struct {
6767
}
6868

6969
func NewRiverJobs(pool *pgxpool.Pool, queuePrefix string) (*RiverJobs, error) {
70+
return NewRiverJobsWithMiddleware(pool, queuePrefix)
71+
}
72+
73+
func NewRiverJobsWithMiddleware(pool *pgxpool.Pool, queuePrefix string, middlewares ...rivertype.Middleware) (*RiverJobs, error) {
7074
w := &RiverJobs{
7175
pool: pool,
7276
jobMapper: jobs.NewJobMapper(),
7377
queuePrefix: queuePrefix,
7478
}
75-
return w, w.initClient()
79+
return w, w.initClient(middlewares...)
7680
}
7781

7882
func (w *RiverJobs) RiverClient() *river.Client[pgx.Tx] {
7983
return w.riverClient
8084
}
8185

82-
func (w *RiverJobs) initClient() error {
86+
func (w *RiverJobs) initClient(middlewares ...rivertype.Middleware) error {
8387
var err error
8488
defaultQueue := w.queueName("default")
8589
w.riverWorkers = river.NewWorkers()
90+
8691
w.riverClient, err = river.NewClient(riverpgxv5.New(w.pool), &river.Config{
8792
Queues: map[string]river.QueueConfig{defaultQueue: {MaxWorkers: 4}},
8893
JobTimeout: 120 * time.Minute,
8994
Workers: w.riverWorkers,
9095
FetchCooldown: 50 * time.Millisecond,
9196
FetchPollInterval: 100 * time.Millisecond,
97+
Middleware: middlewares,
9298
})
9399
if err != nil {
94100
return err
@@ -105,6 +111,7 @@ func (w *RiverJobs) initClient() error {
105111
return err
106112
}
107113
return nil
114+
108115
}
109116

110117
func (w *RiverJobs) Use(mwf jobs.JobMiddleware) {
@@ -166,7 +173,7 @@ func (w *RiverJobs) makeRiverJobArgs(job jobs.Job) river.InsertManyParams {
166173
}
167174
}
168175
return river.InsertManyParams{
169-
Args: newRiverJobArgsFrmoJob(job),
176+
Args: newRiverJobArgsFromJob(job),
170177
InsertOpts: &insertOpts,
171178
}
172179
}

0 commit comments

Comments
 (0)