Skip to content

Commit ad678ec

Browse files
committed
fix: malfunctioning read repair and balancer
1 parent 549d87c commit ad678ec

File tree

4 files changed

+19
-15
lines changed

4 files changed

+19
-15
lines changed

internal/cluster/balancer/balancer.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (b *Balancer) isAlive() bool {
6363
return true
6464
}
6565

66-
func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners... discovery.Member) bool {
66+
func (b *Balancer) scanPartition(sign uint64, part *partitions.Partition, owners ...discovery.Member) bool {
6767
var clean = true
6868
part.Map().Range(func(name, tmp interface{}) bool {
6969
u := tmp.(partitions.Fragment)
@@ -145,18 +145,18 @@ LOOP:
145145
}
146146

147147
part := b.backup.PartitionByID(partID)
148-
if part.Length() == 0 || part.OwnerCount() == 0{
148+
if part.Length() == 0 || part.OwnerCount() == 0 {
149149
continue
150150
}
151151

152152
var (
153-
counter = 1
153+
counter = 1
154154
currentOwners []discovery.Member
155155
)
156156

157157
owners := part.Owners()
158158
for i := len(owners) - 1; i >= 0; i-- {
159-
if counter >= b.config.ReplicaCount-1 {
159+
if counter > b.config.ReplicaCount-1 {
160160
break
161161
}
162162

@@ -173,6 +173,10 @@ LOOP:
173173
currentOwners = append(currentOwners, owner)
174174
}
175175

176+
if len(currentOwners) == 0 {
177+
continue LOOP
178+
}
179+
176180
if b.scanPartition(sign, part, currentOwners...) {
177181
part.Map().Range(func(name, tmp interface{}) bool {
178182
// Delete the moved storage unit instance. GC will free the allocated memory.

internal/cluster/routingtable/distribute.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package routingtable
1616

1717
import (
1818
"errors"
19-
2019
"github.com/buraksezer/consistent"
2120
"github.com/buraksezer/olric/internal/discovery"
2221
"github.com/buraksezer/olric/internal/protocol"

internal/dmap/get.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,15 @@ func (dm *DMap) sanitizeAndSortVersions(versions []*version) []*version {
195195
}
196196

197197
func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
198-
var versions []*version
199198
// Check backup.
200199
backups := dm.s.backup.PartitionOwnersByHKey(hkey)
200+
versions := make([]*version, 0, len(backups))
201201
for _, replica := range backups {
202202
req := protocol.NewDMapMessage(protocol.OpGetBackup)
203203
req.SetDMap(dm.name)
204204
req.SetKey(key)
205-
ver := &version{host: &replica}
205+
host := replica
206+
v := &version{host: &host}
206207
resp, err := dm.s.requestTo(replica.String(), req)
207208
if err != nil {
208209
if dm.s.log.V(3).Ok() {
@@ -212,21 +213,21 @@ func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
212213
} else {
213214
data := dm.engine.NewEntry()
214215
data.Decode(resp.Value())
215-
ver.entry = data
216+
v.entry = data
216217
}
217-
versions = append(versions, ver)
218+
versions = append(versions, v)
218219
}
219220
return versions
220221
}
221222

222223
func (dm *DMap) readRepair(winner *version, versions []*version) {
223-
for _, ver := range versions {
224-
if ver.entry != nil && winner.entry.Timestamp() == ver.entry.Timestamp() {
224+
for _, version := range versions {
225+
if version.entry != nil && winner.entry.Timestamp() == version.entry.Timestamp() {
225226
continue
226227
}
227228

228229
// Sync
229-
tmp := *ver.host
230+
tmp := *version.host
230231
if tmp.CompareByID(dm.s.rt.This()) {
231232
hkey := partitions.HKey(dm.name, winner.entry.Key())
232233
f, err := dm.getOrCreateFragment(hkey, partitions.PRIMARY)
@@ -270,9 +271,9 @@ func (dm *DMap) readRepair(winner *version, versions []*version) {
270271
TTL: winner.entry.TTL(),
271272
})
272273
}
273-
_, err := dm.s.requestTo(ver.host.String(), req)
274+
_, err := dm.s.requestTo(version.host.String(), req)
274275
if err != nil {
275-
dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", ver.host, err)
276+
dm.s.log.V(3).Printf("[ERROR] Failed to synchronize replica %s: %v", version.host, err)
276277
}
277278
}
278279
}

internal/dmap/get_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func TestDMap_Get_ReadRepair(t *testing.T) {
201201
c2.ReplicaCount = 2
202202
e2 := testcluster.NewEnvironment(c2)
203203
s2 := cluster.AddMember(e2).(*Service)
204+
204205
defer cluster.Shutdown()
205206

206207
// Call DMap.Put on S1
@@ -230,7 +231,6 @@ func TestDMap_Get_ReadRepair(t *testing.T) {
230231
c3.ReplicaCount = 2
231232
e3 := testcluster.NewEnvironment(c3)
232233
s3 := cluster.AddMember(e3).(*Service)
233-
defer cluster.Shutdown()
234234

235235
// Call DMap.Get on S2
236236
dm2, err := s3.NewDMap("mymap")

0 commit comments

Comments
 (0)