Skip to content

Commit 8bb49eb

Browse files
implement nats-jetstreaming subscription with multiple consumer modes
This commit implements subscription based on the nats-jetstreaming queue, leveraging JetStream to offer multiple consumer modes and supporting various consumer configurations and consumption methods to meet the demands of different business scenarios.
1 parent 7e39254 commit 8bb49eb

File tree

10 files changed

+834
-0
lines changed

10 files changed

+834
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# config.yaml
2+
3+
nats:
4+
url: "nats://127.0.0.1:4222"
5+
6+
streams:
7+
- name: "user"
8+
description: "user stream"
9+
subjects: ["user.>"]
10+
11+
consumerQueues:
12+
- consumerConfig:
13+
name: "taskRegister"
14+
durable: "taskRegister"
15+
filterSubjects: ["user.register"]
16+
delivery:
17+
consumptionMethod: "consumer"
18+
queueConsumerCount: 3
19+
streamName: "user"
20+
21+
- consumerConfig:
22+
name: "activityRegister"
23+
durable: "activityRegister"
24+
filterSubjects: ["user.register"]
25+
delivery:
26+
consumptionMethod: "consumer"
27+
streamName: "user"
28+
29+
- consumerConfig:
30+
name: "taskRecharger"
31+
durable: "taskRecharger"
32+
filterSubjects: ["user.recharge"]
33+
delivery:
34+
consumptionMethod: "fetch"
35+
queueConsumerCount: 2
36+
streamName: "user"
37+
38+
- consumerConfig:
39+
filterSubjects: ["user.recharge"]
40+
delivery:
41+
consumptionMethod: "fetchNoWait"
42+
streamName: "user"
43+
44+
- consumerConfig:
45+
filterSubjects: ["subject.activity.*"]
46+
ordered: true
47+
delivery:
48+
consumptionMethod: "consumer"
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"github.com/nats-io/nats.go"
6+
"github.com/nats-io/nats.go/jetstream"
7+
"github.com/zeromicro/go-queue/natsmq/common"
8+
"github.com/zeromicro/go-queue/natsmq/consumer"
9+
"github.com/zeromicro/go-zero/core/conf"
10+
"log"
11+
"os"
12+
"os/signal"
13+
"syscall"
14+
"time"
15+
)
16+
17+
var configFile = flag.String("f", "config.yaml", "Specify the config file")
18+
19+
type ConsumerExampleConfig struct {
20+
Streams []*common.JetStreamConfig `json:"streams"`
21+
Nats NatsConf `json:"nats"`
22+
ConsumerQueues []consumer.ConsumerQueueConfig `json:"consumerQueues"`
23+
}
24+
25+
type NatsConf struct {
26+
URL string `json:"url"`
27+
}
28+
29+
type MyConsumeHandler struct{}
30+
31+
func (h *MyConsumeHandler) Consume(msg jetstream.Msg) error {
32+
log.Printf("subject [%s] Received message: %s", msg.Subject(), string(msg.Data()))
33+
return nil
34+
}
35+
36+
func main() {
37+
flag.Parse()
38+
var c ConsumerExampleConfig
39+
conf.MustLoad(*configFile, &c)
40+
41+
var queueConfigs []*consumer.ConsumerQueueConfig
42+
for i := range c.ConsumerQueues {
43+
c.ConsumerQueues[i].Handler = &MyConsumeHandler{}
44+
queueConfigs = append(queueConfigs, &c.ConsumerQueues[i])
45+
}
46+
47+
natsConf := &common.NatsConfig{
48+
URL: c.Nats.URL,
49+
Options: []nats.Option{},
50+
}
51+
52+
cm, err := consumer.NewConsumerManager(natsConf, c.Streams, queueConfigs)
53+
if err != nil {
54+
log.Fatalf("failed to create consumer manager: %v", err)
55+
}
56+
57+
go cm.Start()
58+
59+
sigChan := make(chan os.Signal, 1)
60+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
61+
sig := <-sigChan
62+
log.Printf("Received signal %s, shutting down...", sig)
63+
cm.Stop()
64+
time.Sleep(time.Second)
65+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
version: "3.6"
2+
3+
services:
4+
nats:
5+
image: nats:2.1.8-alpine3.11
6+
command:
7+
- "--jetstream"
8+
- "--debug"
9+
- "--port"
10+
- "4222"
11+
- "--http_port"
12+
- "8222"
13+
ports:
14+
- "4222:4222"
15+
- "8222:8222"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# config.yaml
2+
3+
nats:
4+
url: "nats://127.0.0.1:4222"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"github.com/nats-io/nats.go"
7+
"github.com/zeromicro/go-queue/natsmq/common"
8+
"github.com/zeromicro/go-queue/natsmq/publisher"
9+
"github.com/zeromicro/go-zero/core/conf"
10+
11+
"log"
12+
"time"
13+
)
14+
15+
var configFile = flag.String("f", "config.yaml", "Specify the config file")
16+
17+
type PublisherExampleConfig struct {
18+
Nats NatsConf `json:"nats"`
19+
}
20+
21+
type NatsConf struct {
22+
URL string `json:"url"`
23+
}
24+
25+
func main() {
26+
flag.Parse()
27+
var c PublisherExampleConfig
28+
conf.MustLoad(*configFile, &c)
29+
30+
natsConf := &common.NatsConfig{
31+
URL: c.Nats.URL,
32+
Options: []nats.Option{},
33+
}
34+
35+
jSPublisher, err := publisher.NewJetStreamPublisher(natsConf)
36+
37+
if err != nil {
38+
log.Fatalf("failed to NewJetStreamPublisher message: %v", err)
39+
}
40+
41+
subjects := []string{
42+
"user.register",
43+
"user.recharge",
44+
"subject.activity.example",
45+
}
46+
messages := []string{
47+
"Test message: user.register message",
48+
"Test message: user.recharge message",
49+
"Test message: subject.activity message",
50+
}
51+
ctx := context.Background()
52+
for i, subj := range subjects {
53+
msg := []byte(messages[i])
54+
for j := 0; j < 3; j++ {
55+
go func(s string, m []byte) {
56+
ack, err := jSPublisher.Publish(ctx, s, m)
57+
if err != nil {
58+
log.Fatalf("failed to publish message: %v", err)
59+
}
60+
log.Printf("published message to %s, ack: %+v", s, ack.Stream)
61+
}(subj, msg)
62+
}
63+
}
64+
65+
time.Sleep(2 * time.Second)
66+
}

natsmq/common/common.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package common
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/nats-io/nats.go"
7+
"github.com/nats-io/nats.go/jetstream"
8+
"log"
9+
"sync"
10+
"time"
11+
)
12+
13+
const (
14+
DefaultStream = "defaultStream"
15+
)
16+
17+
var (
18+
streamRegistry = make(map[string]*JetStreamManager)
19+
registryLock sync.RWMutex
20+
streamInstances = make(map[string]jetstream.Stream)
21+
streamInstLock sync.RWMutex
22+
)
23+
24+
// RegisterManager registers a JetStreamManager with the specified streamID.
25+
func RegisterManager(streamID string, mgr *JetStreamManager) {
26+
registryLock.Lock()
27+
defer registryLock.Unlock()
28+
streamRegistry[streamID] = mgr
29+
}
30+
31+
// GetManager retrieves the JetStreamManager for the given streamID.
32+
// Returns the manager and true if found; otherwise returns nil and false.
33+
func GetManager(streamID string) (*JetStreamManager, bool) {
34+
registryLock.RLock()
35+
defer registryLock.RUnlock()
36+
mgr, ok := streamRegistry[streamID]
37+
return mgr, ok
38+
}
39+
40+
// RegisterStreamInstances initializes the JetStream contexts (if needed),
41+
// creates or updates streams based on the provided JetStream configurations,
42+
// and stores the stream instances in a global map for later usage.
43+
// Parameters:
44+
//
45+
// nc - pointer to the NATS connection
46+
// cfgs - list of JetStreamConfig configurations to register
47+
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
48+
// Register managers for each provided configuration if not already registered.
49+
if len(cfgs) > 0 {
50+
for _, cfg := range cfgs {
51+
if _, ok := GetManager(cfg.Name); !ok {
52+
mgr := NewJetStream(cfg)
53+
RegisterManager(cfg.Name, mgr)
54+
} else {
55+
log.Printf("manager for stream %q already registered", cfg.Name)
56+
}
57+
}
58+
}
59+
60+
// Iterate through all registered stream managers to initialize JetStream and create stream instances.
61+
for streamName, streamMgr := range streamRegistry {
62+
streamInstLock.RLock()
63+
_, exists := streamInstances[streamName]
64+
streamInstLock.RUnlock()
65+
if exists {
66+
log.Printf("streamInstance %q already created", streamName)
67+
continue
68+
}
69+
// Initialize JetStream context
70+
if err := streamMgr.InitJetStream(nc); err != nil {
71+
log.Printf("failed to initialize jetstream for stream %q: %v", streamName, err)
72+
continue
73+
}
74+
ctx := context.Background()
75+
stream, err := streamMgr.CreateStream(ctx)
76+
if err != nil {
77+
log.Printf("failed to create stream %q: %v", streamName, err)
78+
continue
79+
}
80+
streamInstLock.Lock()
81+
streamInstances[streamName] = stream
82+
streamInstLock.Unlock()
83+
log.Printf("streamInstance %q created", streamName)
84+
}
85+
}
86+
87+
// GetStream retrieves a JetStream stream instance by streamID.
88+
// Returns the stream instance and true if found.
89+
func GetStream(streamID string) (jetstream.Stream, bool) {
90+
streamInstLock.RLock()
91+
defer streamInstLock.RUnlock()
92+
stream, ok := streamInstances[streamID]
93+
return stream, ok
94+
}
95+
96+
func init() {
97+
// Registers a default stream manager if one hasn't been registered.
98+
if _, ok := GetManager(DefaultStream); !ok {
99+
defaultCfg := &JetStreamConfig{
100+
Name: DefaultStream,
101+
Subjects: []string{"subject.*.*"},
102+
Description: DefaultStream,
103+
Retention: 0,
104+
MaxConsumers: 30,
105+
MaxMsgs: -1,
106+
MaxBytes: -1,
107+
Discard: 0,
108+
DiscardNewPerSubject: false,
109+
MaxAge: 0,
110+
MaxMsgsPerSubject: 10000,
111+
MaxMsgSize: 10000,
112+
NoAck: false,
113+
}
114+
defaultManager := NewJetStream(defaultCfg)
115+
RegisterManager(DefaultStream, defaultManager)
116+
log.Printf("default stream %q registered", DefaultStream)
117+
}
118+
}
119+
120+
// InitJetStream initializes the JetStream context for the manager using the given NATS connection.
121+
// Parameters:
122+
//
123+
// nc - pointer to the NATS connection
124+
//
125+
// Returns:
126+
//
127+
// error - non-nil error if JetStream context creation fails
128+
func (jsm *JetStreamManager) InitJetStream(nc *nats.Conn) error {
129+
js, err := jetstream.New(nc)
130+
if err != nil {
131+
nc.Close()
132+
return fmt.Errorf("failed to create JetStream context: %w", err)
133+
}
134+
jsm.JS = js
135+
return nil
136+
}
137+
138+
// CreateStream creates or updates a JetStream stream using the manager's configuration.
139+
// Parameters:
140+
//
141+
// ctx - context to control request timeout
142+
//
143+
// Returns:
144+
//
145+
// jetstream.Stream - the created/updated stream instance
146+
// error - non-nil error if stream creation fails
147+
func (jsm *JetStreamManager) CreateStream(ctx context.Context) (jetstream.Stream, error) {
148+
streamCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
149+
defer cancel()
150+
stream, err := jsm.JS.CreateOrUpdateStream(streamCtx, jsm.streamConf)
151+
if err != nil {
152+
return nil, err
153+
}
154+
return stream, nil
155+
}

0 commit comments

Comments
 (0)