Skip to content

Commit 6259533

Browse files
Add PublishAsync to Publisher for non-blocking message publishing (#93)
* Add PublishAsync to Publisher for non-blocking message publishing Introduce PublishAsync(ctx, message, callback) on Publisher as a fire-and-forget alternative to Publish. The send itself is synchronous (SendWithReceipt), but the broker-confirmation wait (SendReceipt.Wait) runs in a background goroutine and delivers its outcome via a PublishAsyncCallback. Back-pressure is enforced through a channel-based semaphore (inFlight) whose capacity is controlled by PublisherOptions.MaxInFlight (default 256): PublishAsync blocks the caller until a slot is available or the context is cancelled. Each confirmation goroutine respects a configurable PublishTimeout (default 30 s) before surfacing a timeout error through the callback. Also add integration tests covering the happy path, bulk sending, MaxInFlight throttling, context-cancellation back-pressure, validation errors, StateReleased outcomes, and custom timeouts, together with a runnable example in docs/examples/publish_async/. --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5ae63ba commit 6259533

8 files changed

Lines changed: 931 additions & 14 deletions

File tree

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ format:
1313
vet:
1414
go vet ./pkg/rabbitmqamqp
1515
go vet ./docs/examples/...
16+
go vet ./docs/perf_test/...
17+
1618

1719
STATICCHECK ?= $(GOBIN)/staticcheck
1820
STATICCHECK_VERSION ?= latest
@@ -21,6 +23,7 @@ $(STATICCHECK):
2123
check: $(STATICCHECK)
2224
$(STATICCHECK) ./pkg/rabbitmqamqp
2325
$(STATICCHECK) ./docs/examples/...
26+
$(STATICCHECK) ./docs/perf_test/...
2427

2528

2629

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,15 @@ This library is meant to be used with RabbitMQ `4.x`.
1111
- Getting started Video tutorial: </br>
1212
[![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI)
1313

14-
14+
## Performance test
15+
16+
You can find a performance test in `docs/perf_test/`.
17+
This client supports two ways to publish messages:
18+
- With the `Publish` method, which is a simple way to publish messages.
19+
- With the `PublishAsync` method, which allows you to publish messages asynchronously
20+
and get a confirmation when the message is published. This is useful for high throughput scenarios.
21+
`PublishAsync` can be tuned with the `PublisherOptions` to achieve better performance.
22+
Note: With a large MaxInFlight the library can use memory since it tracks all the messages in flight.
1523

1624
## Documentation
1725

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// The example demonstrates how to use PublishAsync to send messages without blocking
4+
// the caller while waiting for broker confirmation.
5+
//
6+
// Key concepts shown:
7+
// - PublishAsync fires the send immediately and delivers the outcome via a callback.
8+
// - MaxInFlight limits how many confirmation goroutines can run concurrently,
9+
// providing back-pressure: PublishAsync blocks the caller when the limit is reached.
10+
// - PublishTimeout controls how long each confirmation goroutine waits for the broker
11+
// before the callback receives a timeout error.
12+
// - A sync.WaitGroup is used to wait for all callbacks before shutting down.
13+
//
14+
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/publish_async/main.go
15+
16+
package main
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
26+
)
27+
28+
func main() {
29+
const (
30+
queueName = "publish-async-go-queue"
31+
totalMessages = 10_000_000
32+
maxInFlight = 100
33+
publishTimeout = 10 * time.Second
34+
)
35+
36+
rmq.Info("PublishAsync example starting")
37+
38+
// Counters updated from the callback goroutines.
39+
var accepted, released, rejected, failed atomic.Int32
40+
41+
// Track how long the whole publish loop takes.
42+
startTime := time.Now()
43+
44+
// stateChanged receives connection lifecycle events.
45+
stateChanged := make(chan *rmq.StateChanged, 1)
46+
go func(ch chan *rmq.StateChanged) {
47+
for sc := range ch {
48+
rmq.Info("[connection] status changed", "from", sc.From, "to", sc.To)
49+
}
50+
}(stateChanged)
51+
52+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
53+
amqpConnection, err := env.NewConnection(context.Background())
54+
if err != nil {
55+
rmq.Error("Error opening connection", "error", err)
56+
return
57+
}
58+
amqpConnection.NotifyStatusChange(stateChanged)
59+
rmq.Info("AMQP connection opened")
60+
61+
management := amqpConnection.Management()
62+
queueInfo, err := management.DeclareQueue(context.Background(), &rmq.QuorumQueueSpecification{
63+
Name: queueName,
64+
})
65+
if err != nil {
66+
rmq.Error("Error declaring queue", "error", err)
67+
return
68+
}
69+
70+
// Create the publisher with async-specific options:
71+
// MaxInFlight – at most 100 confirmation goroutines run concurrently.
72+
// When the limit is reached, PublishAsync blocks the caller
73+
// until a slot becomes free.
74+
// PublishTimeout – each confirmation goroutine waits at most 30 s for a
75+
// broker acknowledgement before the callback receives an error.
76+
publisher, err := amqpConnection.NewPublisher(context.Background(),
77+
&rmq.QueueAddress{Queue: queueName},
78+
&rmq.PublisherOptions{
79+
MaxInFlight: maxInFlight,
80+
PublishTimeout: publishTimeout,
81+
})
82+
if err != nil {
83+
rmq.Error("Error creating publisher", "error", err)
84+
return
85+
}
86+
87+
// wg is decremented once per callback invocation.
88+
var wg sync.WaitGroup
89+
wg.Add(totalMessages)
90+
91+
for i := 0; i < totalMessages; i++ {
92+
msgBody := fmt.Sprintf("hello async %d", i)
93+
msg := rmq.NewMessage([]byte(msgBody))
94+
95+
err = publisher.PublishAsync(context.Background(), msg,
96+
func(result *rmq.PublishResult, cbErr error) {
97+
defer wg.Done()
98+
99+
if cbErr != nil {
100+
// Timeout or send-level error.
101+
rmq.Error("[Publisher] async callback error", "error", cbErr)
102+
failed.Add(1)
103+
return
104+
}
105+
106+
switch result.Outcome.(type) {
107+
case *rmq.StateAccepted:
108+
accepted.Add(1)
109+
case *rmq.StateReleased:
110+
rmq.Warn("[Publisher] message not routed", "body", result.Message.Data[0])
111+
released.Add(1)
112+
case *rmq.StateRejected:
113+
s := result.Outcome.(*rmq.StateRejected)
114+
rmq.Warn("[Publisher] message rejected", "error", s.Error)
115+
rejected.Add(1)
116+
}
117+
})
118+
119+
if err != nil {
120+
// PublishAsync only returns an error for validation failures or when the
121+
// caller's context is cancelled while waiting for a free in-flight slot.
122+
rmq.Error("[Publisher] PublishAsync error", "error", err)
123+
wg.Done()
124+
}
125+
}
126+
127+
// Block until every callback has been invoked.
128+
rmq.Info("Waiting for all confirmations…")
129+
wg.Wait()
130+
131+
elapsed := time.Since(startTime)
132+
rmq.Info("All messages confirmed",
133+
"accepted", accepted.Load(),
134+
"released", released.Load(),
135+
"rejected", rejected.Load(),
136+
"failed", failed.Load(),
137+
"elapsed", elapsed.Round(time.Millisecond),
138+
"msg/s", fmt.Sprintf("%.0f", float64(totalMessages)/elapsed.Seconds()),
139+
)
140+
141+
if err = publisher.Close(context.Background()); err != nil {
142+
rmq.Error("Error closing publisher", "error", err)
143+
}
144+
145+
// press any key to close
146+
println("press any key to close and clean up")
147+
148+
var input string
149+
_, _ = fmt.Scanln(&input)
150+
151+
purged, err := management.PurgeQueue(context.Background(), queueInfo.Name())
152+
if err != nil {
153+
rmq.Error("Error purging queue", "error", err)
154+
} else {
155+
rmq.Info("Queue purged", "messages", purged)
156+
}
157+
158+
if err = management.DeleteQueue(context.Background(), queueInfo.Name()); err != nil {
159+
rmq.Error("Error deleting queue", "error", err)
160+
}
161+
162+
if err = env.CloseConnections(context.Background()); err != nil {
163+
rmq.Error("Error closing connection", "error", err)
164+
}
165+
166+
rmq.Info("AMQP connection closed")
167+
time.Sleep(100 * time.Millisecond)
168+
close(stateChanged)
169+
}

0 commit comments

Comments
 (0)