Skip to content

Commit 37dec52

Browse files
committed
Add the example
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 5b9268d commit 37dec52

3 files changed

Lines changed: 106 additions & 4 deletions

File tree

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
1313
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
1414
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
15-
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
15+
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
16+
- [Pre-settled messages](pre_settled) - An example of how to receive pre-settled messages with the AMQP 1.0 client.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// This example is demonstrating how to consume messages with pre-settled mode enabled.
4+
// pre-settled is valid only for classic and quorum queues.
5+
// Pre-settled mode means that messages are considered accepted by the broker as soon as they are delivered to the consumer,
6+
// without requiring explicit acceptance from the consumer side.
7+
// This is useful for scenarios where at-most-once delivery semantics are acceptable, and it can improve performance by reducing
8+
// the overhead of message acknowledgments.
9+
// Example path:https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/pre_settled/pre_settled.go
10+
package main
11+
12+
import (
13+
"context"
14+
"fmt"
15+
16+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
17+
)
18+
19+
func main() {
20+
rmq.Info("Pre-Settled example with AMQP Go AMQP 1.0 Client")
21+
22+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
23+
amqpConnection, err := env.NewConnection(context.Background())
24+
if err != nil {
25+
rmq.Error("Error opening connection", err)
26+
return
27+
}
28+
// create queue for the example
29+
_, err = amqpConnection.Management().DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
30+
Name: "pre-settled-queue",
31+
})
32+
if err != nil {
33+
rmq.Error("Error declaring queue", err)
34+
return
35+
}
36+
rmq.Info("Queue 'pre-settled-queue' declared")
37+
38+
// publish some messages to the queue
39+
producer, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{
40+
Queue: "pre-settled-queue",
41+
}, nil)
42+
if err != nil {
43+
rmq.Error("Error creating producer", err)
44+
return
45+
}
46+
for i := 0; i < 100; i++ {
47+
msg := rmq.NewMessage([]byte(fmt.Sprintf("Pre-settled message number %d ", i+1)))
48+
_, err := producer.Publish(context.TODO(), msg)
49+
if err != nil {
50+
rmq.Error("Error publishing message", err)
51+
return
52+
}
53+
}
54+
55+
// create consumer with pre-settled mode enabled
56+
57+
consumer, err := amqpConnection.NewConsumer(context.TODO(), "pre-settled-queue", &rmq.ConsumerOptions{
58+
PreSettled: true,
59+
})
60+
if err != nil {
61+
rmq.Error("Error creating consumer", err)
62+
return
63+
}
64+
rmq.Info("Consumer created with pre-settled mode enabled")
65+
66+
for i := 0; i < 100; i++ {
67+
dc, err := consumer.Receive(context.TODO())
68+
if err != nil {
69+
rmq.Error("Error consuming message", err)
70+
return
71+
}
72+
// here we don't need to accept the message
73+
// because pre-settled mode is enabled
74+
rmq.Info("[Consumer]", "Message received", string(dc.Message().GetData()))
75+
}
76+
77+
// clean up
78+
err = consumer.Close(context.TODO())
79+
if err != nil {
80+
rmq.Error("Error closing consumer", err)
81+
return
82+
}
83+
err = producer.Close(context.TODO())
84+
if err != nil {
85+
rmq.Error("Error closing producer", err)
86+
return
87+
}
88+
// delete the queue
89+
err = amqpConnection.Management().DeleteQueue(context.TODO(), "pre-settled-queue")
90+
if err != nil {
91+
rmq.Error("Error deleting queue", err)
92+
return
93+
}
94+
rmq.Info("Queue 'pre-settled-queue' deleted")
95+
96+
err = amqpConnection.Close(context.TODO())
97+
if err != nil {
98+
rmq.Error("Error closing connection", err)
99+
return
100+
}
101+
rmq.Info("Example finished successfully")
102+
103+
}

pkg/rabbitmqamqp/amqp_types.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ type IConsumerOptions interface {
5454
// preSettled indicates if the consumer should use pre-settled delivery mode.
5555
// When enabled, messages arrive already settled from the broker, which makes
5656
// settlement from the client with a disposition frame not necessary.
57-
// This is the "fire-and-forget" or "at-most-once" mode. It should be faster
58-
// but may result in message loss. It is fine for use cases like log streaming
59-
// or sensor telemetry.
57+
// This is the "fire-and-forget" or "at-most-once" mode.
6058
preSettled() bool
6159
}
6260

0 commit comments

Comments
 (0)