Skip to content

Commit d4396fb

Browse files
committed
refactor: eliminate ootential race condition in fragment creation #99
1 parent ad678ec commit d4396fb

File tree

18 files changed

+173
-97
lines changed

18 files changed

+173
-97
lines changed

internal/cluster/balancer/balancer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners
7070

7171
for _, owner := range owners {
7272
b.log.V(2).Printf("[INFO] Moving %s: %s (kind: %s) on PartID: %d to %s",
73-
u.Name(), name, part.Kind(), part.Id(), owner)
73+
u.Name(), name, part.Kind(), part.ID(), owner)
7474

75-
err := u.Move(part.Id(), part.Kind(), name.(string), owner)
75+
err := u.Move(part.ID(), part.Kind(), name.(string), owner)
7676

7777
if err != nil {
7878
b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v",
79-
u.Name(), name, part.Id(), owner, err)
79+
u.Name(), name, part.ID(), owner, err)
8080
clean = false
8181
}
8282
}

internal/cluster/partitions/partition.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ type Partition struct {
2727

2828
id uint64
2929
kind Kind
30-
smap *sync.Map
30+
m *sync.Map
3131
owners atomic.Value
3232
}
3333

3434
func (p *Partition) Kind() Kind {
3535
return p.kind
3636
}
3737

38-
func (p *Partition) Id() uint64 {
38+
func (p *Partition) ID() uint64 {
3939
return p.id
4040
}
4141

4242
func (p *Partition) Map() *sync.Map {
43-
return p.smap
43+
return p.m
4444
}
4545

4646
// Owner returns partition Owner. It's not thread-safe.

internal/cluster/partitions/partition_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestPartition(t *testing.T) {
5959
p := Partition{
6060
id: 1,
6161
kind: PRIMARY,
62-
smap: &sync.Map{},
62+
m: &sync.Map{},
6363
}
6464

6565
tmp := []discovery.Member{{

internal/cluster/partitions/partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func New(count uint64, kind Kind) *Partitions {
5454
ps.m[i] = &Partition{
5555
id: i,
5656
kind: kind,
57-
smap: &sync.Map{},
57+
m: &sync.Map{},
5858
}
5959
}
6060
return ps

internal/cluster/partitions/partitions_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ func TestPartitions(t *testing.T) {
2828
t.Run("PartitionById", func(t *testing.T) {
2929
for partID := uint64(0); partID < partitionCount; partID++ {
3030
part := ps.PartitionByID(partID)
31-
if part.Id() != partID {
32-
t.Fatalf("Expected PartID: %d. Got: %d", partID, part.Id())
31+
if part.ID() != partID {
32+
t.Fatalf("Expected PartID: %d. Got: %d", partID, part.ID())
3333
}
3434
if part.Kind() != PRIMARY {
3535
t.Fatalf("Expected Kind: %s. Got: %s", PRIMARY, part.Kind())
@@ -48,8 +48,8 @@ func TestPartitions(t *testing.T) {
4848
t.Run("PartitionByHKey", func(t *testing.T) {
4949
// 1 % 271 = 1
5050
part := ps.PartitionByHKey(1)
51-
if part.Id() != 1 {
52-
t.Fatalf("Expected PartID: 1. Got: %d", part.Id())
51+
if part.ID() != 1 {
52+
t.Fatalf("Expected PartID: 1. Got: %d", part.ID())
5353
}
5454
})
5555

internal/dmap/balance.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@ func (dm *DMap) selectVersionForMerge(f *fragment, hkey uint64, entry storage.En
4747
}
4848

4949
func (dm *DMap) mergeFragments(part *partitions.Partition, fp *fragmentPack) error {
50-
f, err := dm.loadFragmentFromPartition(part)
51-
if errors.Is(err, errFragmentNotFound) {
52-
f, err = dm.createFragmentOnPartition(part)
53-
}
50+
f, err := dm.loadOrCreateFragment(part)
5451
if err != nil {
5552
return err
5653
}

internal/dmap/balance_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ func TestDMap_Merge_Fragments(t *testing.T) {
4444
}
4545
}
4646
hkey := partitions.HKey("mymap", key)
47-
f, err := dm.getFragment(hkey, partitions.PRIMARY)
47+
part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
48+
f, err := dm.loadFragment(part)
4849
if err != nil {
4950
t.Fatalf("Expected nil. Got: %v", err)
5051
}

internal/dmap/delete.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ var (
3535

3636
func (dm *DMap) deleteBackupFromFragment(key string, kind partitions.Kind) error {
3737
hkey := partitions.HKey(dm.name, key)
38-
f, err := dm.getFragment(hkey, kind)
38+
part := dm.getPartitionByHKey(hkey, kind)
39+
f, err := dm.loadFragment(part)
3940
if errors.Is(err, errFragmentNotFound) {
4041
// key doesn't exist
4142
return nil
4243
}
4344
if err != nil {
4445
return err
4546
}
47+
4648
f.Lock()
4749
defer f.Unlock()
4850

@@ -140,8 +142,12 @@ func (dm *DMap) deleteKey(key string) error {
140142
return err
141143
}
142144

143-
// notice that "delete" operation is run on the cluster.
144-
f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY)
145+
part := dm.getPartitionByHKey(hkey, partitions.PRIMARY)
146+
f, err := dm.loadFragment(part)
147+
if errors.Is(err, errFragmentNotFound) {
148+
// notice that "delete" operation is run on the cluster.
149+
err = nil
150+
}
145151
if err != nil {
146152
return err
147153
}

internal/dmap/destroy_operations.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package dmap
1616

1717
import (
18+
"errors"
19+
1820
"github.com/buraksezer/olric/config"
1921
"github.com/buraksezer/olric/internal/cluster/partitions"
2022
"github.com/buraksezer/olric/internal/protocol"
@@ -37,8 +39,8 @@ func (s *Service) destroyOperation(w, r protocol.EncodeDecoder) {
3739
}
3840

3941
func (dm *DMap) destroyFragmentOnPartition(part *partitions.Partition) error {
40-
f, err := dm.loadFragmentFromPartition(part)
41-
if err == errFragmentNotFound {
42+
f, err := dm.loadFragment(part)
43+
if errors.Is(err, errFragmentNotFound) {
4244
// not exists
4345
return nil
4446
}

internal/dmap/dmap.go

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package dmap
1616

1717
import (
18-
"context"
1918
"errors"
2019
"fmt"
2120
"time"
@@ -38,7 +37,7 @@ var (
3837
ErrDMapNotFound = errors.New("dmap not found")
3938
)
4039

41-
// DMap implements a single hop distributed hash table.
40+
// DMap implements a single-hop distributed hash table.
4241
type DMap struct {
4342
name string
4443
s *Service
@@ -110,31 +109,6 @@ func (s *Service) getOrCreateDMap(name string) (*DMap, error) {
110109
return dm, err
111110
}
112111

113-
func (dm *DMap) loadFragmentFromPartition(part *partitions.Partition) (*fragment, error) {
114-
f, ok := part.Map().Load(dm.name)
115-
if !ok {
116-
return nil, errFragmentNotFound
117-
}
118-
return f.(*fragment), nil
119-
}
120-
121-
func (dm *DMap) createFragmentOnPartition(part *partitions.Partition) (*fragment, error) {
122-
ctx, cancel := context.WithCancel(context.Background())
123-
f := &fragment{
124-
service: dm.s,
125-
accessLog: newAccessLog(),
126-
ctx: ctx,
127-
cancel: cancel,
128-
}
129-
var err error
130-
f.storage, err = dm.engine.Fork(nil)
131-
if err != nil {
132-
return nil, err
133-
}
134-
part.Map().Store(dm.name, f)
135-
return f, nil
136-
}
137-
138112
func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partitions.Partition {
139113
var part *partitions.Partition
140114
switch {
@@ -148,27 +122,6 @@ func (dm *DMap) getPartitionByHKey(hkey uint64, kind partitions.Kind) *partition
148122
return part
149123
}
150124

151-
func (dm *DMap) getFragment(hkey uint64, kind partitions.Kind) (*fragment, error) {
152-
part := dm.getPartitionByHKey(hkey, kind)
153-
part.Lock()
154-
defer part.Unlock()
155-
return dm.loadFragmentFromPartition(part)
156-
}
157-
158-
func (dm *DMap) getOrCreateFragment(hkey uint64, kind partitions.Kind) (*fragment, error) {
159-
part := dm.getPartitionByHKey(hkey, kind)
160-
part.Lock()
161-
defer part.Unlock()
162-
163-
// try to get
164-
f, err := dm.loadFragmentFromPartition(part)
165-
if errors.Is(err, errFragmentNotFound) {
166-
// create the fragment and return
167-
return dm.createFragmentOnPartition(part)
168-
}
169-
return f, err
170-
}
171-
172125
func timeoutToTTL(timeout time.Duration) int64 {
173126
if timeout.Seconds() == 0 {
174127
return 0

0 commit comments

Comments
 (0)