-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathserver_test.go
94 lines (79 loc) · 2.13 KB
/
server_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package redplex
import (
"net"
"testing"
"time"
"github.com/garyburd/redigo/redis"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
// redisAddress is the TCP address of the Redis server used by this suite.
// SetupSuite both dials it directly and passes it to NewDirectDialer, and
// fails the suite if the direct dial returns an error.
const redisAddress = "127.0.0.1:6379"
// EndToEndServerSuite is a testify suite that drives a Server over real TCP
// connections. SetupSuite fills in every field and TearDownSuite closes them.
type EndToEndServerSuite struct {
	suite.Suite

	// server is the instance under test.
	server *Server
	// redplexConn is dialed against the server's own listener address, while
	// directConn is dialed against redisAddress.
	redplexConn, directConn redis.Conn
}
func TestEndToEndServerSuite(t *testing.T) {
suite.Run(t, new(EndToEndServerSuite))
}
// SetupSuite starts a Server on an ephemeral loopback port and opens the two
// client connections the tests use: one straight to redisAddress and one to
// the server's listener. Any listen or dial error fails the suite immediately.
func (e *EndToEndServerSuite) SetupSuite() {
	t := e.T()

	// Port 0 lets the OS pick a free port; the real address is read back
	// from the listener when dialing below.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	require.Nil(t, err)

	dialer := NewDirectDialer("tcp", redisAddress, "", false, 0)
	ps := NewPubsub(dialer, 5*time.Second)
	e.server = NewServer(ln, ps)
	go e.server.Listen()

	direct, err := redis.Dial("tcp", redisAddress)
	require.Nil(t, err)
	e.directConn = direct

	proxied, err := redis.Dial("tcp", ln.Addr().String())
	require.Nil(t, err)
	e.redplexConn = proxied
}
// TearDownSuite releases everything SetupSuite created: the server is closed
// first, followed by the connection through it and then the direct
// connection. Return values from the Close calls are not inspected.
func (e *EndToEndServerSuite) TearDownSuite() {
	srv, proxied, direct := e.server, e.redplexConn, e.directConn

	srv.Close()
	proxied.Close()
	direct.Close()
}
// TestSubscribesAndGetsMessages subscribes through the redplex connection to
// the channel "foo" and the pattern "ba*", then publishes on the direct Redis
// connection and checks that the plain message and the pattern message are
// both delivered to the subscriber.
func (e *EndToEndServerSuite) TestSubscribesAndGetsMessages() {
	t := e.T()
	psc := redis.PubSubConn{Conn: e.redplexConn}

	require.Nil(t, psc.Subscribe("foo"))
	require.Equal(t, redis.Subscription{Kind: "subscribe", Channel: "foo", Count: 1}, psc.Receive())
	require.Nil(t, psc.PSubscribe("ba*"))
	require.Equal(t, redis.Subscription{Kind: "psubscribe", Channel: "ba*", Count: 1}, psc.Receive())

	// publish builds the callback handed to retryUntilReturns. That callback
	// runs on a helper goroutine, so it reports errors with t.Errorf rather
	// than require: require ends in t.FailNow, which may only be called from
	// the goroutine running the test.
	publish := func(channel, payload string) func() {
		return func() {
			if _, err := e.directConn.Do("PUBLISH", channel, payload); err != nil {
				t.Errorf("PUBLISH %s: %v", channel, err)
			}
		}
	}

	// Publishing is retried until the awaited Receive call returns.
	e.retryUntilReturns(
		publish("foo", "bar"),
		func() {
			require.Equal(t, redis.Message{Channel: "foo", Data: []byte("bar")}, psc.Receive())
		},
	)

	e.retryUntilReturns(
		publish("bar", "heyo!"),
		func() {
			require.Equal(t, redis.PMessage{Pattern: "ba*", Channel: "bar", Data: []byte("heyo!")}, psc.Receive())
		},
	)
}
// retryUntilReturns calls retried on a helper goroutine, immediately and then
// again every 500ms, for as long as awaitedFn is running on the calling
// goroutine. It returns only after the helper goroutine has exited, so
// retried is never running once this function has returned.
//
// The helper is stopped from a deferred function, so it is also shut down
// when awaitedFn does not return normally (a panic, or runtime.Goexit as
// triggered by t.FailNow). Closing the stop channel never blocks, so an early
// exit of the helper goroutine cannot hang the caller either.
//
// retried runs off the test goroutine and therefore must not call
// t.FailNow-style helpers (t.Fatal, testify's require); use t.Error instead.
func (e *EndToEndServerSuite) retryUntilReturns(retried func(), awaitedFn func()) {
	stop := make(chan struct{})
	done := make(chan struct{})

	go func() {
		// Signal completion even if retried exits the goroutine abnormally.
		defer close(done)
		for {
			retried()
			select {
			case <-time.After(time.Millisecond * 500):
			case <-stop:
				return
			}
		}
	}()

	defer func() {
		close(stop)
		// Wait for the helper so it cannot overlap with whatever the caller
		// does next (for example, a second retry loop on the same connection).
		<-done
	}()

	awaitedFn()
}