Skip to content

Commit ac5e0a7

Browse files
committed
Add PublishAsync to Publisher for non-blocking message publishing
Introduce PublishAsync(ctx, message, callback) on Publisher as a fire-and-forget alternative to Publish. The send itself is synchronous (SendWithReceipt), but the broker-confirmation wait (SendReceipt.Wait) runs in a background goroutine and delivers its outcome via a PublishAsyncCallback. Back-pressure is enforced through a channel-based semaphore (inFlight) whose capacity is controlled by PublisherOptions.MaxInFlight (default 256): PublishAsync blocks the caller until a slot is available or the context is cancelled. Each confirmation goroutine respects a configurable PublishTimeout (default 30 s) before surfacing a timeout error through the callback. Also add integration tests covering the happy path, bulk sending, MaxInFlight throttling, context-cancellation back-pressure, validation errors, StateReleased outcomes, and custom timeouts, together with a runnable example in docs/examples/publish_async/. Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 5ae63ba commit ac5e0a7

4 files changed

Lines changed: 551 additions & 2 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// The example demonstrates how to use PublishAsync to send messages without blocking
4+
// the caller while waiting for broker confirmation.
5+
//
6+
// Key concepts shown:
7+
// - PublishAsync fires the send immediately and delivers the outcome via a callback.
8+
// - MaxInFlight limits how many confirmation goroutines can run concurrently,
9+
// providing back-pressure: PublishAsync blocks the caller when the limit is reached.
10+
// - PublishTimeout controls how long each confirmation goroutine waits for the broker
11+
// before the callback receives a timeout error.
12+
// - A sync.WaitGroup is used to wait for all callbacks before shutting down.
13+
//
14+
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/publish_async/main.go
15+
16+
package main
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
25+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
26+
)
27+
28+
func main() {
29+
const (
30+
queueName = "publish-async-go-queue"
31+
totalMessages = 2_000_000
32+
maxInFlight = 100
33+
publishTimeout = 10 * time.Second
34+
)
35+
36+
rmq.Info("PublishAsync example starting")
37+
38+
// Counters updated from the callback goroutines.
39+
var accepted, released, rejected, failed atomic.Int32
40+
41+
// Track how long the whole publish loop takes.
42+
startTime := time.Now()
43+
44+
// stateChanged receives connection lifecycle events.
45+
stateChanged := make(chan *rmq.StateChanged, 1)
46+
go func(ch chan *rmq.StateChanged) {
47+
for sc := range ch {
48+
rmq.Info("[connection] status changed", "from", sc.From, "to", sc.To)
49+
}
50+
}(stateChanged)
51+
52+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
53+
amqpConnection, err := env.NewConnection(context.Background())
54+
if err != nil {
55+
rmq.Error("Error opening connection", "error", err)
56+
return
57+
}
58+
amqpConnection.NotifyStatusChange(stateChanged)
59+
rmq.Info("AMQP connection opened")
60+
61+
management := amqpConnection.Management()
62+
queueInfo, err := management.DeclareQueue(context.Background(), &rmq.QuorumQueueSpecification{
63+
Name: queueName,
64+
})
65+
if err != nil {
66+
rmq.Error("Error declaring queue", "error", err)
67+
return
68+
}
69+
70+
// Create the publisher with async-specific options:
71+
// MaxInFlight – at most 100 confirmation goroutines run concurrently.
72+
// When the limit is reached, PublishAsync blocks the caller
73+
// until a slot becomes free.
74+
// PublishTimeout – each confirmation goroutine waits at most 30 s for a
75+
// broker acknowledgement before the callback receives an error.
76+
publisher, err := amqpConnection.NewPublisher(context.Background(),
77+
&rmq.QueueAddress{Queue: queueName},
78+
&rmq.PublisherOptions{
79+
MaxInFlight: maxInFlight,
80+
PublishTimeout: publishTimeout,
81+
})
82+
if err != nil {
83+
rmq.Error("Error creating publisher", "error", err)
84+
return
85+
}
86+
87+
// wg is decremented once per callback invocation.
88+
var wg sync.WaitGroup
89+
wg.Add(totalMessages)
90+
91+
for i := 0; i < totalMessages; i++ {
92+
msgBody := fmt.Sprintf("hello async %d", i)
93+
msg := rmq.NewMessage([]byte(msgBody))
94+
95+
err = publisher.PublishAsync(context.Background(), msg,
96+
func(result *rmq.PublishResult, cbErr error) {
97+
defer wg.Done()
98+
99+
if cbErr != nil {
100+
// Timeout or send-level error.
101+
rmq.Error("[Publisher] async callback error", "error", cbErr)
102+
failed.Add(1)
103+
return
104+
}
105+
106+
switch result.Outcome.(type) {
107+
case *rmq.StateAccepted:
108+
accepted.Add(1)
109+
case *rmq.StateReleased:
110+
rmq.Warn("[Publisher] message not routed", "body", result.Message.Data[0])
111+
released.Add(1)
112+
case *rmq.StateRejected:
113+
s := result.Outcome.(*rmq.StateRejected)
114+
rmq.Warn("[Publisher] message rejected", "error", s.Error)
115+
rejected.Add(1)
116+
}
117+
})
118+
119+
if err != nil {
120+
// PublishAsync only returns an error for validation failures or when the
121+
// caller's context is cancelled while waiting for a free in-flight slot.
122+
rmq.Error("[Publisher] PublishAsync error", "error", err)
123+
wg.Done()
124+
}
125+
}
126+
127+
// Block until every callback has been invoked.
128+
rmq.Info("Waiting for all confirmations…")
129+
wg.Wait()
130+
131+
elapsed := time.Since(startTime)
132+
rmq.Info("All messages confirmed",
133+
"accepted", accepted.Load(),
134+
"released", released.Load(),
135+
"rejected", rejected.Load(),
136+
"failed", failed.Load(),
137+
"elapsed", elapsed.Round(time.Millisecond),
138+
"msg/s", fmt.Sprintf("%.0f", float64(totalMessages)/elapsed.Seconds()),
139+
)
140+
141+
if err = publisher.Close(context.Background()); err != nil {
142+
rmq.Error("Error closing publisher", "error", err)
143+
}
144+
145+
// press any key to close
146+
println("press any key to close and clean up")
147+
148+
var input string
149+
_, _ = fmt.Scanln(&input)
150+
151+
purged, err := management.PurgeQueue(context.Background(), queueInfo.Name())
152+
if err != nil {
153+
rmq.Error("Error purging queue", "error", err)
154+
} else {
155+
rmq.Info("Queue purged", "messages", purged)
156+
}
157+
158+
if err = management.DeleteQueue(context.Background(), queueInfo.Name()); err != nil {
159+
rmq.Error("Error deleting queue", "error", err)
160+
}
161+
162+
if err = env.CloseConnections(context.Background()); err != nil {
163+
rmq.Error("Error closing connection", "error", err)
164+
}
165+
166+
rmq.Info("AMQP connection closed")
167+
time.Sleep(100 * time.Millisecond)
168+
close(stateChanged)
169+
}

pkg/rabbitmqamqp/amqp_publisher.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/url"
77
"strings"
88
"sync/atomic"
9+
"time"
910

1011
"github.com/Azure/go-amqp"
1112
"github.com/google/uuid"
@@ -23,6 +24,8 @@ type Publisher struct {
2324
linkName string
2425
destinationAdd string
2526
id string
27+
inFlight chan struct{}
28+
publishTimeout time.Duration
2629
}
2730

2831
func (m *Publisher) Id() string {
@@ -35,14 +38,27 @@ func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAd
3538
id = options.id()
3639
}
3740

38-
r := &Publisher{connection: connection, linkName: getLinkName(options), destinationAdd: destinationAdd, id: id}
41+
maxInFlight := DefaultMaxInFlight
42+
publishTimeout := DefaultPublishTimeout
43+
if options != nil {
44+
maxInFlight = options.maxInFlight()
45+
publishTimeout = options.publishTimeout()
46+
}
47+
48+
r := &Publisher{
49+
connection: connection,
50+
linkName: getLinkName(options),
51+
destinationAdd: destinationAdd,
52+
id: id,
53+
inFlight: make(chan struct{}, maxInFlight),
54+
publishTimeout: publishTimeout,
55+
}
3956
connection.entitiesTracker.storeOrReplaceProducer(r)
4057
err := r.createSender(ctx)
4158
if err != nil {
4259
return nil, err
4360
}
4461

45-
// Record the publisher opening metric
4662
connection.metricsCollector.OpenPublisher()
4763

4864
return r, nil
@@ -147,6 +163,76 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis
147163
}, err
148164
}
149165

166+
/*
167+
PublishAsync sends a message asynchronously and delivers the result via a callback.
168+
169+
The message is sent to the broker immediately (same as Publish), but the confirmation
170+
wait (SendReceipt.Wait) happens in a background goroutine. When the broker confirms
171+
the message or the timeout expires, the callback is invoked with the result or an error.
172+
173+
Back-pressure: the number of concurrent in-flight confirmations is bounded by
174+
MaxInFlight (configurable via PublisherOptions, default 256). When the limit is
175+
reached, PublishAsync blocks on the caller's goroutine until a slot becomes
176+
available or the context is cancelled.
177+
178+
The timeout for each confirmation wait is controlled by PublisherOptions.PublishTimeout
179+
(default 10s).
180+
*/
181+
func (m *Publisher) PublishAsync(ctx context.Context, message *amqp.Message, callback PublishAsyncCallback) error {
182+
if m.destinationAdd == "" {
183+
if message.Properties == nil || message.Properties.To == nil {
184+
return fmt.Errorf("message properties TO is required to send a message to a dynamic target address")
185+
}
186+
if err := validateAddress(*message.Properties.To); err != nil {
187+
return err
188+
}
189+
}
190+
191+
if message.Header == nil {
192+
message.Header = &amqp.MessageHeader{
193+
Durable: true,
194+
}
195+
}
196+
197+
// Acquire an in-flight slot; blocks if MaxInFlight goroutines are already waiting.
198+
select {
199+
case m.inFlight <- struct{}{}:
200+
case <-ctx.Done():
201+
return ctx.Err()
202+
}
203+
204+
receipt, err := m.sender.Load().SendWithReceipt(ctx, message, nil)
205+
if err != nil {
206+
<-m.inFlight
207+
return err
208+
}
209+
210+
publishCtx := m.buildPublishContext(message)
211+
m.connection.metricsCollector.Publish(publishCtx)
212+
213+
go func() {
214+
defer func() { <-m.inFlight }()
215+
216+
waitCtx, cancel := context.WithTimeout(context.Background(), m.publishTimeout)
217+
defer cancel()
218+
219+
state, waitErr := receipt.Wait(waitCtx)
220+
if waitErr != nil {
221+
callback(nil, fmt.Errorf("publish confirmation failed: %w", waitErr))
222+
return
223+
}
224+
225+
m.recordPublishDisposition(state, publishCtx)
226+
227+
callback(&PublishResult{
228+
Message: message,
229+
Outcome: state,
230+
}, nil)
231+
}()
232+
233+
return nil
234+
}
235+
150236
// recordPublishDisposition records the publish disposition metric based on the delivery state.
151237
func (m *Publisher) recordPublishDisposition(state DeliveryState, ctx PublishContext) {
152238
switch state.(type) {

0 commit comments

Comments
 (0)