-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdotpgx.go
More file actions
176 lines (159 loc) · 4.66 KB
/
dotpgx.go
File metadata and controls
176 lines (159 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Package dotpgx creates a connection pool, parses and executes queries.
package dotpgx
import (
"errors"
"fmt"
"strings"
"github.com/jackc/pgx"
"github.com/jackc/pgx/log/log15adapter"
log "gopkg.in/inconshreveable/log15.v2"
)
// DB represents the database connection pool together with the parsed
// queries that have been registered on it.
type DB struct {
// Pool allows direct access to the underlying *pgx.ConnPool.
Pool *pgx.ConnPool
// qm holds the registered queries; methods on DB look them up by name.
qm queryMap
qn int // Incrementing counter for unnamed queries; ClearMap resets it to 0.
}
// New configures and creates a database connection pool and returns a
// pointer to the resulting DB. An error is returned only when the
// connection pool cannot be set up; that failure is also logged at
// critical level.
//
// An example config object would look like:
//
//	conf := pgx.ConnPoolConfig{
//		ConnConfig: pgx.ConnConfig{
//			Host:     pgHost,
//			User:     pgUser,
//			Database: pgDatabase,
//			Logger:   logger,
//		},
//		MaxConnections: 50,
//		AfterConnect:   sqlPrepare,
//	}
//
// Most arguments are optional. If no logger is specified, a log15
// logger tagged with module "pgx" is assigned automatically.
func New(conf pgx.ConnPoolConfig) (db *DB, err error) {
	// Fall back to a default logger when the caller did not supply one.
	if conf.Logger == nil {
		conf.Logger = log15adapter.NewLogger(log.New("module", "pgx"))
	}

	var pool *pgx.ConnPool
	if pool, err = pgx.NewConnPool(conf); err != nil {
		log.Crit("Unable to create connection pool", "error", err)
		return nil, err
	}

	// Start with an empty, non-nil query map so queries can be registered.
	return &DB{Pool: pool, qm: make(queryMap)}, nil
}
// HasQueries reports whether there are queries in the map.
// It returns false for a nil map as well as for a map with 0 queries.
func (db *DB) HasQueries() bool {
	// len of a nil map is 0, so no separate nil check is needed.
	return len(db.qm) > 0
}
// List returns the names of all registered queries, in the order
// produced by the query map's sort method.
func (db *DB) List() (index []string) {
	index = db.qm.sort()
	return index
}
// Prepare prepares the SQL statement registered under name on the
// connection pool, using name as the prepared statement's name.
// The resulting statement is stored on the query and returned.
// An error is returned when the name is not registered or when the
// pool fails to prepare the statement.
func (db *DB) Prepare(name string) (*pgx.PreparedStatement, error) {
	entry, err := db.qm.getQuery(name)
	if err != nil {
		return nil, err
	}

	// The result is assigned to the query even on failure, exactly as
	// the pool returned it.
	if entry.ps, err = db.Pool.Prepare(name, entry.sql); err != nil {
		return nil, err
	}
	return entry.ps, nil
}
// PrepareAll prepares all registered queries. It does not abort when a
// query fails to prepare; it continues with the remaining statements.
//
// ps contains the statements that were prepared successfully. err is
// non-nil when at least one query failed; its message has one line per
// failed query, holding the query name, its SQL and the underlying
// error. Queries are visited in map iteration order, so the order of
// ps and of the lines in the error message is unspecified.
func (db *DB) PrepareAll() (ps []*pgx.PreparedStatement, err error) {
	var msg []string
	for name, query := range db.qm {
		p, e := db.Prepare(name)
		if e != nil {
			m := []string{
				"Error in preparing statement:",
				name,
				"; With query:",
				query.sql,
				// Keep the cause; without it the caller cannot tell
				// why the statement failed to prepare.
				"; Error:",
				e.Error(),
			}
			msg = append(msg, strings.Join(m, " "))
			continue
		}
		ps = append(ps, p)
	}
	if len(msg) > 0 {
		err = errors.New(strings.Join(msg, "\n"))
	}
	return ps, err
}
// Query runs the SQL identified by name with the given arguments and
// returns the resulting row set. An error is returned when the name is
// not registered or when the pool's Query call fails.
func (db *DB) Query(name string, args ...interface{}) (*pgx.Rows, error) {
	entry, lookupErr := db.qm.getQuery(name)
	if lookupErr != nil {
		return nil, lookupErr
	}

	stmt := entry.getSQL()
	return db.Pool.Query(stmt, args...)
}
// QueryRow runs the SQL identified by name and returns a single row.
// Note that an error is only returned if the query is not defined.
// A query error is deferred until row.Scan is run. See the pgx docs for
// more info.
func (db *DB) QueryRow(name string, args ...interface{}) (*pgx.Row, error) {
	entry, lookupErr := db.qm.getQuery(name)
	if lookupErr != nil {
		return nil, lookupErr
	}

	row := db.Pool.QueryRow(entry.getSQL(), args...)
	return row, nil
}
// Exec runs the SQL identified by name with the given arguments.
// It returns the command tag produced by the pool's Exec call, or an
// error when the name is not registered or the execution fails.
func (db *DB) Exec(name string, args ...interface{}) (pgx.CommandTag, error) {
	entry, lookupErr := db.qm.getQuery(name)
	if lookupErr != nil {
		return "", lookupErr
	}

	stmt := entry.getSQL()
	return db.Pool.Exec(stmt, args...)
}
// DropQuery removes a query from the map.
// It calls pgx Deallocate if the query was a prepared statement.
// An error is returned only when deallocating fails.
// Regardless of an error, the query will be dropped from the map.
// Dropping a name that is not registered is a no-op and returns nil.
func (db *DB) DropQuery(name string) (err error) {
	// Look up and remove the entry in one critical section, so the map
	// is never read outside the mutex that guards its modification.
	mutex.Lock()
	q, found := db.qm[name]
	delete(db.qm, name)
	mutex.Unlock()

	// Only touch the entry when it exists: indexing the map with an
	// unknown name yields the zero value, which must not be used as a
	// query. Deallocate runs outside the lock to avoid holding it
	// during a round trip to the database.
	if found && q.isPrepared() {
		err = db.Pool.Deallocate(name)
	}
	return err
}
// ClearMap clears the query map and sets the internal incremental
// counter to 0. Use this before loading a fresh set of queries while
// keeping the connection pool open.
// An error is only returned if one or more prepared statements failed
// to deallocate; its message holds one line per failure. ClearMap does
// not abort on error and continues to drop the remaining queries.
func (db *DB) ClearMap() (err error) {
	var failures []string
	for name := range db.qm {
		// DropQuery deletes the entry from the map being ranged over.
		if dropErr := db.DropQuery(name); dropErr != nil {
			failures = append(failures, fmt.Sprint(dropErr))
		}
	}
	db.qn = 0

	if len(failures) == 0 {
		return nil
	}
	return errors.New(strings.Join(failures, "\n"))
}
// Close cleans up the mapped queries and closes the pgx connection pool.
// It is intended to be safe to call multiple times.
// NOTE(review): that relies on pgx's ConnPool.Close tolerating repeated
// calls — confirm against the pgx version in use.
func (db *DB) Close() {
	// Possible deallocate errors are deliberately ignored: the
	// connection is about to be closed anyway.
	_ = db.ClearMap()
	db.Pool.Close()
}