-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmap.go
275 lines (250 loc) · 8.9 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package dbmutex
import (
"context"
"database/sql"
"sync"
"sync/atomic"
"github.com/dynata/go-dbmutex/dbmerr"
"github.com/dynata/go-dbmutex/driver"
)
// mutexOperations captures the Lock and Unlock operations of a Mutex so that a fake
// implementation can be substituted during tests. This interface is mainly used to
// simplify testing.
type mutexOperations interface {
	// Lock acquires the mutex; the returned context can be used to observe lock loss.
	Lock(ctx context.Context) (context.Context, error)
	// Unlock releases the mutex.
	Unlock(ctx context.Context) error
}
// mutexMapOptions holds the customizable parameters of a MutexMap. Values are set by
// MutexMapOption functions passed to NewMutexMap.
type mutexMapOptions struct {
	// maxLocalWaiters limits in-process waiters per named mutex; -1 means unlimited.
	maxLocalWaiters int32
	// mutexOptions are passed to New when allocating each underlying Mutex.
	mutexOptions []MutexOption
	// allocator creates the underlying mutexOperations; replaceable for testing.
	allocator mutexAllocator
	// delayCreateMissingTable, when true, skips eager table creation in NewMutexMap.
	delayCreateMissingTable bool
	// delayResolveDriver, when true, skips eager driver resolution in NewMutexMap.
	delayResolveDriver bool
}
// MutexMapOption is used to customize MutexMap behaviour. Each option mutates the
// private mutexMapOptions that NewMutexMap builds.
type MutexMapOption func(options *mutexMapOptions)
// WithMaxLocalWaiters sets the maximum number of local waiters. "Local" emphasises that the waiters
// are local to this MutexMap: waiters are not counted across different MutexMaps and/or processes.
// Pass -1 (the default) to allow an unlimited number of local waiters.
func WithMaxLocalWaiters(max int32) MutexMapOption {
	return func(opts *mutexMapOptions) {
		opts.maxLocalWaiters = max
	}
}
// WithMutexOptions can be used to customize the Mutex objects that are used for locking. The options
// are passed to New when creating Mutex objects.
//
// The provided slice is copied: NewMutexMap and MutexMap.Lock later append internal
// options, and storing the caller's slice directly could let those appends write into a
// backing array the caller still owns.
func WithMutexOptions(options ...MutexOption) MutexMapOption {
	return func(o *mutexMapOptions) {
		o.mutexOptions = append([]MutexOption(nil), options...)
	}
}
// WithDelayCreateMissingTable allows customization of table creation if it does not exist.
// Pass true to override the default behaviour and delay the creation of a missing mutex table.
func WithDelayCreateMissingTable(b bool) MutexMapOption {
	return func(opts *mutexMapOptions) {
		opts.delayCreateMissingTable = b
	}
}
// WithDelayResolveDriver allows customization of driver.Driver resolution.
// Pass true to override the default behaviour and delay resolution of the driver.Driver.
func WithDelayResolveDriver(b bool) MutexMapOption {
	return func(opts *mutexMapOptions) {
		opts.delayResolveDriver = b
	}
}
// withMutexAllocator overrides the function used to allocate underlying Mutex objects.
// Unexported: it exists to support testing with fake mutex implementations.
func withMutexAllocator(allocator mutexAllocator) MutexMapOption {
	return func(opts *mutexMapOptions) {
		opts.allocator = allocator
	}
}
// mutexAllocator is used to decouple creation of underlying Mutex objects during testing.
// Its signature mirrors that of New (see dbMutexAllocator, the default implementation).
type mutexAllocator func(ctx context.Context, db *sql.DB, options ...MutexOption) (mutexOperations, error)
// dbMutexAllocator is the default mutexAllocator; it simply delegates to New.
func dbMutexAllocator(ctx context.Context, db *sql.DB, options ...MutexOption) (mutexOperations, error) {
	return New(ctx, db, options...)
}
// MutexMap implements a map of named mutexes. Its primary purpose is to prevent additional database calls
// that are needed for locking Mutex objects. If multiple goroutines in the same process
// use the same MutexMap for locking, only one underlying Mutex will be used to interact with the database.
// Additional lockers will wait via in-process synchronization. If you don't care about the additional database
// resource consumption or have low volume locking needs, you might instead use Mutex directly. Internally,
// a map is used to hold reference counted Mutex objects. Once named Mutex objects are not referenced,
// they are removed from the map so the map size does not grow beyond the number of locked Mutexes.
type MutexMap struct {
	// options holds the resolved configuration built by NewMutexMap.
	options *mutexMapOptions
	// db is used for all database operations.
	db *sql.DB
	// lock guards countedMutexes (and the references field of its entries).
	lock sync.Mutex
	// countedMutexes maps mutex name to its reference-counted entry.
	countedMutexes map[string]*countedMutex
}
// countedMutex objects are held in MutexMap. They keep hold counters and the underlying Mutex.
type countedMutex struct {
	// mutex is the underlying database mutex; allocated lazily on first lock.
	mutex mutexOperations
	// timeoutLock is a 1-buffered channel used as an in-process lock that allows
	// waiting with context cancellation via select.
	timeoutLock chan struct{}
	// waiters counts goroutines blocked waiting for timeoutLock; accessed atomically.
	waiters int32
	// references counts outstanding users; guarded by the owning MutexMap's lock.
	references int32
}
// lock locks the underlying Mutex after first acquiring the private (in-process) lock.
// allocator is invoked lazily, at most once per countedMutex, to create the underlying
// Mutex. maxWaiters bounds the number of goroutines allowed to block waiting for the
// local lock (-1 means unlimited); name is used only for error reporting. On success the
// returned context can be used to detect loss of the database lock.
func (cm *countedMutex) lock(
	ctx context.Context,
	allocator func() (mutexOperations, error),
	maxWaiters int32,
	name string,
) (context.Context, error) {
	// First, try non-blocking lock. We only become a waiter if we can't acquire lock in non-blocking fashion.
	select {
	case cm.timeoutLock <- struct{}{}:
		// Lock acquired. Will unlock when unlock is called.
	default:
		// Unable to acquire local lock without waiting. We are now a waiter.
		// Check limits before waiting in a blocking fashion.
		currentWaiters := atomic.AddInt32(&cm.waiters, 1)
		defer atomic.AddInt32(&cm.waiters, -1)
		if maxWaiters >= 0 && currentWaiters > maxWaiters {
			return nil, dbmerr.NewMaxWaitersExceededError(int(maxWaiters), name)
		}
		select {
		case cm.timeoutLock <- struct{}{}:
			// Lock acquired. Will unlock when unlock is called.
		case <-ctx.Done():
			// ctx expired or was cancelled while waiting for the local lock.
			return nil, ctx.Err()
		}
	}
	// The local lock is held from here on; every error path below must release it.
	if cm.mutex == nil {
		m, err := allocator()
		if err != nil {
			// unlock our local lock
			<-cm.timeoutLock
			return nil, err
		}
		cm.mutex = m
	}
	lockCtx, err := cm.mutex.Lock(ctx)
	if err != nil {
		// unlock our local lock
		<-cm.timeoutLock
	}
	return lockCtx, err
}
// unlock releases the underlying Mutex and then the local lock. The local lock is
// released even if the underlying implementation fails during unlock: the assumption is
// that the underlying implementation can handle a subsequent Lock call and still provide
// mutual exclusion.
func (cm *countedMutex) unlock(ctx context.Context) error {
	// Draining the channel releases the local lock; deferred so it runs regardless of
	// the outcome of the database unlock.
	defer func() { <-cm.timeoutLock }()
	return cm.mutex.Unlock(ctx)
}
// NewMutexMap allocates a new MutexMap. The passed db will be used for all database
// operations. options can be used to customize behaviour.
func NewMutexMap(ctx context.Context, db *sql.DB, options ...MutexMapOption) (*MutexMap, error) {
	mapOpts := &mutexMapOptions{maxLocalWaiters: -1}
	for _, apply := range options {
		apply(mapOpts)
	}
	// Normalize any out-of-range waiter limit to "unlimited".
	if mapOpts.maxLocalWaiters < -1 {
		mapOpts.maxLocalWaiters = -1
	}
	if mapOpts.allocator == nil {
		mapOpts.allocator = dbMutexAllocator
	}
	mutexOpts := initMutexOptions(mapOpts.mutexOptions...)
	drv := mutexOpts.driver
	if drv == nil && (!mapOpts.delayResolveDriver || !mapOpts.delayCreateMissingTable) {
		resolved, err := driver.ResolveDriver(ctx, db)
		if err != nil {
			return nil, err
		}
		drv = resolved
		// Record the resolved driver in the mutex creation options so it is not
		// re-resolved on creation of every Mutex.
		mapOpts.mutexOptions = append(mapOpts.mutexOptions, WithDriver(drv))
	}
	if mutexOpts.createMissingTable && !mapOpts.delayCreateMissingTable {
		if err := createMutexTableIfNotExists(ctx, db, drv, mutexOpts.tableName); err != nil {
			return nil, err
		}
		// The table now exists, so individual Mutex objects should not try to create it.
		mapOpts.mutexOptions = append(mapOpts.mutexOptions, WithCreateMissingTable(false))
	}
	mapOpts.mutexOptions = append(mapOpts.mutexOptions, WithDelayAddMutexRow(true))
	return &MutexMap{
		options:        mapOpts,
		db:             db,
		countedMutexes: make(map[string]*countedMutex),
	}, nil
}
// Lock locks the named Mutex. If not already available for the given name, an underlying
// Mutex will be allocated and kept for later use. In order to lock with a timeout pass a
// ctx that has a deadline. The returned context can be used to detect if the lock is lost.
func (mm *MutexMap) Lock(ctx context.Context, name string) (context.Context, error) {
	cm, err := mm.acquireReference(name, true)
	if err != nil {
		return nil, err
	}
	lockExpirationCtx, err2 := cm.lock(
		ctx,
		func() (mutexOperations, error) {
			// Build a fresh option slice instead of appending to the shared
			// mm.options.mutexOptions: when that slice has spare capacity, concurrent
			// Lock calls appending to it would write their different WithMutexName
			// options into the same backing-array element — a data race.
			opts := make([]MutexOption, 0, len(mm.options.mutexOptions)+1)
			opts = append(opts, mm.options.mutexOptions...)
			opts = append(opts, WithMutexName(name))
			return mm.options.allocator(ctx, mm.db, opts...)
		},
		mm.options.maxLocalWaiters,
		name,
	)
	if err2 != nil {
		// Locking failed; drop the reference so the map entry can be cleaned up
		// if it's no longer used.
		mm.releaseReference(name)
	}
	return lockExpirationCtx, err2
}
// Unlock unlocks the named Mutex. Once no more references (including waiting lockers) are
// held for the given name, the underlying Mutex is removed from an internal map. So, it is
// likely that new Mutex objects are frequently allocated and released.
func (mm *MutexMap) Unlock(ctx context.Context, name string) error {
	cm, err := mm.acquireReference(name, false)
	if err != nil {
		return err
	}
	defer func() {
		// Release the reference just acquired above, plus the reference held since the
		// original Lock. Both are released even if unlock fails, because the underlying
		// implementation is assumed to provide mutual exclusion after an unlock failure.
		mm.releaseReference(name)
		mm.releaseReference(name)
	}()
	return cm.unlock(ctx)
}
// acquireReference locks the map and obtains a reference to the countedMutex for name.
// When locking is true a missing entry is created on demand; otherwise a missing entry
// yields a NotLockedError (a name that was never locked cannot be unlocked).
func (mm *MutexMap) acquireReference(name string, locking bool) (*countedMutex, error) {
	mm.lock.Lock()
	defer mm.lock.Unlock()
	cm, ok := mm.countedMutexes[name]
	if !ok {
		if !locking {
			return nil, dbmerr.NewNotLockedError("n/a", name)
		}
		// Capacity 1 lets the channel act as a lock that supports select/ctx waiting.
		cm = &countedMutex{timeoutLock: make(chan struct{}, 1)}
		mm.countedMutexes[name] = cm
	}
	cm.references++
	return cm, nil
}
// releaseReference locks the map, decrements the named countedMutex's reference count and
// removes it from the map if the count drops to zero. It is a no-op for unknown names.
func (mm *MutexMap) releaseReference(name string) {
	mm.lock.Lock()
	// Deferred unlock fixes a deadlock: the original code returned early when the name
	// was absent without releasing mm.lock, permanently blocking all later map operations.
	defer mm.lock.Unlock()
	cm, exists := mm.countedMutexes[name]
	if !exists {
		return
	}
	cm.references--
	if cm.references == 0 {
		delete(mm.countedMutexes, name)
	}
}
// len reports the number of entries currently held in the internal map (test helper).
func (mm *MutexMap) len() int {
	mm.lock.Lock()
	n := len(mm.countedMutexes)
	mm.lock.Unlock()
	return n
}