Skip to content

Commit 5cf4129

Browse files
committed
chore: initial integration test
1 parent ef6a380 commit 5cf4129

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed

integration_test.go

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/exec"
9+
"strings"
10+
"testing"
11+
"time"
12+
)
13+
14+
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
15+
16+
func prepareDockerTest(t *testing.T) (connStr string) {
17+
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); ok && strings.ToUpper(v) != "TRUE" {
18+
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
19+
return
20+
}
21+
ctx, cancel := context.WithCancel(context.Background())
22+
defer cancel()
23+
24+
out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
25+
if err != nil {
26+
t.Log("container id", string(out))
27+
t.Fatalf("error launching rabbitmq in docker: %v", err)
28+
}
29+
t.Cleanup(func() {
30+
t.Log("hi")
31+
containerId := strings.TrimSpace(string(out))
32+
t.Logf("attempting to shutdown container '%s'", containerId)
33+
if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
34+
t.Logf("failed to stop: %v", err)
35+
}
36+
})
37+
return "amqp://guest:guest@localhost:5672/"
38+
}
39+
40+
func waitForHealthyAmqp(t *testing.T, connStr string) {
41+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
42+
defer cancel()
43+
tkr := time.NewTicker(time.Second)
44+
45+
for {
46+
select {
47+
case <-ctx.Done():
48+
t.Fatal("timed out waiting for healthy amqp", ctx.Err())
49+
case <-tkr.C:
50+
if err := func() error {
51+
t.Log("attempting connection")
52+
conn, err := NewConn(connStr)
53+
if err != nil {
54+
return fmt.Errorf("failed to setup connection: %v", err)
55+
}
56+
defer conn.Close()
57+
58+
pub, err := NewPublisher(conn)
59+
if err != nil {
60+
return fmt.Errorf("failed to setup publisher: %v", err)
61+
}
62+
63+
t.Log("attempting publish")
64+
return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
65+
}(); err != nil {
66+
t.Log("publish ping failed", err.Error())
67+
} else {
68+
t.Log("ping successful")
69+
return
70+
}
71+
}
72+
}
73+
}
74+
75+
func TestSimplePubSub(t *testing.T) {
76+
connStr := prepareDockerTest(t)
77+
waitForHealthyAmqp(t, connStr)
78+
79+
conn, err := NewConn(connStr)
80+
if err != nil {
81+
t.Fatal("error creating connection", err)
82+
}
83+
defer conn.Close()
84+
85+
t.Logf("new consumer")
86+
consumer, err := NewConsumer(conn, "my_queue")
87+
if err != nil {
88+
t.Fatal("error creating consumer", err)
89+
}
90+
defer consumer.Close()
91+
92+
go func() {
93+
err = consumer.Run(func(d Delivery) Action {
94+
log.Printf("consumed: %v", string(d.Body))
95+
return Ack
96+
})
97+
if err != nil {
98+
t.Log("consumer run failed", err)
99+
}
100+
}()
101+
102+
t.Logf("new publisher")
103+
publisher, err := NewPublisher(conn)
104+
if err != nil {
105+
t.Fatal("error creating publisher", err)
106+
}
107+
publisher.NotifyPublish(func(p Confirmation) {
108+
return
109+
})
110+
111+
ctx, cancel := context.WithCancel(context.Background())
112+
defer cancel()
113+
114+
t.Logf("new publish")
115+
confirms, err := publisher.PublishWithDeferredConfirmWithContext(
116+
ctx, []byte("example"), []string{"my_queue"},
117+
WithPublishOptionsMandatory,
118+
)
119+
if err != nil {
120+
t.Fatal("failed to publish", err)
121+
}
122+
for _, confirm := range confirms {
123+
if _, err := confirm.WaitContext(ctx); err != nil {
124+
t.Fatal("failed to wait for publish", err)
125+
}
126+
}
127+
t.Logf("success")
128+
129+
}

0 commit comments

Comments
 (0)