@@ -3,7 +3,6 @@ package rabbitmq
3
3
import (
4
4
"context"
5
5
"fmt"
6
- "log"
7
6
"os"
8
7
"os/exec"
9
8
"strings"
@@ -14,7 +13,7 @@ import (
14
13
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
15
14
16
15
func prepareDockerTest (t * testing.T ) (connStr string ) {
17
- if v , ok := os .LookupEnv (enableDockerIntegrationTestsFlag ); ok && strings .ToUpper (v ) != "TRUE" {
16
+ if v , ok := os .LookupEnv (enableDockerIntegrationTestsFlag ); ! ok || strings .ToUpper (v ) != "TRUE" {
18
17
t .Skipf ("integration tests are only run if '%s' is TRUE" , enableDockerIntegrationTestsFlag )
19
18
return
20
19
}
@@ -27,7 +26,6 @@ func prepareDockerTest(t *testing.T) (connStr string) {
27
26
t .Fatalf ("error launching rabbitmq in docker: %v" , err )
28
27
}
29
28
t .Cleanup (func () {
30
- t .Log ("hi" )
31
29
containerId := strings .TrimSpace (string (out ))
32
30
t .Logf ("attempting to shutdown container '%s'" , containerId )
33
31
if err := exec .Command ("docker" , "rm" , "--force" , containerId ).Run (); err != nil {
@@ -37,7 +35,7 @@ func prepareDockerTest(t *testing.T) (connStr string) {
37
35
return "amqp://guest:guest@localhost:5672/"
38
36
}
39
37
40
- func waitForHealthyAmqp (t * testing.T , connStr string ) {
38
+ func waitForHealthyAmqp (t * testing.T , connStr string ) * Conn {
41
39
ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
42
40
defer cancel ()
43
41
tkr := time .NewTicker (time .Second )
@@ -46,84 +44,105 @@ func waitForHealthyAmqp(t *testing.T, connStr string) {
46
44
select {
47
45
case <- ctx .Done ():
48
46
t .Fatal ("timed out waiting for healthy amqp" , ctx .Err ())
47
+ return nil
49
48
case <- tkr .C :
50
- if err := func () error {
51
- t .Log ("attempting connection" )
52
- conn , err := NewConn (connStr )
53
- if err != nil {
54
- return fmt .Errorf ("failed to setup connection: %v" , err )
55
- }
56
- defer conn .Close ()
57
-
58
- pub , err := NewPublisher (conn )
59
- if err != nil {
60
- return fmt .Errorf ("failed to setup publisher: %v" , err )
61
- }
62
-
63
- t .Log ("attempting publish" )
64
- return pub .PublishWithContext (ctx , []byte {}, []string {"ping" }, WithPublishOptionsExchange ("" ))
65
- }(); err != nil {
66
- t .Log ("publish ping failed" , err .Error ())
49
+ conn , err := NewConn (connStr , WithConnectionOptionsLogger (simpleLogF (t .Logf )))
50
+ if err != nil {
51
+ t .Log ("failed to connect" , err .Error ())
67
52
} else {
68
- t .Log ("ping successful" )
69
- return
53
+ if err := func () error {
54
+ t .Log ("attempting connection" )
55
+
56
+ pub , err := NewPublisher (conn , WithPublisherOptionsLogger (simpleLogF (t .Logf )))
57
+ if err != nil {
58
+ return fmt .Errorf ("failed to setup publisher: %v" , err )
59
+ }
60
+
61
+ t .Log ("attempting publish" )
62
+ return pub .PublishWithContext (ctx , []byte {}, []string {"ping" }, WithPublishOptionsExchange ("" ))
63
+ }(); err != nil {
64
+ _ = conn .Close ()
65
+ t .Log ("publish ping failed" , err .Error ())
66
+ } else {
67
+ t .Log ("ping successful" )
68
+ return conn
69
+ }
70
70
}
71
71
}
72
72
}
73
73
}
74
74
75
+ // TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
76
+ // rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
77
+ // to ensure the result is as expected.
75
78
func TestSimplePubSub (t * testing.T ) {
76
79
connStr := prepareDockerTest (t )
77
- waitForHealthyAmqp (t , connStr )
78
-
79
- conn , err := NewConn (connStr )
80
- if err != nil {
81
- t .Fatal ("error creating connection" , err )
82
- }
80
+ conn := waitForHealthyAmqp (t , connStr )
83
81
defer conn .Close ()
84
82
85
83
t .Logf ("new consumer" )
86
- consumer , err := NewConsumer (conn , "my_queue" )
84
+ consumerQueue := "my_queue"
85
+ consumer , err := NewConsumer (conn , consumerQueue , WithConsumerOptionsLogger (simpleLogF (t .Logf )))
87
86
if err != nil {
88
87
t .Fatal ("error creating consumer" , err )
89
88
}
90
- defer consumer .Close ()
89
+ defer consumer .CloseWithContext (context .Background ())
90
+
91
+ // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
92
+ // it does not block.
93
+ consumed := make (chan Delivery )
94
+ defer close (consumed )
91
95
92
96
go func () {
93
97
err = consumer .Run (func (d Delivery ) Action {
94
- log .Printf ("consumed: %v" , string (d .Body ))
98
+ t .Log ("consumed" )
99
+ select {
100
+ case consumed <- d :
101
+ default :
102
+ }
95
103
return Ack
96
104
})
97
105
if err != nil {
98
106
t .Log ("consumer run failed" , err )
99
107
}
100
108
}()
101
109
110
+ // Setup a publisher with notifications enabled
102
111
t .Logf ("new publisher" )
103
- publisher , err := NewPublisher (conn )
112
+ publisher , err := NewPublisher (conn , WithPublisherOptionsLogger ( simpleLogF ( t . Logf )) )
104
113
if err != nil {
105
114
t .Fatal ("error creating publisher" , err )
106
115
}
107
116
publisher .NotifyPublish (func (p Confirmation ) {
108
117
return
109
118
})
119
+ defer publisher .Close ()
110
120
111
- ctx , cancel := context .WithCancel (context .Background ())
121
+ // For test stability we cannot rely on the fact that the consumer go routines are up and running before the
122
+ // publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
123
+ // pass after we see the first message come through.
124
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
112
125
defer cancel ()
113
-
114
- t .Logf ("new publish" )
115
- confirms , err := publisher .PublishWithDeferredConfirmWithContext (
116
- ctx , []byte ("example" ), []string {"my_queue" },
117
- WithPublishOptionsMandatory ,
118
- )
119
- if err != nil {
120
- t .Fatal ("failed to publish" , err )
121
- }
122
- for _ , confirm := range confirms {
123
- if _ , err := confirm .WaitContext (ctx ); err != nil {
124
- t .Fatal ("failed to wait for publish" , err )
126
+ tkr := time .NewTicker (time .Second )
127
+ for {
128
+ select {
129
+ case <- ctx .Done ():
130
+ t .Fatal ("timed out waiting for pub sub" , ctx .Err ())
131
+ case <- tkr .C :
132
+ t .Logf ("new publish" )
133
+ confirms , err := publisher .PublishWithDeferredConfirmWithContext (ctx , []byte ("example" ), []string {consumerQueue })
134
+ if err != nil {
135
+ // publish should always succeed since we've verified the ping previously
136
+ t .Fatal ("failed to publish" , err )
137
+ }
138
+ for _ , confirm := range confirms {
139
+ if _ , err := confirm .WaitContext (ctx ); err != nil {
140
+ t .Fatal ("failed to wait for publish" , err )
141
+ }
142
+ }
143
+ case d := <- consumed :
144
+ t .Logf ("successfully saw message round trip: '%s'" , string (d .Body ))
145
+ return
125
146
}
126
147
}
127
- t .Logf ("success" )
128
-
129
148
}
0 commit comments