Skip to content

Commit 9aab047

Browse files
authored
Add example with reliable* (#379)
* Add example with reliable* --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 83553a8 commit 9aab047

File tree

5 files changed

+117
-4
lines changed

5 files changed

+117
-4
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ Stream uri: `rabbitmq-stream://guest:guest@localhost:5552`
8989

9090
### Getting started for impatient
9191

92-
See [getting started](./examples/getting_started.go) example.
92+
- [Getting started with reliable producer/consumer](./examples/reliable_getting_started/reliable_getting_started.go) example.
93+
- [Getting started with standard producer/consumer](./examples/getting_started/getting_started.go) example.
94+
- Getting started Video tutorial:
95+
96+
[![Getting Started](https://img.youtube.com/vi/8qfvl6FgC50/0.jpg)](https://www.youtube.com/watch?v=8qfvl6FgC50)
9397

9498
### Examples
9599

examples/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Stream examples
22
===
3-
4-
- [Getting started](./getting_started.go) - A good point to start.
3+
- [Reliable getting started](./getting_started/getting_started.go) - The structures you need to start.
4+
- [Getting started](./getting_started/getting_started.go) - Producer and Consumer example without reconnection
55
- [Offset Start](./offsetStart/offset.go) - How to set different points to start consuming
66
- [Offset Tracking](./offsetTracking/offsetTracking.go) - Manually store the consumer offset
77
- [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go) - Automatic store the consumer offset
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
9+
)
10+
11+
func main() {
12+
fmt.Printf("Getting started with Streaming client for RabbitMQ\n")
13+
14+
// Create the environment. You can set the log level to DEBUG for more information
15+
// stream.SetLevelInfo(logs.DEBUG)
16+
// the environment is the connection to the broker(s)
17+
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
18+
SetHost("localhost").
19+
SetPort(5552))
20+
if err != nil {
21+
fmt.Printf("Error creating environment: %v\n", err)
22+
return
23+
}
24+
25+
// Create a stream
26+
streamName := "my-stream"
27+
// It is highly recommended to define the stream retention policy
28+
err = env.DeclareStream(streamName, stream.NewStreamOptions().
29+
SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
30+
31+
// ignore the error if the stream already exists
32+
if err != nil && !errors.Is(err, stream.StreamAlreadyExists) {
33+
fmt.Printf("Error declaring stream: %v\n", err)
34+
return
35+
}
36+
37+
// declare the reliable consumer using the package ha
38+
consumer, err := ha.NewReliableConsumer(env, streamName,
39+
// start from the beginning of the stream
40+
stream.NewConsumerOptions().
41+
SetOffset(stream.OffsetSpecification{}.First()),
42+
// handler where the messages will be processed
43+
func(consumerContext stream.ConsumerContext, message *amqp.Message) {
44+
fmt.Printf("Message received: %s\n", message.GetData())
45+
})
46+
47+
if err != nil {
48+
fmt.Printf("Error creating consumer: %v\n", err)
49+
return
50+
}
51+
52+
// Create the reliable producer using the package ha
53+
producer, err := ha.NewReliableProducer(env, streamName,
54+
// we leave the default options
55+
stream.NewProducerOptions(),
56+
// handler for the confirmation of the messages
57+
func(messageConfirm []*stream.ConfirmationStatus) {
58+
for _, msg := range messageConfirm {
59+
if msg.IsConfirmed() {
60+
fmt.Printf("message %s confirmed \n", msg.GetMessage().GetData())
61+
} else {
62+
fmt.Printf("message %s failed \n", msg.GetMessage().GetData())
63+
}
64+
}
65+
})
66+
67+
if err != nil {
68+
fmt.Printf("Error creating producer: %v\n", err)
69+
return
70+
}
71+
72+
// Send a message
73+
for i := 0; i < 10; i++ {
74+
err = producer.Send(amqp.NewMessage([]byte(fmt.Sprintf("Hello stream:%d", i))))
75+
if err != nil {
76+
fmt.Printf("Error sending message: %v\n", err)
77+
return
78+
}
79+
}
80+
81+
// press any key to exit
82+
fmt.Printf("Press any close the producer, consumer and environment\n")
83+
_, _ = fmt.Scanln()
84+
85+
//// Close the producer
86+
err = producer.Close()
87+
if err != nil {
88+
fmt.Printf("Error closing producer: %v\n", err)
89+
}
90+
91+
// Close the consumer
92+
err = consumer.Close()
93+
if err != nil {
94+
fmt.Printf("Error closing consumer: %v\n", err)
95+
}
96+
97+
err = env.DeleteStream(streamName)
98+
if err != nil {
99+
fmt.Printf("Error deleting stream: %v\n", err)
100+
}
101+
102+
// Close the environment
103+
err = env.Close()
104+
if err != nil {
105+
fmt.Printf("Error closing environment: %s\n", err)
106+
}
107+
108+
}

pkg/ha/ha_publisher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
4848
p.reconnectionSignal.L.Lock()
4949
p.reconnectionSignal.Broadcast()
5050
p.reconnectionSignal.L.Unlock()
51-
logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo())
51+
logs.LogDebug("[Reliable] - %s reconnection signal sent", p.getInfo())
52+
5253
}()
5354
}
5455

0 commit comments

Comments
 (0)