Skip to content

Commit 1886a11

Browse files
committed
JetStream instead NATS stan
1 parent 4b47369 commit 1886a11

12 files changed

Lines changed: 88 additions & 121 deletions

File tree

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Then we filter out only the events we need and publish them in the queue
2323

2424
### Event publishing
2525

26-
NATS Streaming is used as a message broker.
26+
NATS JetStream is used as a message broker.
2727
Service publishes the following structure.
2828
The name of the topic for subscription to receive messages is formed from the prefix of the topic,
2929
the name of the database and the name of the table `prefix + schema_table`.
@@ -39,7 +39,7 @@ the name of the database and the name of the table `prefix + schema_table`.
3939
}
4040
```
4141

42-
Messages are published to Nats-Streaming at least once!
42+
Messages are published to NATS (JetStream) at least once!
4343

4444
### Filter configuration example
4545

@@ -56,7 +56,7 @@ This filter means that we only process events occurring with the `users` table,
5656
and in particular `insert` and `update` data.
5757

5858
### Topic mapping
59-
By default, output NATS topic name consist of prefix, DB schema, and DB table name,
59+
By default, output NATS topic name consist of prefix, DB schema, and DB table name,
6060
but if you want to send all update in one topic you should be configured the topic map:
6161
```yaml
6262
topicsMap:
@@ -114,5 +114,5 @@ monitoring:
114114
You can start the container from the project folder (configuration file is required)
115115
116116
```
117-
docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:v2.0.0
117+
docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:v2.1.0
118118
```

cmd/wal-listener/init.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/evalphobia/logrus_sentry"
88
"github.com/jackc/pgx"
9+
"github.com/nats-io/nats.go"
910
"github.com/sirupsen/logrus"
1011
"github.com/spf13/viper"
1112

@@ -82,6 +83,29 @@ func initLogger(cfg config.LoggerCfg, version string) *logrus.Entry {
8283
return logger.WithField("version", version)
8384
}
8485

86+
// createStream creates a stream by using JetStreamContext. We can do it manually.
87+
func createStream(logger *logrus.Entry, js nats.JetStreamContext, streamName string) error {
88+
stream, err := js.StreamInfo(streamName)
89+
if err != nil {
90+
logger.WithError(err).Warnln("stream info")
91+
}
92+
93+
if stream == nil {
94+
var streamSubjects = streamName + ".*"
95+
96+
if _, err = js.AddStream(&nats.StreamConfig{
97+
Name: streamName,
98+
Subjects: []string{streamSubjects},
99+
}); err != nil {
100+
return err
101+
}
102+
103+
logger.WithField("subjects", streamSubjects).Infoln("stream not exists, created..")
104+
}
105+
106+
return nil
107+
}
108+
85109
func initSentry(dsn string, logger *logrus.Entry) {
86110
if len(dsn) == 0 {
87111
logger.Warnln("empty Sentry DSN")

cmd/wal-listener/main.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66
"os"
77

8-
"github.com/nats-io/stan.go"
8+
"github.com/nats-io/nats.go"
99
"github.com/sirupsen/logrus"
1010
"github.com/urfave/cli/v2"
1111

@@ -47,10 +47,20 @@ func main() {
4747

4848
initSentry(cfg.Monitoring.SentryDSN, logger)
4949

50-
natsConn, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address))
50+
natsConn, err := nats.Connect(cfg.Nats.Address)
5151
if err != nil {
5252
return fmt.Errorf("nats connection: %w", err)
5353
}
54+
defer natsConn.Close()
55+
56+
js, err := natsConn.JetStream()
57+
if err != nil {
58+
return fmt.Errorf("jet stream: %w", err)
59+
}
60+
61+
if err := createStream(logger, js, cfg.Nats.StreamName); err != nil {
62+
return fmt.Errorf("create Nats stream: %w", err)
63+
}
5464

5565
conn, rConn, err := initPgxConnections(cfg.Database)
5666
if err != nil {
@@ -62,7 +72,7 @@ func main() {
6272
logger,
6373
listener.NewRepository(conn),
6474
rConn,
65-
listener.NewNatsPublisher(natsConn),
75+
listener.NewNatsPublisher(js),
6676
listener.NewBinaryParser(binary.BigEndian),
6777
)
6878

config/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ type ListenerCfg struct {
2828
// NatsCfg path of the NATS config.
2929
type NatsCfg struct {
3030
Address string `valid:"required"`
31-
ClusterID string `valid:"required"`
32-
ClientID string `valid:"required"`
31+
StreamName string `valid:"required"`
3332
TopicPrefix string
3433
}
3534

config/config_test.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ func TestConfig_Validate(t *testing.T) {
3636
},
3737
Nats: NatsCfg{
3838
Address: "addr",
39-
ClusterID: "cluster",
40-
ClientID: "client",
39+
StreamName: "stream",
4140
TopicPrefix: "prefix",
4241
},
4342
},
@@ -59,8 +58,7 @@ func TestConfig_Validate(t *testing.T) {
5958
},
6059
Nats: NatsCfg{
6160
Address: "addr",
62-
ClusterID: "cluster",
63-
ClientID: "client",
61+
StreamName: "stream",
6462
TopicPrefix: "prefix",
6563
},
6664
},
@@ -82,8 +80,7 @@ func TestConfig_Validate(t *testing.T) {
8280
},
8381
Nats: NatsCfg{
8482
Address: "addr",
85-
ClusterID: "cluster",
86-
ClientID: "client",
83+
StreamName: "stream",
8784
TopicPrefix: "prefix",
8885
},
8986
},
@@ -106,13 +103,35 @@ func TestConfig_Validate(t *testing.T) {
106103
Password: "pass",
107104
},
108105
Nats: NatsCfg{
109-
ClusterID: "cluster",
110-
ClientID: "client",
106+
StreamName: "stream",
111107
TopicPrefix: "prefix",
112108
},
113109
},
114110
wantErr: errors.New("Nats.Address: non zero value required"),
115111
},
112+
{
113+
name: "empty nats stream cfg",
114+
fields: fields{
115+
Listener: ListenerCfg{
116+
SlotName: "slot",
117+
AckTimeout: 10,
118+
RefreshConnection: 10,
119+
HeartbeatInterval: 10,
120+
},
121+
Database: DatabaseCfg{
122+
Host: "host",
123+
Port: 10,
124+
Name: "db",
125+
User: "usr",
126+
Password: "pass",
127+
},
128+
Nats: NatsCfg{
129+
Address: "addr",
130+
TopicPrefix: "prefix",
131+
},
132+
},
133+
wantErr: errors.New("Nats.StreamName: non zero value required"),
134+
},
116135
}
117136
for _, tt := range tests {
118137
t.Run(tt.name, func(t *testing.T) {

go.mod

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/google/uuid v1.3.0
1111
github.com/jackc/pgx v3.6.2+incompatible
1212
github.com/magiconair/properties v1.8.6
13-
github.com/nats-io/stan.go v0.10.3
13+
github.com/nats-io/nats.go v1.16.0
1414
github.com/sirupsen/logrus v1.9.0
1515
github.com/spf13/viper v1.12.0
1616
github.com/stretchr/testify v1.8.0
@@ -25,15 +25,12 @@ require (
2525
github.com/fsnotify/fsnotify v1.5.4 // indirect
2626
github.com/getsentry/raven-go v0.2.0 // indirect
2727
github.com/gofrs/uuid v3.2.0+incompatible // indirect
28-
github.com/gogo/protobuf v1.3.2 // indirect
2928
github.com/hashicorp/hcl v1.0.0 // indirect
3029
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
3130
github.com/lib/pq v1.2.0 // indirect
3231
github.com/mitchellh/mapstructure v1.5.0 // indirect
3332
github.com/nats-io/jwt v1.2.2 // indirect
3433
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
35-
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
36-
github.com/nats-io/nats.go v1.16.0 // indirect
3734
github.com/nats-io/nkeys v0.3.0 // indirect
3835
github.com/nats-io/nuid v1.0.1 // indirect
3936
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect

0 commit comments

Comments
 (0)