Skip to content

Commit 611d6ba

Browse files
Split readDB from writeDB
Signed-off-by: Alexandros Filios <alexandros.filios@ibm.com>
1 parent 3f367bb commit 611d6ba

File tree

17 files changed

+193
-169
lines changed

17 files changed

+193
-169
lines changed

token/services/db/sql/common/identity.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,31 @@ type identityTables struct {
3737
}
3838

3939
type IdentityDB struct {
40-
db *sql.DB
41-
table identityTables
42-
ci common.Interpreter
40+
readDB *sql.DB
41+
writeDB *sql.DB
42+
table identityTables
43+
ci common.Interpreter
4344

4445
signerCacheLock sync.RWMutex
4546
signerInfoCache cache[bool]
4647
auditInfoCache cache[[]byte]
4748
}
4849

49-
func newIdentityDB(db *sql.DB, tables identityTables, singerInfoCache cache[bool], auditInfoCache cache[[]byte], ci common.Interpreter) *IdentityDB {
50+
func newIdentityDB(readDB, writeDB *sql.DB, tables identityTables, singerInfoCache cache[bool], auditInfoCache cache[[]byte], ci common.Interpreter) *IdentityDB {
5051
return &IdentityDB{
51-
db: db,
52+
readDB: readDB,
53+
writeDB: writeDB,
5254
table: tables,
5355
signerInfoCache: singerInfoCache,
5456
auditInfoCache: auditInfoCache,
5557
ci: ci,
5658
}
5759
}
5860

59-
func NewCachedIdentityDB(db *sql.DB, opts NewDBOpts, ci common.Interpreter) (driver.IdentityDB, error) {
61+
func NewCachedIdentityDB(readDB, writeDB *sql.DB, opts NewDBOpts, ci common.Interpreter) (driver.IdentityDB, error) {
6062
return NewIdentityDB(
61-
db,
63+
readDB,
64+
writeDB,
6265
opts.TablePrefix,
6366
opts.CreateSchema,
6467
secondcache.NewTyped[bool](1000),
@@ -67,14 +70,15 @@ func NewCachedIdentityDB(db *sql.DB, opts NewDBOpts, ci common.Interpreter) (dri
6770
)
6871
}
6972

70-
func NewIdentityDB(db *sql.DB, tablePrefix string, createSchema bool, signerInfoCache cache[bool], auditInfoCache cache[[]byte], ci common.Interpreter) (*IdentityDB, error) {
73+
func NewIdentityDB(readDB, writeDB *sql.DB, tablePrefix string, createSchema bool, signerInfoCache cache[bool], auditInfoCache cache[[]byte], ci common.Interpreter) (*IdentityDB, error) {
7174
tables, err := GetTableNames(tablePrefix)
7275
if err != nil {
7376
return nil, errors.Wrapf(err, "failed to get table names")
7477
}
7578

7679
identityDB := newIdentityDB(
77-
db,
80+
readDB,
81+
writeDB,
7882
identityTables{
7983
IdentityConfigurations: tables.IdentityConfigurations,
8084
IdentityInfo: tables.IdentityInfo,
@@ -85,7 +89,7 @@ func NewIdentityDB(db *sql.DB, tablePrefix string, createSchema bool, signerInfo
8589
ci,
8690
)
8791
if createSchema {
88-
if err = common.InitSchema(db, []string{identityDB.GetSchema()}...); err != nil {
92+
if err = common.InitSchema(writeDB, []string{identityDB.GetSchema()}...); err != nil {
8993
return nil, err
9094
}
9195
}
@@ -99,7 +103,7 @@ func (db *IdentityDB) AddConfiguration(wp driver.IdentityConfiguration) error {
99103
}
100104
logger.Debug(query, wp.ID, wp.Type, wp.URL, wp.Config, wp.Raw)
101105

102-
_, err = db.db.Exec(query, wp.ID, wp.Type, wp.URL, wp.Config, wp.Raw)
106+
_, err = db.writeDB.Exec(query, wp.ID, wp.Type, wp.URL, wp.Config, wp.Raw)
103107
return err
104108
}
105109

@@ -109,7 +113,7 @@ func (db *IdentityDB) IteratorConfigurations(configurationType string) (driver.I
109113
return nil, errors.Wrapf(err, "failed compiling query")
110114
}
111115
logger.Debug(query)
112-
rows, err := db.db.Query(query, configurationType)
116+
rows, err := db.readDB.Query(query, configurationType)
113117
if err != nil {
114118
return nil, err
115119
}
@@ -121,7 +125,7 @@ func (db *IdentityDB) ConfigurationExists(id, typ, url string) (bool, error) {
121125
if err != nil {
122126
return false, errors.Wrapf(err, "failed compiling query")
123127
}
124-
result, err := common.QueryUnique[string](db.db, query, id, typ, url)
128+
result, err := common.QueryUnique[string](db.readDB, query, id, typ, url)
125129
if err != nil {
126130
return false, errors.Wrapf(err, "failed getting configuration for [%s:%s:%s]", id, typ, url)
127131
}
@@ -138,7 +142,7 @@ func (db *IdentityDB) StoreIdentityData(id []byte, identityAudit []byte, tokenMe
138142
logger.Debug(query)
139143

140144
h := token.Identity(id).String()
141-
_, err = db.db.Exec(query, h, id, identityAudit, tokenMetadata, tokenMetadataAudit)
145+
_, err = db.writeDB.Exec(query, h, id, identityAudit, tokenMetadata, tokenMetadataAudit)
142146
if err != nil {
143147
// does the record already exists?
144148
auditInfo, err2 := db.GetAuditInfo(id)
@@ -166,7 +170,7 @@ func (db *IdentityDB) GetAuditInfo(id []byte) ([]byte, error) {
166170
return nil, errors.Wrapf(err, "failed compiling query")
167171
}
168172
logger.Debug(query)
169-
row := db.db.QueryRow(query, h)
173+
row := db.readDB.QueryRow(query, h)
170174
var info []byte
171175
err = row.Scan(&info)
172176
if err == nil {
@@ -188,7 +192,7 @@ func (db *IdentityDB) GetTokenInfo(id []byte) ([]byte, []byte, error) {
188192
return nil, nil, errors.Wrapf(err, "failed compiling query")
189193
}
190194
logger.Debug(query)
191-
row := db.db.QueryRow(query, h)
195+
row := db.readDB.QueryRow(query, h)
192196
var tokenMetadata []byte
193197
var tokenMetadataAuditInfo []byte
194198
err = row.Scan(&tokenMetadata, &tokenMetadataAuditInfo)
@@ -210,7 +214,7 @@ func (db *IdentityDB) StoreSignerInfo(id, info []byte) error {
210214
if logger.IsEnabledFor(zapcore.DebugLevel) {
211215
logger.Debugf("store signer info [%s]: [%s][%s]", query, h, hash.Hashable(info))
212216
}
213-
_, err = db.db.Exec(query, h, id, info)
217+
_, err = db.writeDB.Exec(query, h, id, info)
214218
if err != nil {
215219
if exists, err2 := db.SignerInfoExists(id); err2 == nil && exists {
216220
logger.Debugf("signer info [%s] exists, no error to return", h)
@@ -271,7 +275,7 @@ func (db *IdentityDB) GetExistingSignerInfo(ids ...driver2.Identity) ([]string,
271275
return nil, errors.Wrapf(err, "failed compiling query")
272276
}
273277
logger.Debug(query, condition.Params())
274-
rows, err := db.db.Query(query, condition.Params()...)
278+
rows, err := db.readDB.Query(query, condition.Params()...)
275279
if err != nil {
276280
return nil, errors.Wrapf(err, "error querying db")
277281
}
@@ -305,7 +309,7 @@ func (db *IdentityDB) GetSignerInfo(identity []byte) ([]byte, error) {
305309
return nil, errors.Wrapf(err, "failed compiling query")
306310
}
307311
logger.Debug(query)
308-
row := db.db.QueryRow(query, h)
312+
row := db.readDB.QueryRow(query, h)
309313
var info []byte
310314
err = row.Scan(&info)
311315
if err != nil {

token/services/db/sql/common/tokenlock.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package common
88

99
import (
1010
"database/sql"
11+
errors2 "errors"
1112
"fmt"
1213
"time"
1314

@@ -24,34 +25,37 @@ type tokenLockTables struct {
2425
}
2526

2627
type TokenLockDB struct {
27-
DB *sql.DB
28-
Table tokenLockTables
29-
Logger logging.Logger
28+
ReadDB *sql.DB
29+
WriteDB *sql.DB
30+
Table tokenLockTables
31+
Logger logging.Logger
3032
}
3133

32-
func newTokenLockDB(db *sql.DB, tables tokenLockTables) *TokenLockDB {
34+
func newTokenLockDB(readDB, writeDB *sql.DB, tables tokenLockTables) *TokenLockDB {
3335
return &TokenLockDB{
34-
DB: db,
35-
Table: tables,
36-
Logger: logger,
36+
ReadDB: readDB,
37+
WriteDB: writeDB,
38+
Table: tables,
39+
Logger: logger,
3740
}
3841
}
3942

40-
func NewTokenLockDB(db *sql.DB, opts NewDBOpts) (*TokenLockDB, error) {
43+
func NewTokenLockDB(readDB, writeDB *sql.DB, opts NewDBOpts) (*TokenLockDB, error) {
4144
tables, err := GetTableNames(opts.TablePrefix)
4245
if err != nil {
4346
return nil, errors.Wrapf(err, "failed to get table names")
4447
}
4548

4649
tokenLockDB := newTokenLockDB(
47-
db,
50+
readDB,
51+
writeDB,
4852
tokenLockTables{
4953
TokenLocks: tables.TokenLocks,
5054
Requests: tables.Requests,
5155
},
5256
)
5357
if opts.CreateSchema {
54-
if err = common.InitSchema(db, []string{tokenLockDB.GetSchema()}...); err != nil {
58+
if err = common.InitSchema(writeDB, []string{tokenLockDB.GetSchema()}...); err != nil {
5559
return nil, err
5660
}
5761
}
@@ -64,7 +68,7 @@ func (db *TokenLockDB) Lock(tokenID *token.ID, consumerTxID transaction.ID) erro
6468
return errors.Wrap(err, "failed compiling query")
6569
}
6670
logger.Debug(query, tokenID, consumerTxID)
67-
_, err = db.DB.Exec(query, consumerTxID, tokenID.TxId, tokenID.Index, time.Now().UTC())
71+
_, err = db.WriteDB.Exec(query, consumerTxID, tokenID.TxId, tokenID.Index, time.Now().UTC())
6872
return err
6973
}
7074

@@ -75,7 +79,7 @@ func (db *TokenLockDB) UnlockByTxID(consumerTxID transaction.ID) error {
7579
}
7680
logger.Debug(query, consumerTxID)
7781

78-
_, err = db.DB.Exec(query, consumerTxID)
82+
_, err = db.WriteDB.Exec(query, consumerTxID)
7983
return err
8084
}
8185

@@ -95,7 +99,10 @@ func (db *TokenLockDB) GetSchema() string {
9599

96100
func (db *TokenLockDB) Close() error {
97101
logger.Info("closing database")
98-
err := db.DB.Close()
102+
if db.ReadDB != db.WriteDB {
103+
return errors2.Join(db.ReadDB.Close(), db.WriteDB.Close())
104+
}
105+
err := db.ReadDB.Close()
99106
if err != nil {
100107
return errors.Wrap(err, "could not close DB")
101108
}

0 commit comments

Comments
 (0)