Add PublishAsync to Publisher for non-blocking message publishing#93
Add PublishAsync to Publisher for non-blocking message publishing#93Gsantomaggio merged 8 commits intomainfrom
Conversation
Introduce PublishAsync(ctx, message, callback) on Publisher as a fire-and-forget alternative to Publish. The send itself is synchronous (SendWithReceipt), but the broker-confirmation wait (SendReceipt.Wait) runs in a background goroutine and delivers its outcome via a PublishAsyncCallback. Back-pressure is enforced through a channel-based semaphore (inFlight) whose capacity is controlled by PublisherOptions.MaxInFlight (default 256): PublishAsync blocks the caller until a slot is available or the context is cancelled. Each confirmation goroutine respects a configurable PublishTimeout (default 30 s) before surfacing a timeout error through the callback. Also add integration tests covering the happy path, bulk sending, MaxInFlight throttling, context-cancellation back-pressure, validation errors, StateReleased outcomes, and custom timeouts, together with a runnable example in docs/examples/publish_async/. Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a new PublishAsync(ctx, message, callback) API to Publisher to allow non-blocking publish confirmations with bounded in-flight concurrency and a configurable confirmation timeout, plus supporting docs/examples and test coverage.
Changes:
- Introduce
PublishAsyncwithMaxInFlightback-pressure (semaphore) andPublishTimeoutconfirmation wait timeout. - Extend publisher options/types with defaults and callback type, plus lifecycle doc improvements.
- Add integration tests and runnable docs utilities (example + perf harness), and wire vet/staticcheck for the new docs code.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/rabbitmqamqp/life_cycle.go | Moves lifecycle state constants earlier and adds extended StateChanged documentation. |
| pkg/rabbitmqamqp/amqp_types.go | Adds publisher async options defaults + PublishAsyncCallback and option accessors. |
| pkg/rabbitmqamqp/amqp_publisher.go | Implements PublishAsync with in-flight semaphore and timeout-based receipt waits. |
| pkg/rabbitmqamqp/amqp_publisher_test.go | Adds integration tests covering async publish behavior and throttling/cancellation. |
| docs/perf_test/main.go | Adds a perf harness supporting both Publish and PublishAsync. |
| docs/examples/publish_async/main.go | Adds a runnable example demonstrating PublishAsync usage and tuning. |
| README.md | Documents the new perf test harness and the two publish modes. |
| Makefile | Includes docs/perf_test in vet/staticcheck. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if err = env.CloseConnections(context.Background()); err != nil { | ||
| fmt.Fprintf(os.Stderr, "%sclose:%s %v\n", colorRed, colorReset, err) | ||
| } | ||
| close(stateChanged) |
There was a problem hiding this comment.
stateChanged is passed to NotifyStatusChange, and the library likely sends to that channel. Closing a channel from the receiver side can panic if the sender attempts to write after close (including during/after connection shutdown). Don’t close stateChanged here; instead, signal your logging goroutine to stop via a separate done channel/context, or simply let the process exit without closing it.
| close(stateChanged) |
|
|
||
| rmq.Info("AMQP connection closed") | ||
| time.Sleep(100 * time.Millisecond) | ||
| close(stateChanged) |
There was a problem hiding this comment.
Same receiver-side channel close issue as in docs/perf_test: NotifyStatusChange likely writes to stateChanged, and closing it here can trigger a send-on-closed-channel panic. Avoid closing this channel from the example; stop the logger goroutine via context/done signaling instead.
| close(stateChanged) | |
| // Do not close stateChanged here: notification senders may still write to it. |
| const ( | ||
| queueName = "publish-async-go-queue" | ||
| totalMessages = 10_000_000 | ||
| maxInFlight = 100 | ||
| publishTimeout = 10 * time.Second | ||
| ) |
There was a problem hiding this comment.
totalMessages = 10_000_000 makes this “example” effectively a long-running load test (and WaitGroup.Add(10_000_000) can be heavy). Consider reducing this to a smaller, runnable-by-default count (e.g., 1k–10k) and/or making it configurable via flags/env so the example is practical for quick verification.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Introduce PublishAsync(ctx, message, callback) on Publisher as a fire-and-forget alternative to Publish. The send itself is synchronous (SendWithReceipt), but the broker-confirmation wait (SendReceipt.Wait) runs in a background goroutine and delivers its outcome via a PublishAsyncCallback.
Back-pressure is enforced through a channel-based semaphore (inFlight) whose capacity is controlled by PublisherOptions.MaxInFlight (default 256): PublishAsync blocks the caller until a slot is available or the context is cancelled. Each confirmation goroutine respects a configurable PublishTimeout (default 10 s) before surfacing a timeout error through the callback.
Also add integration tests covering the happy path, bulk sending, MaxInFlight throttling, context-cancellation back-pressure, validation errors, StateReleased outcomes, and custom timeouts, together with a runnable example in docs/examples/publish_async/.