Skip to content

Commit 549d87c

Browse files
committed
fix: wrong ownership of backup partitions
1 parent 7497af0 commit 549d87c

28 files changed

+147
-143
lines changed

internal/cluster/balancer/balancer.go

Lines changed: 62 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -63,41 +63,39 @@ func (b *Balancer) isAlive() bool {
6363
return true
6464
}
6565

66-
func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owner discovery.Member) {
66+
func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners... discovery.Member) bool {
67+
var clean = true
6768
part.Map().Range(func(name, tmp interface{}) bool {
68-
if !b.isAlive() {
69-
// Break the loop
70-
return false
71-
}
7269
u := tmp.(partitions.Fragment)
7370

74-
b.log.V(2).Printf("[INFO] Moving %s: %s (kind: %s) on PartID: %d to %s", u.Name(), name, part.Kind(), part.Id(), owner)
75-
err := u.Move(part.Id(), part.Kind(), name.(string), owner)
76-
if err != nil {
77-
b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v", u.Name(), name, part.Id(), owner, err)
78-
}
79-
if err == nil {
80-
// Delete the moved storage unit instance. GC will free the allocated memory.
81-
part.Map().Delete(name)
71+
for _, owner := range owners {
72+
b.log.V(2).Printf("[INFO] Moving %s: %s (kind: %s) on PartID: %d to %s",
73+
u.Name(), name, part.Kind(), part.Id(), owner)
74+
75+
err := u.Move(part.Id(), part.Kind(), name.(string), owner)
76+
77+
if err != nil {
78+
b.log.V(2).Printf("[ERROR] Failed to move %s: %s on PartID: %d to %s: %v",
79+
u.Name(), name, part.Id(), owner, err)
80+
clean = false
81+
}
8282
}
83+
8384
// if this returns true, the iteration continues
84-
return sign == b.rt.Signature()
85+
return !b.breakLoop(sign)
8586
})
87+
88+
return clean
8689
}
8790

8891
func (b *Balancer) primaryCopies() {
8992
sign := b.rt.Signature()
9093
for partID := uint64(0); partID < b.config.PartitionCount; partID++ {
91-
if !b.isAlive() {
92-
break
93-
}
94-
if sign != b.rt.Signature() {
95-
// Routing table is updated. Just quit. Another balancer goroutine
96-
// will work on the new table immediately.
94+
if b.breakLoop(sign) {
9795
break
9896
}
9997

100-
part := b.primary.PartitionById(partID)
98+
part := b.primary.PartitionByID(partID)
10199
if part.Length() == 0 {
102100
// Empty partition. Skip it.
103101
continue
@@ -112,75 +110,75 @@ func (b *Balancer) primaryCopies() {
112110
// Already belongs to me.
113111
continue
114112
}
113+
115114
// This is a previous owner. Move the keys.
116-
b.scanPartition(sign, part, owner)
115+
if b.scanPartition(sign, part, owner) {
116+
part.Map().Range(func(name, tmp interface{}) bool {
117+
// Delete the moved storage unit instance. GC will free the allocated memory.
118+
part.Map().Delete(name)
119+
return true
120+
})
121+
}
117122
}
118123
}
119124

125+
func (b *Balancer) breakLoop(sign uint64) bool {
126+
if !b.isAlive() {
127+
return true
128+
}
129+
130+
if sign != b.rt.Signature() {
131+
// Routing table is updated. Just quit. Another balancer goroutine
132+
// will work on the new table immediately.
133+
return true
134+
}
135+
136+
return false
137+
}
138+
120139
func (b *Balancer) backupCopies() {
121140
sign := b.rt.Signature()
141+
LOOP:
122142
for partID := uint64(0); partID < b.config.PartitionCount; partID++ {
123-
if !b.isAlive() {
143+
if b.breakLoop(sign) {
124144
break
125145
}
126146

127-
if sign != b.rt.Signature() {
128-
// Routing table is updated. Just quit. Another balancer goroutine
129-
// will work on the new table immediately.
130-
break
131-
}
132-
133-
part := b.backup.PartitionById(partID)
134-
if part.Length() == 0 {
135-
// Empty partition. Skip it.
147+
part := b.backup.PartitionByID(partID)
148+
if part.Length() == 0 || part.OwnerCount() == 0{
136149
continue
137150
}
138151

139-
if part.OwnerCount() == 0 {
140-
// This partition doesn't have any backup owner
141-
continue
142-
}
152+
var (
153+
counter = 1
154+
currentOwners []discovery.Member
155+
)
143156

144157
owners := part.Owners()
145-
if len(owners) == b.config.ReplicaCount-1 {
146-
// Everything is ok
147-
continue
148-
}
158+
for i := len(owners) - 1; i >= 0; i-- {
159+
if counter >= b.config.ReplicaCount-1 {
160+
break
161+
}
149162

150-
var ownerIDs []uint64
151-
offset := len(owners) - 1 - (b.config.ReplicaCount - 1)
152-
if offset <= 0 {
153-
offset = -1
154-
}
155-
for i := len(owners) - 1; i > offset; i-- {
163+
counter++
156164
owner := owners[i]
157165
// Here we don't use CompareById function because the routing table
158166
// is an eventually consistent data structure and a node can try to
159167
// move data to previous instance(the same name but a different birthdate)
160168
// of itself. So just check the name.
161169
if b.rt.This().CompareByName(owner) {
162170
// Already belongs to me.
163-
continue
171+
continue LOOP
164172
}
165-
ownerIDs = append(ownerIDs, owner.ID)
173+
currentOwners = append(currentOwners, owner)
166174
}
167175

168-
for _, ownerID := range ownerIDs {
169-
if !b.isAlive() {
170-
break
171-
}
172-
if sign != b.rt.Signature() {
173-
// Routing table is updated. Just quit. Another balancer goroutine
174-
// will work on the new table immediately.
175-
break
176-
}
177-
178-
owner, err := b.rt.Discovery().FindMemberByID(ownerID)
179-
if err != nil {
180-
b.log.V(2).Printf("[ERROR] Failed to get host by ownerId: %d: %v", ownerID, err)
181-
continue
182-
}
183-
b.scanPartition(sign, part, owner)
176+
if b.scanPartition(sign, part, currentOwners...) {
177+
part.Map().Range(func(name, tmp interface{}) bool {
178+
// Delete the moved storage unit instance. GC will free the allocated memory.
179+
part.Map().Delete(name)
180+
return true
181+
})
184182
}
185183
}
186184
}

internal/cluster/balancer/balancer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func insertRandomData(e *environment.Environment, kind partitions.Kind) int {
138138
c := e.Get("config").(*config.Config)
139139
part := e.Get(strings.ToLower(kind.String())).(*partitions.Partitions)
140140
for partID := uint64(0); partID < c.PartitionCount; partID++ {
141-
part := part.PartitionById(partID)
141+
part := part.PartitionByID(partID)
142142
s := mockfragment.New()
143143
s.Fill()
144144
part.Map().Store("test-data", s)
@@ -152,7 +152,7 @@ func checkKeyCountAfterBalance(e *environment.Environment, kind partitions.Kind,
152152
part := e.Get(strings.ToLower(kind.String())).(*partitions.Partitions)
153153
var afterBalance int
154154
for partID := uint64(0); partID < c.PartitionCount; partID++ {
155-
part := part.PartitionById(partID)
155+
part := part.PartitionByID(partID)
156156
afterBalance += part.Length()
157157
}
158158
if afterBalance == total {
@@ -166,8 +166,8 @@ func checkBackupOwnership(e *environment.Environment) error {
166166
primary := e.Get(strings.ToLower(partitions.PRIMARY.String())).(*partitions.Partitions)
167167
backup := e.Get(strings.ToLower(partitions.BACKUP.String())).(*partitions.Partitions)
168168
for partID := uint64(0); partID < c.PartitionCount; partID++ {
169-
primaryOwner := primary.PartitionById(partID).Owner()
170-
part := backup.PartitionById(partID)
169+
primaryOwner := primary.PartitionByID(partID).Owner()
170+
part := backup.PartitionByID(partID)
171171
for _, owner := range part.Owners() {
172172
if primaryOwner.CompareByID(owner) {
173173
return fmt.Errorf("%s is the primary and backup owner of partID: %d at the same time", primaryOwner, partID)

internal/cluster/partitions/partitions.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ import (
2323
type Kind int
2424

2525
func (k Kind) String() string {
26-
if k == PRIMARY {
26+
switch {
27+
case k == PRIMARY:
2728
return "Primary"
28-
} else if k == BACKUP {
29+
case k == BACKUP:
2930
return "Backup"
30-
} else {
31+
default:
3132
return "Unknown"
3233
}
3334
}
@@ -59,19 +60,19 @@ func New(count uint64, kind Kind) *Partitions {
5960
return ps
6061
}
6162

62-
// PartitionById returns the partition for the given HKey
63-
func (ps *Partitions) PartitionById(partID uint64) *Partition {
63+
// PartitionByID returns the partition for the given HKey
64+
func (ps *Partitions) PartitionByID(partID uint64) *Partition {
6465
return ps.m[partID]
6566
}
6667

67-
// PartitionIdByHKey returns partition ID for a given HKey.
68-
func (ps *Partitions) PartitionIdByHKey(hkey uint64) uint64 {
68+
// PartitionIDByHKey returns partition ID for a given HKey.
69+
func (ps *Partitions) PartitionIDByHKey(hkey uint64) uint64 {
6970
return hkey % ps.count
7071
}
7172

7273
// PartitionByHKey returns the partition for the given HKey
7374
func (ps *Partitions) PartitionByHKey(hkey uint64) *Partition {
74-
partID := ps.PartitionIdByHKey(hkey)
75+
partID := ps.PartitionIDByHKey(hkey)
7576
return ps.m[partID]
7677
}
7778

@@ -81,8 +82,8 @@ func (ps *Partitions) PartitionOwnersByHKey(hkey uint64) []discovery.Member {
8182
return part.owners.Load().([]discovery.Member)
8283
}
8384

84-
// PartitionOwnersByHKey loads the partition owners list for a given hkey.
85-
func (ps *Partitions) PartitionOwnersById(partID uint64) []discovery.Member {
86-
part := ps.PartitionById(partID)
85+
// PartitionOwnersByID loads the partition owners list for a given hkey.
86+
func (ps *Partitions) PartitionOwnersByID(partID uint64) []discovery.Member {
87+
part := ps.PartitionByID(partID)
8788
return part.owners.Load().([]discovery.Member)
8889
}

internal/cluster/partitions/partitions_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestPartitions(t *testing.T) {
2727

2828
t.Run("PartitionById", func(t *testing.T) {
2929
for partID := uint64(0); partID < partitionCount; partID++ {
30-
part := ps.PartitionById(partID)
30+
part := ps.PartitionByID(partID)
3131
if part.Id() != partID {
3232
t.Fatalf("Expected PartID: %d. Got: %d", partID, part.Id())
3333
}
@@ -39,7 +39,7 @@ func TestPartitions(t *testing.T) {
3939

4040
t.Run("PartitionIdByHKey", func(t *testing.T) {
4141
// 1 % 271 = 1
42-
partID := ps.PartitionIdByHKey(1)
42+
partID := ps.PartitionIDByHKey(1)
4343
if partID != 1 {
4444
t.Fatalf("Expected PartID: 1. Got: %d", partID)
4545
}
@@ -66,12 +66,12 @@ func TestPartitions(t *testing.T) {
6666
})
6767

6868
t.Run("PartitionOwnersById", func(t *testing.T) {
69-
part := ps.PartitionById(1)
69+
part := ps.PartitionByID(1)
7070
tmp := []discovery.Member{{
7171
Name: "test-member",
7272
}}
7373
part.SetOwners(tmp)
74-
owners := ps.PartitionOwnersById(1)
74+
owners := ps.PartitionOwnersByID(1)
7575
if !reflect.DeepEqual(owners, tmp) {
7676
t.Fatalf("Partition owners slice is different")
7777
}

internal/cluster/routingtable/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,10 @@ loop:
9191
case <-ctx.Done():
9292
// context is done
9393
err := ctx.Err()
94-
if err == context.DeadlineExceeded {
94+
if errors.Is(err, context.DeadlineExceeded) {
9595
break loop
9696
}
97-
if err == context.Canceled {
97+
if errors.Is(err, context.Canceled) {
9898
return ErrServerGone
9999
}
100100
return err

internal/cluster/routingtable/distribute.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package routingtable
1616

1717
import (
18+
"errors"
19+
1820
"github.com/buraksezer/consistent"
1921
"github.com/buraksezer/olric/internal/discovery"
2022
"github.com/buraksezer/olric/internal/protocol"
@@ -23,7 +25,7 @@ import (
2325

2426
func (r *RoutingTable) distributePrimaryCopies(partID uint64) []discovery.Member {
2527
// First you need to create a copy of the owners list. Don't modify the current list.
26-
part := r.primary.PartitionById(partID)
28+
part := r.primary.PartitionByID(partID)
2729
owners := make([]discovery.Member, part.OwnerCount())
2830
copy(owners, part.Owners())
2931

@@ -41,7 +43,7 @@ func (r *RoutingTable) distributePrimaryCopies(partID uint64) []discovery.Member
4143
owner := owners[i]
4244
current, err := r.discovery.FindMemberByName(owner.Name)
4345
if err != nil {
44-
r.log.V(4).Printf("[ERROR] Failed to find %s in the cluster: %v", owner, err)
46+
r.log.V(6).Printf("[DEBUG] Failed to find %s in the cluster: %v", owner, err)
4547
owners = append(owners[:i], owners[i+1:]...)
4648
i--
4749
continue
@@ -69,7 +71,6 @@ func (r *RoutingTable) distributePrimaryCopies(partID uint64) []discovery.Member
6971
var count int32
7072
err = msgpack.Unmarshal(res.Value(), &count)
7173
if err != nil {
72-
//db.log.V(3).Printf("[ERROR] Failed to unmarshal key count while checking primary partition: %d: %v", partID, err)
7374
// This may be a temporary issue.
7475
// Pass it. If the node is gone, memberlist package will notify us.
7576
continue
@@ -96,7 +97,7 @@ func (r *RoutingTable) distributePrimaryCopies(partID uint64) []discovery.Member
9697
func (r *RoutingTable) getReplicaOwners(partID uint64) ([]consistent.Member, error) {
9798
for i := r.config.ReplicaCount; i > 0; i-- {
9899
newOwners, err := r.consistent.GetClosestNForPartition(int(partID), i)
99-
if err == consistent.ErrInsufficientMemberCount {
100+
if errors.Is(err, consistent.ErrInsufficientMemberCount) {
100101
continue
101102
}
102103
if err != nil {
@@ -109,7 +110,7 @@ func (r *RoutingTable) getReplicaOwners(partID uint64) ([]consistent.Member, err
109110
}
110111

111112
func (r *RoutingTable) distributeBackups(partID uint64) []discovery.Member {
112-
part := r.backup.PartitionById(partID)
113+
part := r.backup.PartitionByID(partID)
113114
owners := make([]discovery.Member, part.OwnerCount())
114115
copy(owners, part.Owners())
115116

@@ -136,7 +137,7 @@ func (r *RoutingTable) distributeBackups(partID uint64) []discovery.Member {
136137
backup := owners[i]
137138
cur, err := r.discovery.FindMemberByName(backup.Name)
138139
if err != nil {
139-
r.log.V(3).Printf("[ERROR] Failed to find %s in the cluster: %v", backup, err)
140+
r.log.V(6).Printf("[DEBUG] Failed to find %s in the cluster: %v", backup, err)
140141
// Delete it.
141142
owners = append(owners[:i], owners[i+1:]...)
142143
i--

internal/cluster/routingtable/distribute_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestRoutingTable_distributedBackups(t *testing.T) {
7070
rt2.UpdateEagerly()
7171

7272
for partID := uint64(0); partID < c3.PartitionCount; partID++ {
73-
part := rt3.backup.PartitionById(partID)
73+
part := rt3.backup.PartitionByID(partID)
7474
if part.OwnerCount() != 1 {
7575
t.Fatalf("Expected backup owners count: 1. Got: %d", part.OwnerCount())
7676
}

internal/cluster/routingtable/left_over_data.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ func (r *RoutingTable) processLeftOverDataReports(reports map[discovery.Member]*
4848
// data structures in this function is guarded by routingMtx
4949
for member, report := range reports {
5050
for _, partID := range report.Partitions {
51-
part := r.primary.PartitionById(partID)
51+
part := r.primary.PartitionByID(partID)
5252
ensureOwnership(member, partID, part)
5353
}
5454

5555
for _, partID := range report.Backups {
56-
part := r.backup.PartitionById(partID)
56+
part := r.backup.PartitionByID(partID)
5757
ensureOwnership(member, partID, part)
5858
}
5959
}

0 commit comments

Comments
 (0)