Skip to content

Commit cd74be0

Browse files
authored
Merge pull request #171 from astromechza/test-contrib
chore: initial integration test
2 parents be104ac + 6454487 commit cd74be0

File tree

6 files changed

+192
-2
lines changed

6 files changed

+192
-2
lines changed

.github/workflows/tests.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ jobs:
1010
runs-on: ubuntu-latest
1111
env:
1212
GOPROXY: "https://proxy.golang.org,direct"
13+
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE
1314

1415
steps:
1516
- name: Set up Go
1617
uses: actions/setup-go@v2
1718
with:
18-
go-version: "1.20"
19+
go-version: "1.22"
1920
id: go
2021

2122
- name: Check out code into the Go module directory

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte
119119

120120
Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`.
121121

122+
## Integration testing
123+
124+
By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions.
125+
126+
See [integration_test.go](integration_test.go).
127+
122128
## 💬 Contact
123129

124130
[![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane)

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
module github.com/wagslane/go-rabbitmq
22

3-
go 1.20
3+
go 1.22.6
44

55
require github.com/rabbitmq/amqp091-go v1.10.0

go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
22
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
33
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
4+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

integration_test.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/exec"
8+
"strings"
9+
"testing"
10+
"time"
11+
)
12+
13+
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
14+
15+
func prepareDockerTest(t *testing.T) (connStr string) {
16+
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
17+
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
18+
return
19+
}
20+
ctx, cancel := context.WithCancel(context.Background())
21+
defer cancel()
22+
23+
out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
24+
if err != nil {
25+
t.Log("container id", string(out))
26+
t.Fatalf("error launching rabbitmq in docker: %v", err)
27+
}
28+
t.Cleanup(func() {
29+
containerId := strings.TrimSpace(string(out))
30+
t.Logf("attempting to shutdown container '%s'", containerId)
31+
if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
32+
t.Logf("failed to stop: %v", err)
33+
}
34+
})
35+
return "amqp://guest:guest@localhost:5672/"
36+
}
37+
38+
func waitForHealthyAmqp(t *testing.T, connStr string) *Conn {
39+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
40+
defer cancel()
41+
tkr := time.NewTicker(time.Second)
42+
43+
// only log connection-level logs when connection has succeeded
44+
muted := true
45+
connLogger := simpleLogF(func(s string, i ...interface{}) {
46+
if !muted {
47+
t.Logf(s, i...)
48+
}
49+
})
50+
51+
var lastErr error
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
t.Fatal("timed out waiting for healthy amqp", lastErr)
56+
return nil
57+
case <-tkr.C:
58+
t.Log("attempting connection")
59+
conn, err := NewConn(connStr, WithConnectionOptionsLogger(connLogger))
60+
if err != nil {
61+
lastErr = err
62+
t.Log("connection attempt failed - retrying")
63+
} else {
64+
if err := func() error {
65+
pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
66+
if err != nil {
67+
return fmt.Errorf("failed to setup publisher: %v", err)
68+
}
69+
t.Log("attempting publish")
70+
return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
71+
}(); err != nil {
72+
_ = conn.Close()
73+
t.Log("publish ping failed", err.Error())
74+
} else {
75+
t.Log("ping successful")
76+
muted = true
77+
return conn
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
85+
// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
86+
// to ensure the result is as expected.
87+
func TestSimplePubSub(t *testing.T) {
88+
connStr := prepareDockerTest(t)
89+
conn := waitForHealthyAmqp(t, connStr)
90+
defer conn.Close()
91+
92+
t.Logf("new consumer")
93+
consumerQueue := "my_queue"
94+
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
95+
if err != nil {
96+
t.Fatal("error creating consumer", err)
97+
}
98+
defer consumer.CloseWithContext(context.Background())
99+
100+
// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
101+
// it does not block.
102+
consumed := make(chan Delivery)
103+
defer close(consumed)
104+
105+
go func() {
106+
err = consumer.Run(func(d Delivery) Action {
107+
t.Log("consumed")
108+
select {
109+
case consumed <- d:
110+
default:
111+
}
112+
return Ack
113+
})
114+
if err != nil {
115+
t.Log("consumer run failed", err)
116+
}
117+
}()
118+
119+
// Setup a publisher with notifications enabled
120+
t.Logf("new publisher")
121+
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
122+
if err != nil {
123+
t.Fatal("error creating publisher", err)
124+
}
125+
publisher.NotifyPublish(func(p Confirmation) {
126+
})
127+
defer publisher.Close()
128+
129+
// For test stability we cannot rely on the fact that the consumer go routines are up and running before the
130+
// publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
131+
// pass after we see the first message come through.
132+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
133+
defer cancel()
134+
tkr := time.NewTicker(time.Second)
135+
for {
136+
select {
137+
case <-ctx.Done():
138+
t.Fatal("timed out waiting for pub sub", ctx.Err())
139+
case <-tkr.C:
140+
t.Logf("new publish")
141+
confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue})
142+
if err != nil {
143+
// publish should always succeed since we've verified the ping previously
144+
t.Fatal("failed to publish", err)
145+
}
146+
for _, confirm := range confirms {
147+
if _, err := confirm.WaitContext(ctx); err != nil {
148+
t.Fatal("failed to wait for publish", err)
149+
}
150+
}
151+
case d := <-consumed:
152+
t.Logf("successfully saw message round trip: '%s'", string(d.Body))
153+
return
154+
}
155+
}
156+
}

logger.go

+26
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rabbitmq
33
import (
44
"fmt"
55
"log"
6+
"os"
67

78
"github.com/wagslane/go-rabbitmq/internal/logger"
89
)
@@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) {
3940
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
4041
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
4142
}
43+
44+
// simpleLogF is used to support logging in the test functions.
45+
// This could be exposed publicly for integration in more simple logging interfaces.
46+
type simpleLogF func(string, ...interface{})
47+
48+
func (l simpleLogF) Fatalf(format string, v ...interface{}) {
49+
l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
50+
os.Exit(1)
51+
}
52+
53+
func (l simpleLogF) Errorf(format string, v ...interface{}) {
54+
l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
55+
}
56+
57+
func (l simpleLogF) Warnf(format string, v ...interface{}) {
58+
l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
59+
}
60+
61+
func (l simpleLogF) Infof(format string, v ...interface{}) {
62+
l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
63+
}
64+
65+
func (l simpleLogF) Debugf(format string, v ...interface{}) {
66+
l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
67+
}

0 commit comments

Comments
 (0)