diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index baa57c9..8e47620 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -10,12 +10,13 @@ jobs:
runs-on: ubuntu-latest
env:
GOPROXY: "https://proxy.golang.org,direct"
+ ENABLE_DOCKER_INTEGRATION_TESTS: TRUE
steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
- go-version: "1.20"
+ go-version: "1.22"
id: go
- name: Check out code into the Go module directory
diff --git a/README.md b/README.md
index 6e62d94..54230c2 100644
--- a/README.md
+++ b/README.md
@@ -119,6 +119,12 @@ Close your publishers and consumers when you're done with them and do *not* atte
Note that the API is currently in `v0`. I don't plan on huge changes, but there may be some small breaking changes before we hit `v1`.
+## Integration testing
+
+By setting `ENABLE_DOCKER_INTEGRATION_TESTS=TRUE` during `go test -v ./...`, the integration tests will run. These launch a rabbitmq container in the local Docker daemon and test some publish/consume actions.
+
+See [integration_test.go](integration_test.go).
+
## 💬 Contact
[](https://twitter.com/intent/follow?screen_name=wagslane)
diff --git a/go.mod b/go.mod
index ec6f231..060bed5 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
module github.com/wagslane/go-rabbitmq
-go 1.20
+go 1.22.6
require github.com/rabbitmq/amqp091-go v1.10.0
diff --git a/go.sum b/go.sum
index da51250..024eebe 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
diff --git a/integration_test.go b/integration_test.go
new file mode 100644
index 0000000..c6b8014
--- /dev/null
+++ b/integration_test.go
@@ -0,0 +1,156 @@
+package rabbitmq
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+ "testing"
+ "time"
+)
+
+const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
+
+func prepareDockerTest(t *testing.T) (connStr string) {
+ if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
+ t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
+ return
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ out, err := exec.CommandContext(ctx, "docker", "run", "--rm", "--detach", "--publish=5672:5672", "--quiet", "--", "rabbitmq:3-alpine").Output()
+ if err != nil {
+ t.Log("container id", string(out))
+ t.Fatalf("error launching rabbitmq in docker: %v", err)
+ }
+ t.Cleanup(func() {
+ containerId := strings.TrimSpace(string(out))
+ t.Logf("attempting to shutdown container '%s'", containerId)
+ if err := exec.Command("docker", "rm", "--force", containerId).Run(); err != nil {
+ t.Logf("failed to stop: %v", err)
+ }
+ })
+ return "amqp://guest:guest@localhost:5672/"
+}
+
+func waitForHealthyAmqp(t *testing.T, connStr string) *Conn {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ tkr := time.NewTicker(time.Second)
+
+ // only log connection-level logs when connection has succeeded
+ muted := true
+ connLogger := simpleLogF(func(s string, i ...interface{}) {
+ if !muted {
+ t.Logf(s, i...)
+ }
+ })
+
+ var lastErr error
+ for {
+ select {
+ case <-ctx.Done():
+ t.Fatal("timed out waiting for healthy amqp", lastErr)
+ return nil
+ case <-tkr.C:
+ t.Log("attempting connection")
+ conn, err := NewConn(connStr, WithConnectionOptionsLogger(connLogger))
+ if err != nil {
+ lastErr = err
+ t.Log("connection attempt failed - retrying")
+ } else {
+ if err := func() error {
+ pub, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
+ if err != nil {
+ return fmt.Errorf("failed to setup publisher: %v", err)
+ }
+ t.Log("attempting publish")
+ return pub.PublishWithContext(ctx, []byte{}, []string{"ping"}, WithPublishOptionsExchange(""))
+ }(); err != nil {
+ _ = conn.Close()
+ t.Log("publish ping failed", err.Error())
+ } else {
+ t.Log("ping successful")
+ muted = true
+ return conn
+ }
+ }
+ }
+ }
+}
+
+// TestSimplePubSub is an integration testing function that validates whether we can reliably connect to a docker-based
+// rabbitmq and consumer a message that we publish. This uses the default direct exchange with lots of error checking
+// to ensure the result is as expected.
+func TestSimplePubSub(t *testing.T) {
+ connStr := prepareDockerTest(t)
+ conn := waitForHealthyAmqp(t, connStr)
+ defer conn.Close()
+
+ t.Logf("new consumer")
+ consumerQueue := "my_queue"
+ consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
+ if err != nil {
+ t.Fatal("error creating consumer", err)
+ }
+ defer consumer.CloseWithContext(context.Background())
+
+ // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
+ // it does not block.
+ consumed := make(chan Delivery)
+ defer close(consumed)
+
+ go func() {
+ err = consumer.Run(func(d Delivery) Action {
+ t.Log("consumed")
+ select {
+ case consumed <- d:
+ default:
+ }
+ return Ack
+ })
+ if err != nil {
+ t.Log("consumer run failed", err)
+ }
+ }()
+
+ // Setup a publisher with notifications enabled
+ t.Logf("new publisher")
+ publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
+ if err != nil {
+ t.Fatal("error creating publisher", err)
+ }
+ publisher.NotifyPublish(func(p Confirmation) {
+ })
+ defer publisher.Close()
+
+ // For test stability we cannot rely on the fact that the consumer go routines are up and running before the
+ // publisher starts it's first publish attempt. For this reason we run the publisher in a loop every second and
+ // pass after we see the first message come through.
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ tkr := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ctx.Done():
+ t.Fatal("timed out waiting for pub sub", ctx.Err())
+ case <-tkr.C:
+ t.Logf("new publish")
+ confirms, err := publisher.PublishWithDeferredConfirmWithContext(ctx, []byte("example"), []string{consumerQueue})
+ if err != nil {
+ // publish should always succeed since we've verified the ping previously
+ t.Fatal("failed to publish", err)
+ }
+ for _, confirm := range confirms {
+ if _, err := confirm.WaitContext(ctx); err != nil {
+ t.Fatal("failed to wait for publish", err)
+ }
+ }
+ case d := <-consumed:
+ t.Logf("successfully saw message round trip: '%s'", string(d.Body))
+ return
+ }
+ }
+}
diff --git a/logger.go b/logger.go
index 34ef793..5560fa4 100644
--- a/logger.go
+++ b/logger.go
@@ -3,6 +3,7 @@ package rabbitmq
import (
"fmt"
"log"
+ "os"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
@@ -39,3 +40,28 @@ func (l stdDebugLogger) Infof(format string, v ...interface{}) {
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}
+
+// simpleLogF is used to support logging in the test functions.
+// This could be exposed publicly for integration in more simple logging interfaces.
+type simpleLogF func(string, ...interface{})
+
+func (l simpleLogF) Fatalf(format string, v ...interface{}) {
+ l(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
+ os.Exit(1)
+}
+
+func (l simpleLogF) Errorf(format string, v ...interface{}) {
+ l(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
+}
+
+func (l simpleLogF) Warnf(format string, v ...interface{}) {
+ l(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
+}
+
+func (l simpleLogF) Infof(format string, v ...interface{}) {
+ l(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
+}
+
+func (l simpleLogF) Debugf(format string, v ...interface{}) {
+ l(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
+}