Skip to content

Commit d6af61f

Browse files
committed
fix: Race condition in DMap creation
refactor: More consistent logging
1 parent 003e447 commit d6af61f

File tree

10 files changed

+38
-54
lines changed

10 files changed

+38
-54
lines changed

dmap.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,42 +51,31 @@ func (db *Olric) NewDMap(name string) (*DMap, error) {
5151
}, nil
5252
}
5353

54-
// createDMap creates and returns a new dmap, internal representation of a dmap.
54+
// createDMap creates and returns a new dmap, internal representation of a dmap. This function is not thread-safe.
5555
func (db *Olric) createDMap(part *partition, name string, str *storage.Storage) (*dmap, error) {
56-
// We need to protect storage.New
57-
part.Lock()
58-
defer part.Unlock()
59-
60-
// Try to load one more time. Another goroutine may have created the dmap.
61-
dm, ok := part.m.Load(name)
62-
if ok {
63-
return dm.(*dmap), nil
64-
}
65-
6656
// create a new map here.
6757
nm := &dmap{
6858
storage: str,
6959
}
70-
7160
if db.config.Cache != nil {
7261
err := db.setCacheConfiguration(nm, name)
7362
if err != nil {
7463
return nil, err
7564
}
7665
}
77-
7866
// rebalancer code may send a storage instance for the new dmap. Just use it.
7967
if nm.storage != nil {
8068
nm.storage = str
8169
} else {
8270
nm.storage = storage.New(db.config.TableSize)
8371
}
84-
8572
part.m.Store(name, nm)
8673
return nm, nil
8774
}
8875

8976
func (db *Olric) getOrCreateDMap(part *partition, name string) (*dmap, error) {
77+
part.Lock()
78+
defer part.Unlock()
9079
dm, ok := part.m.Load(name)
9180
if ok {
9281
return dm.(*dmap), nil
@@ -102,9 +91,5 @@ func (db *Olric) getDMap(name string, hkey uint64) (*dmap, error) {
10291

10392
func (db *Olric) getBackupDMap(name string, hkey uint64) (*dmap, error) {
10493
part := db.getBackupPartition(hkey)
105-
dm, ok := part.m.Load(name)
106-
if ok {
107-
return dm.(*dmap), nil
108-
}
109-
return db.createDMap(part, name, nil)
94+
return db.getOrCreateDMap(part, name)
11095
}

dmap_atomic.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error)
2828
defer func() {
2929
err := db.locker.Unlock(atomicKey)
3030
if err != nil {
31-
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", w.key, w.dmap, err)
31+
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
3232
}
3333
}()
3434

@@ -105,7 +105,7 @@ func (db *Olric) getPut(w *writeop) ([]byte, error) {
105105
defer func() {
106106
err := db.locker.Unlock(atomicKey)
107107
if err != nil {
108-
db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on dmap: %s: %v", w.key, w.dmap, err)
108+
db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
109109
}
110110
}()
111111

dmap_destroy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (db *Olric) destroyDMap(name string) error {
4343
db.log.V(6).Printf("[DEBUG] Calling Destroy command on %s for %s", addr, name)
4444
_, err := db.requestTo(addr, req)
4545
if err != nil {
46-
db.log.V(3).Printf("[ERROR] Failed to destroy dmap: %s on %s", name, addr)
46+
db.log.V(3).Printf("[ERROR] Failed to destroy DMap: %s on %s", name, addr)
4747
}
4848
return err
4949
})

dmap_eviction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (db *Olric) scanDMapForEviction(partID uint64, name string, dm *dmap) {
103103
err := db.delKeyVal(dm, hkey, name, vdata.Key)
104104
if err != nil {
105105
// It will be tried again.
106-
db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on dmap: %s: %v",
106+
db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on DMap: %s: %v",
107107
hkey, name, err)
108108
return true // this means 'continue'
109109
}
@@ -212,7 +212,7 @@ func (db *Olric) evictKeyWithLRU(dm *dmap, name string) error {
212212
return err
213213
}
214214
if db.log.V(6).Ok() {
215-
db.log.V(6).Printf("[DEBUG] Evicted item on dmap: %s, key: %s with LRU", name, key)
215+
db.log.V(6).Printf("[DEBUG] Evicted item on DMap: %s, key: %s with LRU", name, key)
216216
}
217217
return db.delKeyVal(dm, item.HKey, name, key)
218218
}

dmap_expire.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error {
7070
_, err := db.requestTo(owner.String(), req)
7171
if err != nil {
7272
if db.log.V(3).Ok() {
73-
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v",
73+
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v",
7474
owner, w.dmap, err)
7575
}
7676
continue
@@ -80,7 +80,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error {
8080
err := db.localExpire(hkey, dm, w)
8181
if err != nil {
8282
if db.log.V(3).Ok() {
83-
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v",
83+
db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v",
8484
db.this, w.dmap, err)
8585
}
8686
} else {

dmap_get.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*vers
5858
// the requested key can be found on a replica or a previous partition owner.
5959
if db.log.V(5).Ok() {
6060
db.log.V(5).Printf(
61-
"[DEBUG] key: %s, HKey: %d on dmap: %s could not be found on the local storage: %v",
61+
"[DEBUG] key: %s, HKey: %d on DMap: %s could not be found on the local storage: %v",
6262
key, hkey, name, err)
6363
}
6464
} else {

dmap_lock.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (db *Olric) unlockKey(name, key string, token []byte) error {
5050
defer func() {
5151
err := db.locker.Unlock(lkey)
5252
if err != nil {
53-
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", key, name, err)
53+
db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", key, name, err)
5454
}
5555
}()
5656

dmap_put.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
179179
_, err := db.requestTo(owner.String(), req)
180180
if err != nil {
181181
if db.log.V(3).Ok() {
182-
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", owner, w.dmap, err)
182+
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", owner, w.dmap, err)
183183
}
184184
continue
185185
}
@@ -188,7 +188,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
188188
err := db.localPut(hkey, dm, w)
189189
if err != nil {
190190
if db.log.V(3).Ok() {
191-
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", db.this, w.dmap, err)
191+
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", db.this, w.dmap, err)
192192
}
193193
} else {
194194
successful++

rebalancer.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -88,24 +88,20 @@ func (db *Olric) selectVersionForMerge(dm *dmap, hkey uint64, vdata *storage.VDa
8888
}
8989

9090
func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error {
91-
str, err := storage.Import(data.Payload)
91+
dm, err := db.getOrCreateDMap(part, data.Name)
9292
if err != nil {
9393
return err
9494
}
9595

96-
tmp, exist := part.m.Load(data.Name)
97-
if !exist {
98-
// create a new dmap if it doesn't exist.
99-
tmp, err = db.createDMap(part, data.Name, str)
100-
if err != nil {
101-
return err
102-
}
103-
}
104-
10596
// Acquire dmap's lock. No one should work on it.
106-
dm := tmp.(*dmap)
10797
dm.Lock()
10898
defer dm.Unlock()
99+
defer part.m.Store(data.Name, dm)
100+
101+
str, err := storage.Import(data.Payload)
102+
if err != nil {
103+
return err
104+
}
109105

110106
// Merge accessLog.
111107
if dm.cache != nil && dm.cache.accessLog != nil {
@@ -118,11 +114,14 @@ func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error {
118114
dm.cache.Unlock()
119115
}
120116

121-
// We do not need the following loop if the dmap is created here.
122-
if !exist {
117+
if dm.storage.Len() == 0 {
118+
// DMap has no keys. Set the imported storage instance.
119+
// The old one will be garbage collected.
120+
dm.storage = str
123121
return nil
124122
}
125123

124+
// DMap has some keys. Merge with the new one.
126125
var mergeErr error
127126
str.Range(func(hkey uint64, vdata *storage.VData) bool {
128127
winner, err := db.selectVersionForMerge(dm, hkey, vdata)
@@ -171,11 +170,11 @@ func (db *Olric) rebalancePrimaryPartitions() {
171170
}
172171
// This is a previous owner. Move the keys.
173172
part.m.Range(func(name, dm interface{}) bool {
174-
db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s",
173+
db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s",
175174
name, part.backup, partID, owner)
176175
err := db.moveDMap(part, name.(string), dm.(*dmap), owner)
177176
if err != nil {
178-
db.log.V(2).Printf("[ERROR] Failed to move dmap: %s on PartID: %d to %s: %v",
177+
db.log.V(2).Printf("[ERROR] Failed to move DMap: %s on PartID: %d to %s: %v",
179178
name, partID, owner, err)
180179
}
181180
// if this returns true, the iteration continues
@@ -232,11 +231,11 @@ func (db *Olric) rebalanceBackupPartitions() {
232231
}
233232

234233
part.m.Range(func(name, dm interface{}) bool {
235-
db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s",
234+
db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s",
236235
name, part.backup, partID, owner)
237236
err := db.moveDMap(part, name.(string), dm.(*dmap), owner)
238237
if err != nil {
239-
db.log.V(2).Printf("[ERROR] Failed to move backup dmap: %s on PartID: %d to %s: %v",
238+
db.log.V(2).Printf("[ERROR] Failed to move backup DMap: %s on PartID: %d to %s: %v",
240239
name, partID, owner, err)
241240
}
242241
// if this returns true, the iteration continues
@@ -294,20 +293,19 @@ func (db *Olric) moveDMapOperation(w, r protocol.EncodeDecoder) {
294293
}
295294
// Check ownership before merging. This is useful to prevent data corruption in network partitioning case.
296295
if !db.checkOwnership(part) {
297-
db.log.V(2).Printf("[ERROR] Received dmap: %s on PartID: %d (backup: %v) doesn't belong to me",
296+
db.log.V(2).Printf("[ERROR] Received DMap: %s on PartID: %d (backup: %v) doesn't belong to me",
298297
box.Name, box.PartID, box.Backup)
299-
300298
err := fmt.Errorf("partID: %d (backup: %v) doesn't belong to %s: %w", box.PartID, box.Backup, db.this, ErrInvalidArgument)
301299
db.errorResponse(w, err)
302300
return
303301
}
304302

305-
db.log.V(2).Printf("[INFO] Received dmap (backup:%v): %s on PartID: %d",
303+
db.log.V(2).Printf("[INFO] Received DMap (backup:%v): %s on PartID: %d",
306304
box.Backup, box.Name, box.PartID)
307305

308306
err = db.mergeDMaps(part, box)
309307
if err != nil {
310-
db.log.V(2).Printf("[ERROR] Failed to merge dmap: %v", err)
308+
db.log.V(2).Printf("[ERROR] Failed to merge DMap: %v", err)
311309
db.errorResponse(w, err)
312310
return
313311
}

routing.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member {
125125
}
126126
if count == 0 {
127127
// Delete it.
128-
db.log.V(5).Printf("[DEBUG] Empty backup partition found. PartID: %d on %s", partID, backup)
129128
owners = append(owners[:i], owners[i+1:]...)
130129
i--
131130
}
@@ -206,7 +205,6 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member {
206205
continue
207206
}
208207
if count == 0 {
209-
db.log.V(6).Printf("[DEBUG] PartID: %d on %s is empty", partID, owner)
210208
// Empty partition. Delete it from ownership list.
211209
owners = append(owners[:i], owners[i+1:]...)
212210
i--
@@ -283,7 +281,10 @@ func (db *Olric) updateRoutingTableOnCluster(table routingTable) (map[discovery.
283281
return nil
284282
})
285283
}
286-
return ownershipReports, g.Wait()
284+
if err := g.Wait(); err != nil {
285+
return nil, err
286+
}
287+
return ownershipReports, nil
287288
}
288289

289290
func (db *Olric) updateRouting() {

0 commit comments

Comments
 (0)