Skip to content

Commit 73cc499

Browse files
authored
Add an option to distribute reads among all nodes (#3)
1 parent 5bac59b commit 73cc499

File tree

8 files changed

+153
-33
lines changed

8 files changed

+153
-33
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
.idea
12
# Compiled Object files, Static and Dynamic libs (Shared Objects)
23
*.o
34
*.a

Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.14.3
2+
3+
WORKDIR /src
4+
5+
COPY go.mod .
6+
COPY go.sum .
7+
RUN go mod download
8+
9+
ADD . .
10+
11+
CMD go test -count=1 -v -race ./...

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.PHONY: test
2+
test:
3+
docker-compose -f docker-compose.yml up --build --abort-on-container-exit
4+
docker-compose -f docker-compose.yml down --volumes

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,15 @@ db, _ := mssqlx.ConnectMasterSlaves("mysql", masterDSNs, slaveDSNs)
5252
Recommended to set flag as following:
5353

5454
```go
55-
db, _ := mssqlx.ConnectMasterSlaves("mysql", masterDSNs, slaveDSNs, true)
55+
db, _ := mssqlx.ConnectMasterSlaves("mysql", masterDSNs, slaveDSNs, mssqlx.WithWsrep())
56+
```
57+
58+
## Connecting to Databases with custom read-query source
59+
60+
Read-queries will be distributed among both masters and slaves:
61+
62+
```go
63+
db, _ := mssqlx.ConnectMasterSlaves("mysql", masterDSNs, slaveDSNs, mssqlx.WithReadQuerySource(mssqlx.ReadQuerySourceAll))
5664
```
5765

5866
## Configuration

docker-compose.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
version: '3'
2+
networks:
3+
integration-test:
4+
driver: bridge
5+
services:
6+
postgres:
7+
image: postgres:9.6
8+
expose:
9+
- "5432"
10+
environment:
11+
POSTGRES_USER: test
12+
POSTGRES_PASSWORD: test
13+
POSTGRES_DB: test
14+
restart: on-failure
15+
networks:
16+
- integration-test
17+
mysql:
18+
image: mysql:5.7
19+
expose:
20+
- "3306"
21+
environment:
22+
MYSQL_USER: test
23+
MYSQL_PASSWORD: test
24+
MYSQL_ROOT_PASSWORD: test
25+
MYSQL_DATABASE: test
26+
restart: on-failure
27+
networks:
28+
- integration-test
29+
mssqlx:
30+
build:
31+
context: .
32+
dockerfile: Dockerfile
33+
environment:
34+
MSSQLX_POSTGRES_DSN: host=postgres user=test password=test dbname=test sslmode=disable
35+
MSSQLX_MYSQL_DSN: test:test@tcp(mysql:3306)/test
36+
depends_on:
37+
- postgres
38+
- mysql
39+
networks:
40+
- integration-test

mssqlx.go

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ func ping(w *wrapper) (err error) {
4040

4141
// DBs sqlx wrapper supports querying master-slave database connections for HA and scalability, auto-balancer integrated.
4242
type DBs struct {
43-
driverName string
43+
driverName string
44+
readQuerySource ReadQuerySource
4445

4546
masters *balancer
4647
slaves *balancer
@@ -65,6 +66,13 @@ func (dbs *DBs) getDBs(s []*wrapper) ([]*sqlx.DB, int) {
6566
return r, n
6667
}
6768

69+
func (dbs *DBs) readBalancer() *balancer {
70+
if dbs.readQuerySource == ReadQuerySourceAll {
71+
return dbs.all
72+
}
73+
return dbs.slaves
74+
}
75+
6876
// GetAllMasters get all master database connections, included failing one.
6977
func (dbs *DBs) GetAllMasters() ([]*sqlx.DB, int) {
7078
return dbs.getDBs(dbs._masters)
@@ -590,7 +598,7 @@ func _namedQuery(ctx context.Context, target *balancer, query string, arg interf
590598
// NamedQuery do named query.
591599
// Any named placeholder parameters are replaced with fields from arg.
592600
func (dbs *DBs) NamedQuery(query string, arg interface{}) (*sqlx.Rows, error) {
593-
return _namedQuery(context.Background(), dbs.slaves, query, arg)
601+
return _namedQuery(context.Background(), dbs.readBalancer(), query, arg)
594602
}
595603

596604
// NamedQueryOnMaster do named query on master.
@@ -602,7 +610,7 @@ func (dbs *DBs) NamedQueryOnMaster(query string, arg interface{}) (*sqlx.Rows, e
602610
// NamedQueryContext do named query with context.
603611
// Any named placeholder parameters are replaced with fields from arg.
604612
func (dbs *DBs) NamedQueryContext(ctx context.Context, query string, arg interface{}) (*sqlx.Rows, error) {
605-
return _namedQuery(ctx, dbs.slaves, query, arg)
613+
return _namedQuery(ctx, dbs.readBalancer(), query, arg)
606614
}
607615

608616
// NamedQueryContextOnMaster do named query with context on master.
@@ -699,7 +707,7 @@ func _query(ctx context.Context, target *balancer, query string, args ...interfa
699707
// Query executes a query on slaves that returns rows, typically a SELECT.
700708
// The args are for any placeholder parameters in the query.
701709
func (dbs *DBs) Query(query string, args ...interface{}) (r *sql.Rows, err error) {
702-
_, r, err = _query(context.Background(), dbs.slaves, query, args...)
710+
_, r, err = _query(context.Background(), dbs.readBalancer(), query, args...)
703711
return
704712
}
705713

@@ -713,7 +721,7 @@ func (dbs *DBs) QueryOnMaster(query string, args ...interface{}) (r *sql.Rows, e
713721
// QueryContext executes a query on slaves that returns rows, typically a SELECT.
714722
// The args are for any placeholder parameters in the query.
715723
func (dbs *DBs) QueryContext(ctx context.Context, query string, args ...interface{}) (r *sql.Rows, err error) {
716-
_, r, err = _query(ctx, dbs.slaves, query, args...)
724+
_, r, err = _query(ctx, dbs.readBalancer(), query, args...)
717725
return
718726
}
719727

@@ -758,7 +766,7 @@ func _queryx(ctx context.Context, target *balancer, query string, args ...interf
758766
// Queryx executes a query on slaves that returns rows, typically a SELECT.
759767
// The args are for any placeholder parameters in the query.
760768
func (dbs *DBs) Queryx(query string, args ...interface{}) (r *sqlx.Rows, err error) {
761-
_, r, err = _queryx(context.Background(), dbs.slaves, query, args...)
769+
_, r, err = _queryx(context.Background(), dbs.readBalancer(), query, args...)
762770
return
763771
}
764772

@@ -772,7 +780,7 @@ func (dbs *DBs) QueryxOnMaster(query string, args ...interface{}) (r *sqlx.Rows,
772780
// QueryxContext executes a query on slaves that returns rows, typically a SELECT.
773781
// The args are for any placeholder parameters in the query.
774782
func (dbs *DBs) QueryxContext(ctx context.Context, query string, args ...interface{}) (r *sqlx.Rows, err error) {
775-
_, r, err = _queryx(ctx, dbs.slaves, query, args...)
783+
_, r, err = _queryx(ctx, dbs.readBalancer(), query, args...)
776784
return
777785
}
778786

@@ -801,7 +809,7 @@ func _queryRow(ctx context.Context, target *balancer, query string, args ...inte
801809
// QueryRow always returns a non-nil value. Errors are deferred until
802810
// Row's Scan method is called.
803811
func (dbs *DBs) QueryRow(query string, args ...interface{}) (r *sql.Row, err error) {
804-
_, r, err = _queryRow(context.Background(), dbs.slaves, query, args...)
812+
_, r, err = _queryRow(context.Background(), dbs.readBalancer(), query, args...)
805813
return
806814
}
807815

@@ -817,7 +825,7 @@ func (dbs *DBs) QueryRowOnMaster(query string, args ...interface{}) (r *sql.Row,
817825
// QueryRow always returns a non-nil value. Errors are deferred until
818826
// Row's Scan method is called.
819827
func (dbs *DBs) QueryRowContext(ctx context.Context, query string, args ...interface{}) (r *sql.Row, err error) {
820-
_, r, err = _queryRow(ctx, dbs.slaves, query, args...)
828+
_, r, err = _queryRow(ctx, dbs.readBalancer(), query, args...)
821829
return
822830
}
823831

@@ -848,7 +856,7 @@ func _queryRowx(ctx context.Context, target *balancer, query string, args ...int
848856
// QueryRow always returns a non-nil value. Errors are deferred until
849857
// Row's Scan method is called.
850858
func (dbs *DBs) QueryRowx(query string, args ...interface{}) (r *sqlx.Row, err error) {
851-
_, r, err = _queryRowx(context.Background(), dbs.slaves, query, args...)
859+
_, r, err = _queryRowx(context.Background(), dbs.readBalancer(), query, args...)
852860
return
853861
}
854862

@@ -864,7 +872,7 @@ func (dbs *DBs) QueryRowxOnMaster(query string, args ...interface{}) (r *sqlx.Ro
864872
// QueryRow always returns a non-nil value. Errors are deferred until
865873
// Row's Scan method is called.
866874
func (dbs *DBs) QueryRowxContext(ctx context.Context, query string, args ...interface{}) (r *sqlx.Row, err error) {
867-
_, r, err = _queryRowx(ctx, dbs.slaves, query, args...)
875+
_, r, err = _queryRowx(ctx, dbs.readBalancer(), query, args...)
868876
return
869877
}
870878

@@ -904,7 +912,7 @@ func _select(ctx context.Context, target *balancer, dest interface{}, query stri
904912
// Select do select on slaves.
905913
// Any placeholder parameters are replaced with supplied args.
906914
func (dbs *DBs) Select(dest interface{}, query string, args ...interface{}) (err error) {
907-
_, err = _select(context.Background(), dbs.slaves, dest, query, args...)
915+
_, err = _select(context.Background(), dbs.readBalancer(), dest, query, args...)
908916
return
909917
}
910918

@@ -918,7 +926,7 @@ func (dbs *DBs) SelectOnMaster(dest interface{}, query string, args ...interface
918926
// SelectContext do select on slaves with context.
919927
// Any placeholder parameters are replaced with supplied args.
920928
func (dbs *DBs) SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) {
921-
_, err = _select(ctx, dbs.slaves, dest, query, args...)
929+
_, err = _select(ctx, dbs.readBalancer(), dest, query, args...)
922930
return
923931
}
924932

@@ -974,7 +982,7 @@ func (dbs *DBs) GetOnMaster(dest interface{}, query string, args ...interface{})
974982
// Any placeholder parameters are replaced with supplied args.
975983
// An error is returned if the result set is empty.
976984
func (dbs *DBs) GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) {
977-
_, err = _get(ctx, dbs.slaves, dest, query, args...)
985+
_, err = _get(ctx, dbs.readBalancer(), dest, query, args...)
978986
return
979987
}
980988

@@ -1440,9 +1448,8 @@ func (dbs *DBs) BeginTxx(ctx context.Context, opts *sql.TxOptions) (res *sqlx.Tx
14401448
// ConnectMasterSlaves to master-slave databases, healthchecks will ensure they are working
14411449
// driverName: mysql, postgres, etc.
14421450
// masterDSNs: data source names of Masters.
1443-
// slaveDSNs: data source names of Slaves.
1444-
// args: args[0] = true to indicates galera/wsrep cluster.
1445-
func ConnectMasterSlaves(driverName string, masterDSNs []string, slaveDSNs []string, args ...interface{}) (*DBs, []error) {
1451+
// slaveDSNs: data source names of ReadQuerySourceSlaves.
1452+
func ConnectMasterSlaves(driverName string, masterDSNs []string, slaveDSNs []string, options ...Option) (*DBs, []error) {
14461453
// Validate slave address
14471454
if slaveDSNs == nil {
14481455
slaveDSNs = []string{}
@@ -1452,12 +1459,14 @@ func ConnectMasterSlaves(driverName string, masterDSNs []string, slaveDSNs []str
14521459
masterDSNs = []string{}
14531460
}
14541461

1455-
isWsrep := false
1456-
if len(args) > 0 {
1457-
switch args[0].(type) {
1458-
case bool:
1459-
isWsrep = args[0].(bool)
1460-
}
1462+
// default cluster options
1463+
opts := &clusterOptions{
1464+
isWsrep: false,
1465+
readQuerySource: ReadQuerySourceSlaves,
1466+
}
1467+
1468+
for _, optFn := range options {
1469+
optFn(opts)
14611470
}
14621471

14631472
nMaster := len(masterDSNs)
@@ -1466,15 +1475,16 @@ func ConnectMasterSlaves(driverName string, masterDSNs []string, slaveDSNs []str
14661475

14671476
errResult := make([]error, nAll)
14681477
dbs := &DBs{
1469-
driverName: driverName,
1478+
driverName: driverName,
1479+
readQuerySource: opts.readQuerySource,
14701480

1471-
masters: newBalancer(nil, nMaster>>2, nMaster, isWsrep),
1481+
masters: newBalancer(nil, nMaster>>2, nMaster, opts.isWsrep),
14721482
_masters: make([]*wrapper, nMaster),
14731483

1474-
slaves: newBalancer(nil, nSlave>>2, nSlave, isWsrep),
1484+
slaves: newBalancer(nil, nSlave>>2, nSlave, opts.isWsrep),
14751485
_slaves: make([]*wrapper, nSlave),
14761486

1477-
all: newBalancer(nil, nAll>>2, nAll, isWsrep),
1487+
all: newBalancer(nil, nAll>>2, nAll, opts.isWsrep),
14781488
_all: make([]*wrapper, nAll),
14791489
}
14801490

mssqlx_test.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func TestDbBalancer(t *testing.T) {
350350
}
351351

352352
func TestConnectMasterSlave(t *testing.T) {
353-
dsn, driver := "user=test1 dbname=test1 sslmode=disable", "postgres"
353+
dsn, driver := os.Getenv("MSSQLX_POSTGRES_DSN"), "postgres"
354354

355355
masterDSNs := []string{dsn, dsn, dsn}
356356
slaveDSNs := []string{dsn, dsn}
@@ -380,7 +380,7 @@ func TestConnectMasterSlave(t *testing.T) {
380380

381381
// test another ping
382382
for _, v := range db._all {
383-
if e := ping(v); e != nil && e.Error() != "pq: role \"test1\" does not exist" {
383+
if e := ping(v); e != nil && e.Error() != "pq: role \"test\" does not exist" {
384384
t.Fatal(e)
385385
}
386386
}
@@ -441,7 +441,7 @@ func TestConnectMasterSlave(t *testing.T) {
441441
t.Fatal("DestroySlave fail")
442442
}
443443

444-
db, _ = ConnectMasterSlaves(driver, masterDSNs, slaveDSNs, true)
444+
db, _ = ConnectMasterSlaves(driver, masterDSNs, slaveDSNs, WithWsrep())
445445
if _, c := db.GetAllMasters(); c != 3 {
446446
t.Fatal("Initialize master slave fail")
447447
}
@@ -454,15 +454,26 @@ func TestConnectMasterSlave(t *testing.T) {
454454
t.Fatal("Destroy fail")
455455
}
456456

457-
db, _ = ConnectMasterSlaves(driver, nil, slaveDSNs, true)
457+
db, _ = ConnectMasterSlaves(driver, nil, slaveDSNs, WithWsrep())
458458
if _, c := db.GetAllMasters(); c != 0 {
459459
t.Fatal("Initialize master slave fail")
460460
}
461461

462-
db, _ = ConnectMasterSlaves(driver, nil, nil, true)
462+
db, _ = ConnectMasterSlaves(driver, nil, nil, WithWsrep())
463463
if _, c := db.GetAllSlaves(); c != 0 {
464464
t.Fatal("Initialize master slave fail")
465465
}
466+
467+
// check read-query source
468+
db, _ = ConnectMasterSlaves(driver, nil, nil)
469+
if db.readBalancer() != db.slaves {
470+
t.Fatal("Initialize master slave fail")
471+
}
472+
473+
db, _ = ConnectMasterSlaves(driver, nil, nil, WithReadQuerySource(ReadQuerySourceAll))
474+
if db.readBalancer() != db.all {
475+
t.Fatal("Initialize master slave fail")
476+
}
466477
}
467478

468479
func TestGlobalFunc(t *testing.T) {

options.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package mssqlx
2+
3+
type ReadQuerySource int
4+
5+
const (
6+
// ReadQuerySourceSlaves indicates: read-queries will be distributed only among slaves.
7+
// This is default value.
8+
//
9+
// Note: there is no option for Master. One could use functions like `QueryMaster`, etc
10+
// to query from masters only.
11+
ReadQuerySourceSlaves ReadQuerySource = iota
12+
// ReadQuerySourceAll indicates: read-queries will be distributed among both masters and slaves.
13+
ReadQuerySourceAll
14+
)
15+
16+
type clusterOptions struct {
17+
isWsrep bool
18+
readQuerySource ReadQuerySource
19+
}
20+
21+
type Option func(*clusterOptions)
22+
23+
// WithWsrep indicates galera/wsrep cluster
24+
func WithWsrep() Option {
25+
return func(o *clusterOptions) {
26+
o.isWsrep = true
27+
}
28+
}
29+
30+
// WithReadQuerySource sets default sources for read-queries.
31+
func WithReadQuerySource(source ReadQuerySource) Option {
32+
return func(o *clusterOptions) {
33+
o.readQuerySource = source
34+
}
35+
}

0 commit comments

Comments
 (0)