Skip to content

Commit 78dbc88

Browse files
committed
feat: Properly process memberlist.NodeUpdate
1 parent b7de198 commit 78dbc88

File tree

3 files changed

+44
-41
lines changed

3 files changed

+44
-41
lines changed

internal/discovery/discovery.go

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import (
3636

3737
const eventChanCapacity = 256
3838

39-
// ErrHostNotFound indicates that the requested host could not be found in the member list.
40-
var ErrHostNotFound = errors.New("host not found")
39+
// ErrMemberNotFound indicates that the requested member could not be found in the member list.
40+
var ErrMemberNotFound = errors.New("member not found")
4141

4242
// ClusterEvent is a single event related to node activity in the memberlist.
4343
// The Node member of this struct must not be directly modified.
@@ -58,7 +58,7 @@ func (c *ClusterEvent) MemberAddr() string {
5858
// provides useful functions to utilize it.
5959
type Discovery struct {
6060
log *flog.Logger
61-
host *Member
61+
member *Member
6262
memberlist *memberlist.Memberlist
6363
config *config.Config
6464

@@ -99,24 +99,22 @@ func (d *Discovery) DecodeNodeMeta(buf []byte) (Member, error) {
9999

100100
// New creates a new memberlist with a proper configuration and returns a new Discovery instance along with it.
101101
func New(log *flog.Logger, c *config.Config) (*Discovery, error) {
102-
// Calculate host's identity. It's useful to compare hosts.
102+
// Calculate the member's identity. It's useful for comparing members.
103103
birthdate := time.Now().UnixNano()
104104

105105
buf := make([]byte, 8+len(c.MemberlistConfig.Name))
106106
binary.BigEndian.PutUint64(buf, uint64(birthdate))
107107
buf = append(buf, []byte(c.MemberlistConfig.Name)...)
108-
109-
id := c.Hasher.Sum64(buf)
110108
nameHash := c.Hasher.Sum64([]byte(c.MemberlistConfig.Name))
111-
host := &Member{
109+
member := &Member{
112110
Name: c.MemberlistConfig.Name,
113111
NameHash: nameHash,
114-
ID: id,
112+
ID: c.Hasher.Sum64(buf),
115113
Birthdate: birthdate,
116114
}
117115
ctx, cancel := context.WithCancel(context.Background())
118116
d := &Discovery{
119-
host: host,
117+
member: member,
120118
config: c,
121119
log: log,
122120
deadMembers: make(map[string]int64),
@@ -201,7 +199,7 @@ func (d *Discovery) deadMemberTracker() {
201199
return
202200
case e := <-d.deadMemberEvents:
203201
member := e.MemberAddr()
204-
if e.Event == memberlist.NodeJoin {
202+
if e.Event == memberlist.NodeJoin || e.Event == memberlist.NodeUpdate {
205203
delete(d.deadMembers, member)
206204
} else if e.Event == memberlist.NodeLeave {
207205
d.deadMembers[member] = time.Now().UnixNano()
@@ -307,7 +305,7 @@ func (d *Discovery) FindMemberByName(name string) (Member, error) {
307305
return member, nil
308306
}
309307
}
310-
return Member{}, ErrHostNotFound
308+
return Member{}, ErrMemberNotFound
311309
}
312310

313311
// FindMemberByID finds and returns an alive member.
@@ -318,7 +316,7 @@ func (d *Discovery) FindMemberByID(id uint64) (Member, error) {
318316
return member, nil
319317
}
320318
}
321-
return Member{}, ErrHostNotFound
319+
return Member{}, ErrMemberNotFound
322320
}
323321

324322
// GetCoordinator returns the oldest node in the memberlist.
@@ -333,7 +331,7 @@ func (d *Discovery) GetCoordinator() Member {
333331

334332
// IsCoordinator returns true if the caller is the coordinator node.
335333
func (d *Discovery) IsCoordinator() bool {
336-
return d.GetCoordinator().ID == d.host.ID
334+
return d.GetCoordinator().ID == d.member.ID
337335
}
338336

339337
// LocalNode is used to return the local Node
@@ -376,7 +374,7 @@ func (d *Discovery) Shutdown() error {
376374
return d.memberlist.Shutdown()
377375
}
378376

379-
func convertToClusterEvent(e memberlist.NodeEvent) *ClusterEvent {
377+
func toClusterEvent(e memberlist.NodeEvent) *ClusterEvent {
380378
return &ClusterEvent{
381379
Event: e.Event,
382380
NodeName: e.Node.Name,
@@ -391,24 +389,10 @@ func (d *Discovery) handleEvent(event memberlist.NodeEvent) {
391389
defer d.clusterEventsMtx.RUnlock()
392390

393391
for _, ch := range d.eventSubscribers {
394-
if event.Node.Name == d.host.Name {
395-
continue
396-
}
397-
// NodeJoin or NodeLeave
398-
if event.Event != memberlist.NodeUpdate {
399-
ch <- convertToClusterEvent(event)
392+
if event.Node.Name == d.member.Name {
400393
continue
401394
}
402-
403-
// NodeUpdate: Olric is an in-memory k/v store. If the node metadata has been updated,
404-
// the node may be restarted or/and serves stale data.
405-
e := convertToClusterEvent(event)
406-
e.Event = memberlist.NodeLeave
407-
ch <- e
408-
// Create a Join event from copied event.
409-
e = convertToClusterEvent(event)
410-
e.Event = memberlist.NodeJoin
411-
ch <- e
395+
ch <- toClusterEvent(event)
412396
}
413397
}
414398

@@ -442,7 +426,7 @@ type delegate struct {
442426

443427
// newDelegate returns a new delegate instance.
444428
func (d *Discovery) newDelegate() (delegate, error) {
445-
data, err := msgpack.Marshal(d.host)
429+
data, err := msgpack.Marshal(d.member)
446430
if err != nil {
447431
return delegate{}, err
448432
}

rebalancer.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,10 @@ func (db *Olric) rebalancePrimaryPartitions() {
164164
}
165165

166166
owner := part.owner()
167-
if cmpMembersByID(owner, db.this) {
167+
// Here we don't use the cmpMembersByID function because the routing table is an eventually consistent
168+
// data structure and a node can try to move data to a previous instance (the same name but a different birthdate)
169+
// of itself. So just check the name.
170+
if cmpMembersByName(owner, db.this) {
168171
// Already belongs to me.
169172
continue
170173
}
@@ -206,7 +209,11 @@ func (db *Olric) rebalanceBackupPartitions() {
206209
offset := len(owners) - 1 - (db.config.ReplicaCount - 1)
207210
for i := len(owners) - 1; i > offset; i-- {
208211
owner := owners[i]
209-
if cmpMembersByID(db.this, owner) {
212+
// Here we don't use the cmpMembersByID function because the routing table is an eventually consistent
213+
// data structure and a node can try to move data to a previous instance (the same name but a different birthdate)
214+
// of itself. So just check the name.
215+
if cmpMembersByName(db.this, owner) {
216+
// Already belongs to me.
210217
continue
211218
}
212219
ids = append(ids, owner.ID)
@@ -293,8 +300,8 @@ func (db *Olric) moveDMapOperation(w, r protocol.EncodeDecoder) {
293300
}
294301
// Check ownership before merging. This is useful to prevent data corruption in network partitioning case.
295302
if !db.checkOwnership(part) {
296-
db.log.V(2).Printf("[ERROR] Received DMap: %s on PartID: %d (backup: %v) doesn't belong to me",
297-
box.Name, box.PartID, box.Backup)
303+
db.log.V(2).Printf("[ERROR] Received DMap: %s on PartID: %d (backup: %v) doesn't belong to this node (%s)",
304+
box.Name, box.PartID, box.Backup, db.this)
298305
err := fmt.Errorf("partID: %d (backup: %v) doesn't belong to %s: %w", box.PartID, box.Backup, db.this, ErrInvalidArgument)
299306
db.errorResponse(w, err)
300307
return

routing.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (db *Olric) processOwnershipReports(reports map[discovery.Member]ownershipR
355355
}
356356
}
357357

358-
func (db *Olric) processNodeEvent(event *discovery.ClusterEvent) {
358+
func (db *Olric) processClusterEvent(event *discovery.ClusterEvent) {
359359
db.members.mtx.Lock()
360360
defer db.members.mtx.Unlock()
361361

@@ -365,17 +365,29 @@ func (db *Olric) processNodeEvent(event *discovery.ClusterEvent) {
365365
db.consistent.Add(member)
366366
db.log.V(2).Printf("[INFO] Node joined: %s", member)
367367
} else if event.Event == memberlist.NodeLeave {
368-
if _, ok := db.members.m[member.ID]; ok {
369-
delete(db.members.m, member.ID)
370-
} else {
371-
db.log.V(2).Printf("[ERROR] Unknown node left: %s", event.NodeName)
368+
if _, ok := db.members.m[member.ID]; !ok {
369+
db.log.V(2).Printf("[ERROR] Unknown node left: %s: %d", event.NodeName, member.ID)
372370
return
373371
}
374372

373+
delete(db.members.m, member.ID)
375374
db.consistent.Remove(event.NodeName)
376375
// Don't try to use closed sockets again.
377376
db.client.ClosePool(event.NodeName)
378377
db.log.V(2).Printf("[INFO] Node left: %s", event.NodeName)
378+
} else if event.Event == memberlist.NodeUpdate {
379+
// The node's birthdate may have changed. Close the pool and re-add the node to the hash ring.
380+
// This takes linear time, but the member count should be small enough that it doesn't matter.
381+
for id, item := range db.members.m {
382+
if cmpMembersByName(member, item) {
383+
delete(db.members.m, id)
384+
db.consistent.Remove(event.NodeName)
385+
db.client.ClosePool(event.NodeName)
386+
}
387+
}
388+
db.members.m[member.ID] = member
389+
db.consistent.Add(member)
390+
db.log.V(2).Printf("[INFO] Node updated: %s", member)
379391
} else {
380392
db.log.V(2).Printf("[ERROR] Unknown event received: %v", event)
381393
return
@@ -393,7 +405,7 @@ func (db *Olric) listenMemberlistEvents(eventCh chan *discovery.ClusterEvent) {
393405
case <-db.ctx.Done():
394406
return
395407
case e := <-eventCh:
396-
db.processNodeEvent(e)
408+
db.processClusterEvent(e)
397409
db.updateRouting()
398410
}
399411
}

0 commit comments

Comments
 (0)