@@ -14,7 +14,7 @@ import (
14
14
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
15
15
16
16
func prepareDockerTest (t * testing.T ) (connStr string ) {
17
- if v , ok := os .LookupEnv (enableDockerIntegrationTestsFlag ); ok && strings .ToUpper (v ) != "TRUE" {
17
+ if v , ok := os .LookupEnv (enableDockerIntegrationTestsFlag ); ! ok || strings .ToUpper (v ) != "TRUE" {
18
18
t .Skipf ("integration tests are only run if '%s' is TRUE" , enableDockerIntegrationTestsFlag )
19
19
return
20
20
}
@@ -27,7 +27,6 @@ func prepareDockerTest(t *testing.T) (connStr string) {
27
27
t .Fatalf ("error launching rabbitmq in docker: %v" , err )
28
28
}
29
29
t .Cleanup (func () {
30
- t .Log ("hi" )
31
30
containerId := strings .TrimSpace (string (out ))
32
31
t .Logf ("attempting to shutdown container '%s'" , containerId )
33
32
if err := exec .Command ("docker" , "rm" , "--force" , containerId ).Run (); err != nil {
@@ -37,7 +36,7 @@ func prepareDockerTest(t *testing.T) (connStr string) {
37
36
return "amqp://guest:guest@localhost:5672/"
38
37
}
39
38
40
- func waitForHealthyAmqp (t * testing.T , connStr string ) {
39
+ func waitForHealthyAmqp (t * testing.T , connStr string ) * Conn {
41
40
ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
42
41
defer cancel ()
43
42
tkr := time .NewTicker (time .Second )
@@ -47,58 +46,66 @@ func waitForHealthyAmqp(t *testing.T, connStr string) {
47
46
case <- ctx .Done ():
48
47
t .Fatal ("timed out waiting for healthy amqp" , ctx .Err ())
49
48
case <- tkr .C :
50
- if err := func () error {
51
- t .Log ("attempting connection" )
52
- conn , err := NewConn (connStr )
53
- if err != nil {
54
- return fmt .Errorf ("failed to setup connection: %v" , err )
55
- }
56
- defer conn .Close ()
57
-
58
- pub , err := NewPublisher (conn )
59
- if err != nil {
60
- return fmt .Errorf ("failed to setup publisher: %v" , err )
61
- }
62
-
63
- t .Log ("attempting publish" )
64
- return pub .PublishWithContext (ctx , []byte {}, []string {"ping" }, WithPublishOptionsExchange ("" ))
65
- }(); err != nil {
66
- t .Log ("publish ping failed" , err .Error ())
49
+ conn , err := NewConn (connStr )
50
+ if err != nil {
51
+ t .Log ("failed to connect" , err .Error ())
67
52
} else {
68
- t .Log ("ping successful" )
69
- return
53
+ if err := func () error {
54
+ t .Log ("attempting connection" )
55
+
56
+ pub , err := NewPublisher (conn )
57
+ if err != nil {
58
+ return fmt .Errorf ("failed to setup publisher: %v" , err )
59
+ }
60
+
61
+ t .Log ("attempting publish" )
62
+ return pub .PublishWithContext (ctx , []byte {}, []string {"ping" }, WithPublishOptionsExchange ("" ))
63
+ }(); err != nil {
64
+ _ = conn .Close ()
65
+ t .Log ("publish ping failed" , err .Error ())
66
+ } else {
67
+ t .Log ("ping successful" )
68
+ return conn
69
+ }
70
70
}
71
71
}
72
72
}
73
+ return nil
73
74
}
74
75
75
76
func TestSimplePubSub (t * testing.T ) {
76
77
connStr := prepareDockerTest (t )
77
- waitForHealthyAmqp (t , connStr )
78
-
79
- conn , err := NewConn (connStr )
80
- if err != nil {
81
- t .Fatal ("error creating connection" , err )
82
- }
78
+ conn := waitForHealthyAmqp (t , connStr )
83
79
defer conn .Close ()
84
80
85
81
t .Logf ("new consumer" )
86
- consumer , err := NewConsumer (conn , "my_queue" )
82
+ consumerQueue := "my_queue"
83
+ consumer , err := NewConsumer (conn , consumerQueue )
87
84
if err != nil {
88
85
t .Fatal ("error creating consumer" , err )
89
86
}
90
- defer consumer .Close ()
87
+ defer consumer .CloseWithContext (context .Background ())
88
+
89
+ // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
90
+ // it does not block.
91
+ consumed := make (chan string )
92
+ defer close (consumed )
91
93
92
94
go func () {
93
95
err = consumer .Run (func (d Delivery ) Action {
94
- log .Printf ("consumed: %v" , string (d .Body ))
96
+ log .Printf ("consumed" )
97
+ select {
98
+ case consumed <- string (d .Body ):
99
+ default :
100
+ }
95
101
return Ack
96
102
})
97
103
if err != nil {
98
104
t .Log ("consumer run failed" , err )
99
105
}
100
106
}()
101
107
108
+ // Setup a publisher with notifications enabled
102
109
t .Logf ("new publisher" )
103
110
publisher , err := NewPublisher (conn )
104
111
if err != nil {
@@ -107,23 +114,33 @@ func TestSimplePubSub(t *testing.T) {
107
114
publisher .NotifyPublish (func (p Confirmation ) {
108
115
return
109
116
})
117
+ defer publisher .Close ()
110
118
111
- ctx , cancel := context .WithCancel (context .Background ())
119
+ // For test stability we cannot rely on the fact that the consumer go routines are up and running before the
120
+ // publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
121
+ // pass after we see the first message come through.
122
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
112
123
defer cancel ()
113
-
114
- t .Logf ("new publish" )
115
- confirms , err := publisher .PublishWithDeferredConfirmWithContext (
116
- ctx , []byte ("example" ), []string {"my_queue" },
117
- WithPublishOptionsMandatory ,
118
- )
119
- if err != nil {
120
- t .Fatal ("failed to publish" , err )
121
- }
122
- for _ , confirm := range confirms {
123
- if _ , err := confirm .WaitContext (ctx ); err != nil {
124
- t .Fatal ("failed to wait for publish" , err )
124
+ tkr := time .NewTicker (time .Second )
125
+ for {
126
+ select {
127
+ case <- ctx .Done ():
128
+ t .Fatal ("timed out waiting for pub sub" , ctx .Err ())
129
+ case <- tkr .C :
130
+ t .Logf ("new publish" )
131
+ confirms , err := publisher .PublishWithDeferredConfirmWithContext (ctx , []byte ("example" ), []string {consumerQueue })
132
+ if err != nil {
133
+ // publish should always succeed since we've verified the ping previously
134
+ t .Fatal ("failed to publish" , err )
135
+ }
136
+ for _ , confirm := range confirms {
137
+ if _ , err := confirm .WaitContext (ctx ); err != nil {
138
+ t .Fatal ("failed to wait for publish" , err )
139
+ }
140
+ }
141
+ case msg := <- consumed :
142
+ t .Logf ("successfully saw message round trip: '%s'" , msg )
143
+ return
125
144
}
126
145
}
127
- t .Logf ("success" )
128
-
129
146
}
0 commit comments