Add DelayedQueueSpecification for Tanzu delayed queues#92
Conversation
Introduce the delayed queue type and specification (standard queue args plus optional automatic shovel fields with documented defaults). Extend topology recovery and unit tests, add an integration test that rejects declaration on non-Tanzu brokers, document the feature in AGENTS.md, and add a docs/examples/delayed_queue sample with README entry. Made-with: Cursor
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
There was a problem hiding this comment.
Pull request overview
Adds support for Tanzu-specific delayed queues to the Go AMQP 1.0 client by introducing a new queue type/spec and wiring it into topology recovery, tests, and documentation/examples.
Changes:
- Add
Delayedqueue type andDelayedQueueSpecification(including optional automatic-shovel arguments and defaults). - Extend topology recovery to reconstruct delayed queue specs, plus unit/integration tests around argument mapping and non-Tanzu rejection.
- Add a new
docs/examples/delayed_queuesample and update docs/AGENTS documentation to reference the new spec.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/rabbitmqamqp/entities.go | Introduces Delayed queue type, delayed queue spec, argument building, and server capability validation. |
| pkg/rabbitmqamqp/entities_test.go | Adds unit tests for delayed queue argument mapping and shovel defaults. |
| pkg/rabbitmqamqp/amqp_queue_test.go | Adds integration test ensuring delayed queue declaration is rejected on non-Tanzu brokers. |
| pkg/rabbitmqamqp/amqp_connection_recovery.go | Adds delayed queue handling when converting recovery records back to queue specs. |
| pkg/rabbitmqamqp/amqp_connection_recovery_test.go | Updates recovery table tests to cover delayed queue reconstruction. |
| docs/examples/README.md | Documents the new delayed queue example (and updates JMS wording). |
| docs/examples/delayed_queue/main.go | New runnable example demonstrating delayed queue declaration and delayed delivery via annotations. |
| AGENTS.md | Documents the new queue type and queue specification. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (d *DelayedQueueSpecification) validate(f *featuresAvailable) error { | ||
| if f.isTanzu { | ||
| return nil | ||
| } | ||
| return fmt.Errorf("DelayedQueueSpecification is only supported on Tanzu RabbitMQ 4.3 or later") | ||
| } |
There was a problem hiding this comment.
DelayedQueueSpecification.validate currently checks only f.isTanzu but the error message (and PR description/docs) state the feature requires Tanzu RabbitMQ 4.3+. This means older Tanzu versions would be treated as supported. Consider aligning this with JMSQueueSpecification by also checking the server version flag (e.g., f.is43rMore) or updating the message/docs if 4.3+ is not actually required.
| - [Getting Started](getting_started) - A simple example to get you started. | ||
| - [JMS queue](jms_queue) - Same flow as getting started, using `JmsQueueSpecification` (Tanzu RabbitMQ 4.x). | ||
| - [JMS queue](jms_queue) - Same flow as getting started, using `JmsQueueSpecification` (Tanzu RabbitMQ 4.3+). | ||
| - [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.x). |
There was a problem hiding this comment.
The Delayed queue entry says Tanzu RabbitMQ 4.x, while other documentation in this PR (AGENTS.md and the runtime validation error) says 4.3+. Please make the version requirement consistent across docs and code so users don’t get conflicting guidance.
| - [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.x). | |
| - [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.3+). |
| // (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.x+; see: | ||
| // https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-2/tanzu-rabbitmq-kubernetes/delayed-queues.html |
There was a problem hiding this comment.
This example header says delayed queues are available on Tanzu RabbitMQ 4.x+ and links to the 4-2 docs, but the client validation/error message says 4.3+. Please align the stated minimum version and the linked docs version with the actual requirement enforced by DelayedQueueSpecification.validate.
| // (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.x+; see: | |
| // https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-2/tanzu-rabbitmq-kubernetes/delayed-queues.html | |
| // (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.3+; see: | |
| // https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-3/tanzu-rabbitmq-kubernetes/delayed-queues.html |
| It("should fail if declare a Delayed queue in the open source RabbitMQ", func() { | ||
| queueName := generateName("should fail if declare a Delayed queue in the open source RabbitMQ") | ||
| _, err := management.DeclareQueue(context.TODO(), &DelayedQueueSpecification{ | ||
| Name: queueName, | ||
| }) | ||
| Expect(err).NotTo(BeNil()) | ||
| Expect(err.Error()).To(ContainSubstring("DelayedQueueSpecification is only supported on Tanzu RabbitMQ 4.3 or later")) |
There was a problem hiding this comment.
Test name grammar: "should fail if declare a Delayed queue..." reads awkwardly. Consider rewording to "should fail when declaring a delayed queue on open-source RabbitMQ" (also keeps the queue type consistently lowercased in prose).
| Entry("Classic queue", Classic, true, true, map[string]any{}), | ||
| Entry("Stream queue", Stream, false, false, map[string]any{}), | ||
| //Entry("JMS queue", Jms, false, false, map[string]any{}), | ||
| //Entry("Delayed queue", Delayed, false, false, map[string]any{}), | ||
| Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}), | ||
| Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}), | ||
| Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}), | ||
| //Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}), | ||
| Entry("Delayed queue with arguments", Delayed, false, false, map[string]any{"x-max-length-bytes": 1000}), |
There was a problem hiding this comment.
The topology recovery table adds a Delayed queue entry with arguments but leaves the no-arguments Delayed entry commented out. Re-enabling the basic Delayed case would keep coverage consistent with the other queue types and ensure recovery works when arguments are empty.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Introduce the delayed queue type and specification (standard queue args plus optional automatic shovel fields with documented defaults). Extend topology recovery and unit tests, add an integration test that rejects declaration on non-Tanzu brokers, document the feature in AGENTS.md, and add a docs/examples/delayed_queue sample with README entry.
part of #90