-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpubsub_test.go
71 lines (61 loc) · 1.61 KB
/
pubsub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package alephzero
import (
"fmt"
"sort"
"sync"
"testing"
)
func TestPubSub(t *testing.T) {
FileRemove("alephzero/foo.pubsub.a0")
topic := PubSubTopic{"foo", nil}
p, err := NewPublisher(topic)
check(t, err)
defer p.Close()
ss, err := NewSubscriberSync(topic, INIT_OLDEST, ITER_NEXT)
check(t, err)
defer ss.Close()
cnd := sync.NewCond(&sync.Mutex{})
allPayloads := [][]byte{}
s, err := NewSubscriber(topic, INIT_OLDEST, ITER_NEXT, func(pkt Packet) {
cnd.L.Lock()
allPayloads = append(allPayloads, pkt.Payload)
cnd.Signal()
cnd.L.Unlock()
})
check(t, err)
defer s.Close()
if hasNext, err := ss.HasNext(); err != nil || hasNext {
t.Error("HasNext() should be false")
}
p.Pub(NewPacket(nil, []byte("hello")))
if hasNext, err := ss.HasNext(); err != nil || !hasNext {
t.Error("HasNext() should be true")
}
pkt, err := ss.Next()
check(t, err)
if string(pkt.Payload) != "hello" {
t.Error("Payload() should be 'hello'")
}
hdrKeys := make([]string, 0, len(pkt.Headers))
for k := range pkt.Headers {
hdrKeys = append(hdrKeys, k)
}
sort.Strings(hdrKeys)
if fmt.Sprint(hdrKeys) != "[a0_time_mono a0_time_wall a0_transport_seq a0_writer_id a0_writer_seq]" {
t.Error("Headers() should be [a0_time_mono a0_time_wall a0_transport_seq a0_writer_id a0_writer_seq]")
}
if hasNext, err := ss.HasNext(); err != nil || hasNext {
t.Error("HasNext() should be false")
}
cnd.L.Lock()
for len(allPayloads) < 1 {
cnd.Wait()
}
cnd.L.Unlock()
if len(allPayloads) != 1 {
t.Error("should have received 1 packet")
}
if string(allPayloads[0]) != "hello" {
t.Error("payload 0 should be 'hello'")
}
}