Skip to content

Commit 7a517bc

Browse files
Add settings to the stream queues (#87)
Adds additional configuration knobs for RabbitMQ stream queues by extending `StreamQueueSpecification` and introducing a duration-to-`x-max-age` formatter, with accompanying unit tests. **Changes:** - Extend `StreamQueueSpecification` to support `x-stream-filter-size-bytes`, `x-max-age` and `x-stream-max-segment-size-bytes`. - Add `durationToMaxAge(time.Duration)` helper to format RabbitMQ `x-max-age` values. - Add Ginkgo/Gomega tests for the new arguments and duration conversion. Closes #64 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent f277190 commit 7a517bc

2 files changed

Lines changed: 139 additions & 0 deletions

File tree

pkg/rabbitmqamqp/entities.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package rabbitmqamqp
22

3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
38
type iEntityIdentifier interface {
49
Id() string
510
}
@@ -400,6 +405,9 @@ type StreamQueueSpecification struct {
400405
Name string
401406
MaxLengthBytes int64
402407
InitialClusterSize int
408+
MaxAge time.Duration
409+
SegmentSize int64
410+
FilterSizeBytes int64
403411
Arguments map[string]any
404412
}
405413

@@ -433,11 +441,61 @@ func (s *StreamQueueSpecification) buildArguments() map[string]any {
433441
result["x-stream-initial-cluster-size"] = s.InitialClusterSize
434442
}
435443

444+
if s.MaxAge != 0 {
445+
result["x-max-age"] = durationToMaxAge(s.MaxAge)
446+
}
447+
448+
if s.SegmentSize != 0 {
449+
result["x-stream-max-segment-size-bytes"] = s.SegmentSize
450+
}
451+
452+
if s.FilterSizeBytes != 0 {
453+
result["x-stream-filter-size-bytes"] = s.FilterSizeBytes
454+
}
455+
436456
result["x-queue-type"] = string(s.queueType())
437457

438458
return result
439459
}
440460

461+
// durationToMaxAge converts a time.Duration to the RabbitMQ stream x-max-age string format.
462+
// RabbitMQ accepts durations expressed as a number followed by a unit: Y, M, D, h, m, s.
463+
// The largest fitting unit is used (e.g. 48h → "2D", 3600s → "1h").
464+
func durationToMaxAge(d time.Duration) string {
465+
if d == 0 {
466+
return "0s"
467+
}
468+
type unit struct {
469+
suffix string
470+
size time.Duration
471+
}
472+
units := []unit{
473+
{"Y", 365 * 24 * time.Hour},
474+
{"M", 30 * 24 * time.Hour},
475+
{"D", 24 * time.Hour},
476+
{"h", time.Hour},
477+
{"m", time.Minute},
478+
{"s", time.Second},
479+
}
480+
for _, u := range units {
481+
if d%u.size == 0 {
482+
return fmt.Sprintf("%d%s", d/u.size, u.suffix)
483+
}
484+
}
485+
// Fallback: round up to whole seconds for positive durations, enforcing a minimum of 1s.
486+
if d > 0 {
487+
secs := int64(d / time.Second)
488+
if d%time.Second != 0 {
489+
secs++
490+
}
491+
if secs < 1 {
492+
secs = 1
493+
}
494+
return fmt.Sprintf("%ds", secs)
495+
}
496+
return fmt.Sprintf("%ds", int64(d.Seconds()))
497+
}
498+
441499
// / **** Exchange ****
442500

443501
// TExchangeType represents the type of exchange

pkg/rabbitmqamqp/entities_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package rabbitmqamqp
22

33
import (
4+
"time"
5+
46
. "github.com/onsi/ginkgo/v2"
57
. "github.com/onsi/gomega"
68
)
@@ -19,4 +21,83 @@ var _ = Describe("Entities", func() {
1921
Expect(generatedName).ToNot(Equal(anotherGeneratedName), "different instances should generate different names")
2022
})
2123
})
24+
25+
Describe("StreamQueueSpecification", func() {
26+
It("should set x-stream-max-segment-size-bytes when SegmentSize is set", func() {
27+
spec := &StreamQueueSpecification{
28+
Name: "my-stream",
29+
SegmentSize: 500_000_000,
30+
}
31+
args := spec.buildArguments()
32+
Expect(args["x-stream-max-segment-size-bytes"]).To(Equal(int64(500_000_000)))
33+
})
34+
35+
It("should not set x-stream-max-segment-size-bytes when SegmentSize is zero", func() {
36+
spec := &StreamQueueSpecification{Name: "my-stream"}
37+
args := spec.buildArguments()
38+
Expect(args).ToNot(HaveKey("x-stream-max-segment-size-bytes"))
39+
})
40+
41+
It("should set x-max-age when MaxAge is set", func() {
42+
spec := &StreamQueueSpecification{
43+
Name: "my-stream",
44+
MaxAge: 7 * 24 * time.Hour,
45+
}
46+
args := spec.buildArguments()
47+
Expect(args["x-max-age"]).To(Equal("7D"))
48+
})
49+
50+
It("should not set x-max-age when MaxAge is zero", func() {
51+
spec := &StreamQueueSpecification{Name: "my-stream"}
52+
args := spec.buildArguments()
53+
Expect(args).ToNot(HaveKey("x-max-age"))
54+
})
55+
56+
It("should set both x-max-age and x-stream-max-segment-size-bytes together", func() {
57+
spec := &StreamQueueSpecification{
58+
Name: "my-stream",
59+
MaxAge: 30 * 24 * time.Hour,
60+
SegmentSize: 100_000_000,
61+
}
62+
args := spec.buildArguments()
63+
Expect(args["x-max-age"]).To(Equal("1M"))
64+
Expect(args["x-stream-max-segment-size-bytes"]).To(Equal(int64(100_000_000)))
65+
})
66+
67+
It("should set x-stream-filter-size-bytes when FilterSizeBytes is set", func() {
68+
spec := &StreamQueueSpecification{
69+
Name: "my-stream",
70+
FilterSizeBytes: 16,
71+
}
72+
args := spec.buildArguments()
73+
Expect(args["x-stream-filter-size-bytes"]).To(Equal(int64(16)))
74+
})
75+
76+
It("should not set x-stream-filter-size-bytes when FilterSizeBytes is zero", func() {
77+
spec := &StreamQueueSpecification{Name: "my-stream"}
78+
args := spec.buildArguments()
79+
Expect(args).ToNot(HaveKey("x-stream-filter-size-bytes"))
80+
})
81+
82+
It("should always set x-queue-type to stream", func() {
83+
spec := &StreamQueueSpecification{Name: "my-stream"}
84+
args := spec.buildArguments()
85+
Expect(args["x-queue-type"]).To(Equal("stream"))
86+
})
87+
})
88+
89+
Describe("durationToMaxAge", func() {
90+
DescribeTable("converts duration to RabbitMQ max-age string",
91+
func(d time.Duration, expected string) {
92+
Expect(durationToMaxAge(d)).To(Equal(expected))
93+
},
94+
Entry("seconds", 30*time.Second, "30s"),
95+
Entry("minutes", 5*time.Minute, "5m"),
96+
Entry("hours", 2*time.Hour, "2h"),
97+
Entry("days", 3*24*time.Hour, "3D"),
98+
Entry("months (30 days)", 30*24*time.Hour, "1M"),
99+
Entry("years (365 days)", 365*24*time.Hour, "1Y"),
100+
Entry("zero", time.Duration(0), "0s"),
101+
)
102+
})
22103
})

0 commit comments

Comments
 (0)