Skip to content

Commit 7e61abe

Browse files
Add broadcast delegate tests for clustering
1 parent a8cfbf4 commit 7e61abe

1 file changed

Lines changed: 86 additions & 0 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package clustering
2+
3+
import (
4+
"testing"
5+
6+
"github.com/fabricekabongo/loggerhead/query"
7+
"github.com/fabricekabongo/loggerhead/world"
8+
"github.com/hashicorp/memberlist"
9+
)
10+
11+
func TestBroadcastDelegateNodeMeta(t *testing.T) {
12+
engine := query.NewWriteQueryEngine(world.NewWorld())
13+
delegate := newBroadcastDelegate(engine, &memberlist.TransmitLimitedQueue{})
14+
15+
if data := delegate.NodeMeta(0); len(data) != 0 {
16+
t.Fatalf("expected empty node meta, got %v", data)
17+
}
18+
}
19+
20+
func TestBroadcastDelegateNotifyMsgExecutesCommand(t *testing.T) {
21+
w := world.NewWorld()
22+
engine := query.NewWriteQueryEngine(w)
23+
delegate := newBroadcastDelegate(engine, &memberlist.TransmitLimitedQueue{})
24+
25+
delegate.NotifyMsg([]byte("SAVE ns loc 1 2"))
26+
27+
location, ok := w.GetLocation("ns", "loc")
28+
if !ok {
29+
t.Fatalf("expected location to be saved")
30+
}
31+
32+
if location.Lat() != 1 || location.Lon() != 2 {
33+
t.Fatalf("unexpected location coordinates: %v", location)
34+
}
35+
}
36+
37+
func TestBroadcastDelegateGetBroadcasts(t *testing.T) {
38+
broadcasts := &memberlist.TransmitLimitedQueue{NumNodes: func() int { return 1 }, RetransmitMult: 1}
39+
broadcasts.QueueBroadcast(NewLocationBroadcast("SAVE ns loc 1 1"))
40+
41+
delegate := newBroadcastDelegate(query.NewWriteQueryEngine(world.NewWorld()), broadcasts)
42+
43+
got := delegate.GetBroadcasts(0, 1024)
44+
if len(got) != 1 {
45+
t.Fatalf("expected 1 broadcast, got %d", len(got))
46+
}
47+
48+
if string(got[0]) != "SAVE ns loc 1 1" {
49+
t.Fatalf("unexpected broadcast payload %q", string(got[0]))
50+
}
51+
}
52+
53+
func TestBroadcastDelegateLocalState(t *testing.T) {
54+
w := world.NewWorld()
55+
_ = w.Save("ns", "loc", 10, 20)
56+
57+
delegate := newBroadcastDelegate(query.NewWriteQueryEngine(w), &memberlist.TransmitLimitedQueue{})
58+
59+
if data := delegate.LocalState(false); len(data) != 0 {
60+
t.Fatalf("expected empty state when join is false, got %v", data)
61+
}
62+
63+
data := delegate.LocalState(true)
64+
restored := world.NewWorldFromBytes(data)
65+
66+
loc, ok := restored.GetLocation("ns", "loc")
67+
if !ok || loc.Lat() != 10 || loc.Lon() != 20 {
68+
t.Fatalf("expected saved location after restoration, got %#v, present=%v", loc, ok)
69+
}
70+
}
71+
72+
func TestBroadcastDelegateMergeRemoteState(t *testing.T) {
73+
remote := world.NewWorld()
74+
_ = remote.Save("ns", "loc", 3, 4)
75+
buf := remote.ToBytes()
76+
77+
local := world.NewWorld()
78+
delegate := newBroadcastDelegate(query.NewWriteQueryEngine(local), &memberlist.TransmitLimitedQueue{})
79+
80+
delegate.MergeRemoteState(buf, true)
81+
82+
loc, ok := local.GetLocation("ns", "loc")
83+
if !ok || loc.Lat() != 3 || loc.Lon() != 4 {
84+
t.Fatalf("expected merged location, got %#v, present=%v", loc, ok)
85+
}
86+
}

0 commit comments

Comments
 (0)