-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmutex.go
466 lines (434 loc) · 13.9 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
// Package dbmutex implements a DB-based mutex that can be used to synchronize work
// among multiple processes.
//
// Features
//
// - minimal dependencies
//
// - works with mysql and postgres
//
// - works with database failovers
//
// - mutex lock and unlock operations can timeout if desired
//
// - callers can be notified when mutexes are unlocked
//
// Usage
//
// See the examples.
package dbmutex
import (
"context"
"database/sql"
"errors"
"log"
"os"
"sync"
"time"
"github.com/dynata/go-dbmutex/dbmerr"
"github.com/dynata/go-dbmutex/driver"
"github.com/google/uuid"
)
const (
	// DefaultMutexTableName is the default table name that will be used for storing mutexes unless overridden
	// via WithMutexTableName.
	DefaultMutexTableName = "dbmutex"
	// DefaultMutexName is the default mutex name that will be used unless overridden via WithMutexName. A unique
	// mutex name should be used for each locking scenario. It is advised to always use WithMutexName.
	DefaultMutexName = "mutex"
	// DefaultRefresh is the default time to wait between refreshing locked Mutexes. See WithRefresh.
	DefaultRefresh = 1 * time.Second
	// DefaultExpiration is the default time after which a mutex will expire if it has not been automatically
	// refreshed.
	DefaultExpiration = 5 * time.Minute
	// DefaultPollInterval is the default time that will be used to poll a locked mutex when attempting to
	// lock a mutex. Override it via WithPollInterval.
	DefaultPollInterval = 1 * time.Second
)
// A Mutex is used to provide mutual exclusion among multiple processes that have access to a shared
// database. In order to provide mutual exclusion, the same underlying database table and mutex name
// (which ties to a row) should be used. (See WithMutexTableName and WithMutexName.) Multiple mutexes can
// be stored in the same table but different names should be used for different application specific locking
// scenarios.
//
// Behaviour can be customized via the MutexOption parameters passed to New and the LockOption parameters
// passed to Lock.
type Mutex struct {
	options *mutexOptions // behaviour configuration captured when New runs
	db      *sql.DB       // database used for all lock, unlock and refresh operations
	lock    *sync.Mutex   // guards quitRefresh and doneRefreshing
	// quitRefresh is non-nil only while the Mutex is locked; closing it tells the
	// background refresh goroutine (started in Lock) to exit.
	quitRefresh chan struct{}
	// doneRefreshing is closed by the refresh goroutine when it has fully stopped;
	// Unlock waits on it before releasing the database row.
	doneRefreshing <-chan struct{}
	hostname       string // passed to the driver to help identify the lock holder
	pid            int    // passed to the driver to help identify the lock holder
	lockerId       string // unique id (uuid, set in New) distinguishing this Mutex instance
}
// An Identity uniquely identifies a Mutex. Note that two different Mutex Identities can still be used for
// mutual exclusion because only TableName, MutexName and DB server are used when determining exclusion.
// The other data elements like Hostname, Pid and LockerId are additional information.
type Identity struct {
	TableName string // database table in which the mutex row lives
	MutexName string // name of the mutex (row) within TableName
	Hostname  string // hostname of the process that created the Mutex
	Pid       int    // process id of the process that created the Mutex
	LockerId  string // unique id distinguishing the Mutex instance
}
// LogError is an ErrorNotifier that simply logs errors using the standard logger.
func LogError(e error) error {
log.Print(e.Error())
return nil
}
// IgnoreLogError is an ErrorNotifier that does nothing with passed errors.
func IgnoreLogError(error) error {
return nil
}
// An ErrorNotifier is called to notify when an error occurs while locking, unlocking and refreshing Mutexes.
// The function should typically return nil so that the retries will occur. However, if the function returns
// non-nil, then the calling code will exit any retry loop. Normally, an ErrorNotifier can be used to
// simply log the fact that a transient error occurred and then return nil. See WithErrorNotifier.
type ErrorNotifier func(error) error
// The Lock operation attempts to acquire the Mutex by updating the "locked" column in the underlying database
// table. If unable to update the row, Lock will poll attempting to acquire the mutex by updating the row. Any database
// related errors that occur during Lock are reported via an ErrorNotifier (see WithErrorNotifier)
// but are typically ignored in order to complete the Lock operation. Lock can be timed out by using
// a ctx parameter that has a deadline (see examples). The poll interval and ability to fail fast after a
// database error can be controlled via MutexOption passed to New. If the mutex is acquired, a nil error and a context
// that expires when the Mutex is unlocked are returned. Because polling is used even after database generated
// errors, Lock will typically not return unless the Mutex is acquired or the ctx expires.
func (m *Mutex) Lock(ctx context.Context) (context.Context, error) {
	if m == nil || m.db == nil {
		// Mutex was not created via New; nothing sensible can be done.
		return nil, dbmerr.NewMutexUninitializedError()
	}
	// pollChan will be used to repeatedly try to acquire the lock. It is only initialized after we have failed
	// once to acquire the lock. Select immediately returns for closed channels so we use a closed channel during
	// the first loop iteration.
	pollChan := getClosedTimeChan()
	var ticker *time.Ticker
Loop:
	for {
		select {
		case <-ctx.Done():
			// Caller-supplied deadline/cancellation wins over further polling.
			return nil, ctx.Err()
		case <-pollChan:
			lockAcquired, err := m.options.driver.Lock(
				ctx,
				m.db,
				m.options.tableName,
				m.options.mutexName,
				m.hostname,
				m.pid,
				m.lockerId,
				m.options.refresh,
				m.options.expiration,
			)
			if lockAcquired {
				break Loop
			}
			if m.options.failFast {
				// Caller opted out of polling: surface the first failed attempt.
				return nil, dbmerr.NewLockFailFastError(m.options.tableName, m.options.mutexName, err)
			}
			if err != nil {
				// Report the error; keep polling unless the notifier returns non-nil.
				err2 := dbmerr.NewLockError(m.options.tableName, m.options.mutexName, err)
				if m.options.lockErrNotifier(err2) != nil {
					return nil, err2
				}
			}
			if ticker == nil {
				ticker = time.NewTicker(m.options.pollInterval)
				// This defer inside a for loop is ok because we will only defer once.
				defer ticker.Stop()
				pollChan = ticker.C
			}
		}
	}
	// If we are here, then we have acquired db level lock.
	// Acquire our own sync.Mutex so we can update internal state.
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.quitRefresh != nil {
		// This should not happen unless the Driver has some issues.
		return nil, dbmerr.NewDriverAllowedConcurrentLocksError(m.options.tableName, m.options.mutexName)
	}
	m.quitRefresh = make(chan struct{})
	// lockCtx is canceled (via lockCanceledFunc) when the refresh goroutine exits,
	// which lets callers observe that the Mutex is no longer held.
	lockCtx, lockCanceledFunc := context.WithCancel(context.Background())
	m.doneRefreshing = goRefreshLock(
		m.options.driver,
		m.db,
		m.options.tableName,
		m.options.mutexName,
		m.hostname,
		m.pid,
		m.lockerId,
		m.options.refresh,
		m.options.expiration,
		m.quitRefresh,
		time.Now,
		m.options.refreshErrNotifier,
		lockCanceledFunc,
	)
	return lockCtx, nil
}
// The Unlock operation attempts to release the Mutex by updating the "locked" column in the underlying database
// table. If unable to update the row, Unlock will poll attempting to release the mutex by updating the row. Any database
// related errors that occur during Unlock are reported via an ErrorNotifier (see WithErrorNotifier)
// but are typically ignored in order to complete the Unlock operation. Unlock can be timed out by using
// a ctx parameter that has a deadline (see examples). The poll interval and ability to fail fast after a
// database error can be controlled via MutexOption passed to New. Because polling is used even after database generated
// errors, Unlock will typically not return unless the Mutex is released or the ctx expires.
func (m *Mutex) Unlock(ctx context.Context) error {
	if m == nil || m.db == nil {
		// Mutex was not created via New; nothing sensible can be done.
		return dbmerr.NewMutexUninitializedError()
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.quitRefresh == nil {
		// quitRefresh is only non-nil while locked; see Lock.
		return dbmerr.NewNotLockedError(m.options.tableName, m.options.mutexName)
	}
	// Stop the background refresh goroutine and wait for it to finish before
	// touching the database row, then reset internal state.
	close(m.quitRefresh)
	<-m.doneRefreshing
	m.quitRefresh = nil
	m.doneRefreshing = nil
	// pollChan will be used to repeatedly try to release the lock. It is only initialized after we have failed
	// once to release the lock. Select immediately returns for closed channels so we use a closed channel during
	// the first loop iteration.
	pollChan := getClosedTimeChan()
	var ticker *time.Ticker
Loop:
	for {
		select {
		case <-ctx.Done():
			// Caller-supplied deadline/cancellation wins over further polling.
			return ctx.Err()
		case <-pollChan:
			_, err := m.options.driver.Unlock(
				ctx,
				m.db,
				m.options.tableName,
				m.options.mutexName,
				m.hostname,
				m.pid,
				m.lockerId,
			)
			if err == nil {
				// If there was no error then we assume lock was either unlocked or we were no longer
				// the owner. In both cases, we consider the lock to be unlocked.
				break Loop
			}
			if m.options.failFast {
				// Caller opted out of polling: surface the first failed attempt.
				return dbmerr.NewUnlockFailFastError(m.options.tableName, m.options.mutexName, err)
			}
			if err != nil {
				// Report the error; keep polling unless the notifier returns non-nil.
				err2 := dbmerr.NewUnlockError(m.options.tableName, m.options.mutexName, err)
				if m.options.unlockErrNotifier(err2) != nil {
					return err2
				}
			}
			if ticker == nil {
				ticker = time.NewTicker(m.options.pollInterval)
				// This defer inside a for loop is ok because we will only defer once.
				defer ticker.Stop()
				pollChan = ticker.C
			}
		}
	}
	return nil
}
// Identity returns the Identity of this Mutex. A zero-value Identity is
// returned when the Mutex was not properly initialized via New.
func (m *Mutex) Identity() Identity {
	if m == nil || m.db == nil {
		return Identity{}
	}
	id := Identity{
		Hostname: m.hostname,
		Pid:      m.pid,
		LockerId: m.lockerId,
	}
	id.TableName = m.options.tableName
	id.MutexName = m.options.mutexName
	return id
}
// initMutexOptions builds a mutexOptions populated with defaults, applies the
// caller-supplied options on top, and then repairs any settings that an option
// left empty, zero or negative so that the result is always usable.
func initMutexOptions(options ...MutexOption) *mutexOptions {
	opts := &mutexOptions{
		tableName:          DefaultMutexTableName,
		mutexName:          DefaultMutexName,
		refresh:            DefaultRefresh,
		expiration:         DefaultExpiration,
		createMissingTable: true,
		lockErrNotifier:    IgnoreLogError,
		unlockErrNotifier:  IgnoreLogError,
		refreshErrNotifier: IgnoreLogError,
	}
	for _, apply := range options {
		apply(opts)
	}
	// Guard against options that cleared required settings.
	if opts.tableName == "" {
		opts.tableName = DefaultMutexTableName
	}
	if opts.mutexName == "" {
		opts.mutexName = DefaultMutexName
	}
	if opts.refresh <= 0 {
		opts.refresh = DefaultRefresh
	}
	if opts.expiration <= 0 {
		opts.expiration = DefaultExpiration
	}
	if opts.lockErrNotifier == nil {
		opts.lockErrNotifier = IgnoreLogError
	}
	if opts.unlockErrNotifier == nil {
		opts.unlockErrNotifier = IgnoreLogError
	}
	if opts.refreshErrNotifier == nil {
		opts.refreshErrNotifier = IgnoreLogError
	}
	if opts.pollInterval <= 0 {
		opts.pollInterval = DefaultPollInterval
	}
	return opts
}
// createMutexTableIfNotExists ensures that the mutex table exists, retrying the
// create once because concurrent "create table if not exists" calls can race.
// It returns nil if either attempt succeeds; otherwise it returns the error
// from the first attempt.
func createMutexTableIfNotExists(
	ctx context.Context,
	db *sql.DB,
	driver driver.Driver,
	tableName string,
) error {
	err := driver.CreateMutexTableIfNotExists(ctx, db, tableName)
	if err != nil {
		// It is possible that create table if not exists operation will fail with a message like
		// pq: duplicate key value violates unique constraint "pg_type_typname_nsp_index". This race condition
		// should only occur if multiple callers attempt to create the same table at the same time. A simple
		// retry should eliminate the error.
		if err2 := driver.CreateMutexTableIfNotExists(ctx, db, tableName); err2 != nil {
			return err
		}
		// BUG FIX: the original returned the first error even when the retry
		// succeeded, which defeated the purpose of retrying. A successful retry
		// means the table exists, so report success.
	}
	return nil
}
// New creates a new Mutex. The passed db is used for all database interactions. Behaviour can be customized
// by passing options. The option that should almost always be passed is the lock name (See WithMutexName.)
func New(
	ctx context.Context,
	db *sql.DB,
	options ...MutexOption,
) (*Mutex, error) {
	mo := initMutexOptions(options...)
	var err error
	if mo.driver == nil {
		// No explicit driver supplied; detect one from the db connection.
		mo.driver, err = driver.ResolveDriver(ctx, db)
		if err != nil {
			return nil, err
		}
	}
	if mo.createMissingTable {
		err = createMutexTableIfNotExists(ctx, db, mo.driver, mo.tableName)
		if err != nil {
			return nil, err
		}
	}
	if !mo.delayAddMutexRow {
		// Eagerly insert the row backing this mutex so Lock does not have to.
		err = mo.driver.CreateMutexEntryIfNotExists(ctx, db, mo.tableName, mo.mutexName)
		if err != nil {
			// mysql can fail on concurrent inserts :( so try one more time.
			// Error 1213: Deadlock found when trying to get lock; try restarting transaction
			err2 := mo.driver.CreateMutexEntryIfNotExists(ctx, db, mo.tableName, mo.mutexName)
			if err2 != nil {
				return nil, err
			}
		}
	}
	// Hostname and pid are informational only (see Identity); a lookup failure
	// is not fatal.
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown"
	}
	if len(hostname) > driver.MaxHostnameLength {
		// Truncate so the value fits in the driver's hostname column.
		hostname = hostname[:driver.MaxHostnameLength]
	}
	pid := os.Getpid()
	return &Mutex{
		options:  mo,
		db:       db,
		lock:     &sync.Mutex{},
		hostname: hostname,
		pid:      pid,
		lockerId: uuid.New().String(),
	}, nil
}
// goRefreshLock keeps the database row corresponding to the passed parameters up-to-date by periodically
// updating the row in order to indicate that the mutex is still held.
//
// It starts a goroutine that refreshes the row every refresh interval until either
// the quit channel is closed, the lock is observed to be no longer owned, the lock
// has gone unrefreshed past expiration, or errorNotifier returns non-nil. When the
// goroutine exits it calls lockCanceled (canceling the context returned by Lock)
// and closes the returned channel so callers can wait for full shutdown.
func goRefreshLock(
	driver driver.Driver,
	ex driver.Execer,
	tableName string,
	mutexName string,
	hostname string,
	pid int,
	lockerId string,
	refresh time.Duration,
	expiration time.Duration,
	quit <-chan struct{},
	now func() time.Time, // injected clock (time.Now in production)
	errorNotifier ErrorNotifier,
	lockCanceled context.CancelFunc,
) <-chan struct{} {
	doneRefreshing := make(chan struct{})
	go func() {
		ticker := time.NewTicker(refresh)
		defer func() {
			// Always stop the ticker, signal loss/release of the lock and
			// unblock anyone waiting on doneRefreshing — in that order.
			ticker.Stop()
			lockCanceled()
			close(doneRefreshing)
		}()
		expiresAt := now().Add(expiration)
		// Use a background context: refreshes must not be tied to the caller's
		// (possibly short-lived) Lock context.
		bgContext := context.Background()
		for {
			select {
			case <-ticker.C:
				lockRefreshed, err := driver.Refresh(
					bgContext,
					ex,
					tableName,
					mutexName,
					hostname,
					pid,
					lockerId,
					refresh,
					expiration,
				)
				if err != nil {
					if now().After(expiresAt) {
						// we haven't had a refresh and should now be expired. bail out.
						_ = errorNotifier(dbmerr.NewRefreshLockError(tableName, mutexName, false, err))
						return
					}
					// report the error, but don't bail from loop unless calling error notifier forces bail out.
					// we'll try again later.
					if errorNotifier(dbmerr.NewRefreshLockError(tableName, mutexName, true, err)) != nil {
						return
					}
				} else if !lockRefreshed {
					// we are no longer owner of lock. wtf.
					_ = errorNotifier(dbmerr.NewRefreshLockError(tableName, mutexName, false,
						errors.New("no longer owner of lock")))
					return
				} else {
					// good refresh. reset expiration time.
					expiresAt = now().Add(expiration)
				}
			case <-quit:
				// Unlock asked us to stop.
				return
			}
		}
	}()
	return doneRefreshing
}
var (
	// closedTimeChan is a lazily-created, already-closed channel shared by all
	// callers of getClosedTimeChan.
	closedTimeChan     chan time.Time
	closedTimeChanOnce sync.Once
)

// getClosedTimeChan returns a shared closed time channel. Receiving from a
// closed channel never blocks, which lets Lock and Unlock run their first
// attempt immediately before falling back to ticker-driven polling.
func getClosedTimeChan() <-chan time.Time {
	closedTimeChanOnce.Do(func() {
		closedTimeChan = make(chan time.Time)
		close(closedTimeChan)
	})
	return closedTimeChan
}