Skip to content

Commit b03b11e

Browse files
authored
Merge pull request #95 from nats-io/support-bind-stream
Support bind stream JS option
2 parents 91ae479 + d873ee1 commit b03b11e

7 files changed

Lines changed: 113 additions & 3 deletions

File tree

docker-bake.hcl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ variable CI {
1616
}
1717

1818
variable image_base {
19-
default = "docker-image://alpine:3.17.3"
19+
default = "docker-image://alpine:3.18.4"
2020
}
2121

2222
variable image_goreleaser {

server/conf/conf.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ type ConnectorConfig struct {
209209

210210
Subject string // Used for nats and jetstream connections
211211
QueueName string // Optional, used for nats connections
212+
Stream string // Uses BindStream option for JetStream to consume from sourced streams
212213

213214
Brokers []string // list of brokers to use for creating a reader/writer
214215
Topic string // kafka topic

server/core/connector.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ func (conn *BridgeConnector) subscribeToJetStream(subject string) (*nats.Subscri
382382
if d := conn.bridge.config.JetStream.HeartbeatInterval; d > 0 {
383383
options = append(options, nats.IdleHeartbeat(time.Duration(d)*time.Millisecond))
384384
}
385+
if len(conn.config.Stream) > 0 {
386+
options = append(options, nats.BindStream(conn.config.Stream))
387+
}
385388

386389
traceEnabled := conn.bridge.Logger().TraceEnabled()
387390
ackSyncEnabled := conn.bridge.config.JetStream.EnableAckSync

server/core/jetstream2kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (conn *JetStream2KafkaConnector) Shutdown() error {
8181
// CheckConnections ensures the nats/stan connection and report an error if it is down
8282
func (conn *JetStream2KafkaConnector) CheckConnections() error {
8383
if !conn.bridge.CheckJetStream() {
84-
return fmt.Errorf("%s connector requires nats streaming to be available", conn.String())
84+
return fmt.Errorf("%s connector requires nats jetstream to be available", conn.String())
8585
}
8686
return nil
8787
}

server/core/jetstream2kafka_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,3 +906,55 @@ func TestJetStreamAlreadyConnected(t *testing.T) {
906906

907907
require.NoError(t, tbs.Bridge.connectToJetStream())
908908
}
909+
910+
func TestJetStreamSourcesConsumedByKafka(t *testing.T) {
911+
topic := nuid.Next()
912+
913+
connect := []conf.ConnectorConfig{
914+
{
915+
Type: "JetStreamToKafka",
916+
Subject: "foo.*",
917+
Topic: topic,
918+
DurableName: "KafkaBridgeConsumer",
919+
Stream: "FOO_GLOBAL",
920+
},
921+
}
922+
923+
tbs, err := StartTestEnvironmentWithSources(connect)
924+
require.NoError(t, err)
925+
defer tbs.Close()
926+
927+
tbs.Bridge.checkConnections()
928+
929+
_, err = tbs.JS.Publish("foo.one", []byte("one"))
930+
require.NoError(t, err)
931+
_, err = tbs.JS.Publish("foo.two", []byte("two"))
932+
require.NoError(t, err)
933+
_, err = tbs.JS.Publish("foo.one.1", []byte("another one"))
934+
require.NoError(t, err)
935+
_, err = tbs.JS.Publish("foo.three", []byte("three"))
936+
require.NoError(t, err)
937+
938+
reader := tbs.CreateReader(topic, 5000)
939+
defer reader.Close()
940+
941+
_, data, _, err := tbs.GetMessageFromKafka(reader, 5000)
942+
require.NoError(t, err)
943+
require.Equal(t, "one", string(data))
944+
945+
_, data, _, err = tbs.GetMessageFromKafka(reader, 5000)
946+
require.NoError(t, err)
947+
require.Equal(t, "two", string(data))
948+
949+
_, data, _, err = tbs.GetMessageFromKafka(reader, 5000)
950+
require.NoError(t, err)
951+
require.Equal(t, "three", string(data))
952+
953+
stats := tbs.Bridge.SafeStats()
954+
connStats := stats.Connections[0]
955+
require.Equal(t, int64(3), connStats.MessagesIn)
956+
require.Equal(t, int64(3), connStats.MessagesOut)
957+
require.Equal(t, int64(1), connStats.Connects)
958+
require.Equal(t, int64(0), connStats.Disconnects)
959+
require.True(t, connStats.Connected)
960+
}

server/core/kafka2jetstream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (conn *Kafka2JetStreamConnector) Shutdown() error {
103103
// CheckConnections ensures the nats/stan connection and report an error if it is down
104104
func (conn *Kafka2JetStreamConnector) CheckConnections() error {
105105
if !conn.bridge.CheckJetStream() {
106-
return fmt.Errorf("%s connector requires nats streaming to be available", conn.String())
106+
return fmt.Errorf("%s connector requires nats jetstream to be available", conn.String())
107107
}
108108
return nil
109109
}

server/core/server_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,60 @@ func StartSASLTestEnvironment(connections []conf.ConnectorConfig) (*TestEnv, err
172172
return tbs, err
173173
}
174174

175+
func StartTestEnvironmentWithSources(connections []conf.ConnectorConfig) (*TestEnv, error) {
176+
tbs, err := StartTestEnvironmentInfrastructure(false, false, collectTopics(connections))
177+
if err != nil {
178+
return nil, err
179+
}
180+
181+
_, err = tbs.JS.AddStream(&nats.StreamConfig{
182+
Name: "FOO_1",
183+
Subjects: []string{"foo.one"},
184+
})
185+
if err != nil {
186+
return nil, err
187+
}
188+
_, err = tbs.JS.AddStream(&nats.StreamConfig{
189+
Name: "FOO_2",
190+
Subjects: []string{"foo.two"},
191+
})
192+
if err != nil {
193+
return nil, err
194+
}
195+
_, err = tbs.JS.AddStream(&nats.StreamConfig{
196+
Name: "FOO_3",
197+
Subjects: []string{"foo.three"},
198+
})
199+
if err != nil {
200+
return nil, err
201+
}
202+
_, err = tbs.JS.AddStream(&nats.StreamConfig{
203+
Name: "SUB_FOO_1",
204+
Subjects: []string{"foo.one.1"},
205+
})
206+
if err != nil {
207+
return nil, err
208+
}
209+
_, err = tbs.JS.AddStream(&nats.StreamConfig{
210+
Name: "FOO_GLOBAL",
211+
Sources: []*nats.StreamSource{
212+
{Name: "FOO_1"},
213+
{Name: "FOO_2"},
214+
{Name: "FOO_3"},
215+
},
216+
})
217+
if err != nil {
218+
return nil, err
219+
}
220+
221+
err = tbs.StartBridge(connections)
222+
if err != nil {
223+
tbs.Close()
224+
return nil, err
225+
}
226+
return tbs, err
227+
}
228+
175229
// StartTestEnvironmentInfrastructure creates the kafka server, Nats and streaming
176230
// but does not start a bridge, you can use StartBridge to start a bridge afterward
177231
func StartTestEnvironmentInfrastructure(useSASL, useTLS bool, topics []string) (*TestEnv, error) {

0 commit comments

Comments
 (0)