Skip to content

Commit 48b36c1

Browse files
committed
chore(ci): added docker based integration test
Signed-off-by: Ben Meier <[email protected]>
1 parent be104ac commit 48b36c1

File tree

4 files changed

+178
-0
lines changed

4 files changed

+178
-0
lines changed

.github/workflows/tests.yml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ jobs:
1010
runs-on: ubuntu-latest
1111
env:
1212
GOPROXY: "https://proxy.golang.org,direct"
13+
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE
1314

1415
steps:
1516
- name: Set up Go

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte
119119

120120
Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`.
121121

122+
## Integration testing
123+
124+
By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions.
125+
126+
See [integration_test.go](integration_test.go).
127+
122128
## 💬 Contact
123129

124130
[![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane)

integration_test.go

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/exec"
8+
"strings"
9+
"testing"
10+
"time"
11+
)
12+
13+
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
14+
15+
func prepareDockerTest(t *testing.T) (connStr string) {
16+
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
17+
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
18+
return
19+
}
20+
ctx, cancel := context.WithCancel(context.Background())
21+
defer cancel()
22+
23+
out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
24+
if err != nil {
25+
t.Log("container id", string(out))
26+
t.Fatalf("error launching rabbitmq in docker: %v", err)
27+
}
28+
t.Cleanup(func() {
29+
containerId := strings.TrimSpace(string(out))
30+
t.Logf("attempting to shutdown container '%s'", containerId)
31+
if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
32+
t.Logf("failed to stop: %v", err)
33+
}
34+
})
35+
return "amqp://guest:guest@localhost:5672/"
36+
}
37+
38+
func waitForHealthyAmqp(t *testing.T, connStr string) *Conn {
39+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
40+
defer cancel()
41+
tkr := time.NewTicker(time.Second)
42+
43+
for {
44+
select {
45+
case <-ctx.Done():
46+
t.Fatal("timed out waiting for healthy amqp", ctx.Err())
47+
return nil
48+
case <-tkr.C:
49+
t.Log("attempting connection")
50+
conn, err := NewConn(connStr, WithConnectionOptionsLogger(simpleLogF(t.Logf)))
51+
if err != nil {
52+
t.Log("failed to connect", err.Error())
53+
} else {
54+
if err := func() error {
55+
pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
56+
if err != nil {
57+
return fmt.Errorf("failed to setup publisher: %v", err)
58+
}
59+
t.Log("attempting publish")
60+
return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
61+
}(); err != nil {
62+
_ = conn.Close()
63+
t.Log("publish ping failed", err.Error())
64+
} else {
65+
t.Log("ping successful")
66+
return conn
67+
}
68+
}
69+
}
70+
}
71+
}
72+
73+
// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
74+
// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
75+
// to ensure the result is as expected.
76+
func TestSimplePubSub(t *testing.T) {
77+
connStr := prepareDockerTest(t)
78+
conn := waitForHealthyAmqp(t, connStr)
79+
defer conn.Close()
80+
81+
t.Logf("new consumer")
82+
consumerQueue := "my_queue"
83+
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
84+
if err != nil {
85+
t.Fatal("error creating consumer", err)
86+
}
87+
defer consumer.CloseWithContext(context.Background())
88+
89+
// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
90+
// it does not block.
91+
consumed := make(chan Delivery)
92+
defer close(consumed)
93+
94+
go func() {
95+
err = consumer.Run(func(d Delivery) Action {
96+
t.Log("consumed")
97+
select {
98+
case consumed <- d:
99+
default:
100+
}
101+
return Ack
102+
})
103+
if err != nil {
104+
t.Log("consumer run failed", err)
105+
}
106+
}()
107+
108+
// Setup a publisher with notifications enabled
109+
t.Logf("new publisher")
110+
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
111+
if err != nil {
112+
t.Fatal("error creating publisher", err)
113+
}
114+
publisher.NotifyPublish(func(p Confirmation) {
115+
})
116+
defer publisher.Close()
117+
118+
// For test stability we cannot rely on the fact that the consumer go routines are up and running before the
119+
// publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
120+
// pass after we see the first message come through.
121+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
122+
defer cancel()
123+
tkr := time.NewTicker(time.Second)
124+
for {
125+
select {
126+
case <-ctx.Done():
127+
t.Fatal("timed out waiting for pub sub", ctx.Err())
128+
case <-tkr.C:
129+
t.Logf("new publish")
130+
confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue})
131+
if err != nil {
132+
// publish should always succeed since we've verified the ping previously
133+
t.Fatal("failed to publish", err)
134+
}
135+
for _, confirm := range confirms {
136+
if _, err := confirm.WaitContext(ctx); err != nil {
137+
t.Fatal("failed to wait for publish", err)
138+
}
139+
}
140+
case d := <-consumed:
141+
t.Logf("successfully saw message round trip: '%s'", string(d.Body))
142+
return
143+
}
144+
}
145+
}

logger.go

+26
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rabbitmq
33
import (
44
"fmt"
55
"log"
6+
"os"
67

78
"github.com/wagslane/go-rabbitmq/internal/logger"
89
)
@@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) {
3940
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
4041
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
4142
}
43+
44+
// simpleLogF is used to support logging in the test functions.
45+
// This could be exposed publicly for integration in more simple logging interfaces.
46+
type simpleLogF func(string, ...interface{})
47+
48+
func (l simpleLogF) Fatalf(format string, v ...interface{}) {
49+
l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
50+
os.Exit(1)
51+
}
52+
53+
func (l simpleLogF) Errorf(format string, v ...interface{}) {
54+
l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
55+
}
56+
57+
func (l simpleLogF) Warnf(format string, v ...interface{}) {
58+
l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
59+
}
60+
61+
func (l simpleLogF) Infof(format string, v ...interface{}) {
62+
l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
63+
}
64+
65+
func (l simpleLogF) Debugf(format string, v ...interface{}) {
66+
l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
67+
}

0 commit comments

Comments
 (0)