Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 7 additions & 29 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,26 @@ up:
docker-compose up -d

test:
go test -parallel 20 ./pkg/nats...
go test -parallel 20 ./pkg...

test_v:
go test -parallel 20 -v ./pkg/nats...
go test -parallel 20 -v ./pkg...

test_short:
go test -parallel 20 ./pkg/nats... -short
go test -parallel 20 ./pkg... -short

test_race:
go test ./pkg/nats... -short -race
go test ./pkg... -short -race

test_stress:
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./pkg/nats...
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./pkg...

test_codecov:
echo "this is a no-op because it times out on github runners but you could try"
echo "go test -coverprofile=coverage.out -covermode=atomic ./pkg/nats... -short"
echo "go test -coverprofile=coverage.out -covermode=atomic ./pkg... -short"

test_reconnect:
go test -tags=reconnect ./pkg/nats...

jetstream_test:
go test -parallel 20 ./pkg/jetstream...

jetstream_test_v:
go test -parallel 20 -v ./pkg/jetstream...

jetstream_test_short:
go test -parallel 20 ./pkg/jetstream... -short

jetstream_test_race:
go test ./pkg/jetstream... -short -race

jetstream_test_stress:
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./pkg/jetstream...

jetstream_test_reconnect:
go test -tags=reconnect ./pkg/jetstream...

jetstream_test_codecov:
echo "this is a no-op because it times out on github runners but you could try"
echo "go test -coverprofile=coverage.out -covermode=atomic ./pkg/jetstream... -short"
go test -tags=reconnect ./pkg...

BENCHCNT := 1

Expand Down
12 changes: 6 additions & 6 deletions _examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ require (
github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0
github.com/google/uuid v1.3.0
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.32.0
github.com/stretchr/testify v1.8.1
google.golang.org/protobuf v1.28.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
9 changes: 9 additions & 0 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2g
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -35,11 +36,14 @@ github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjj
github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nats.go v1.31.1-0.20231201130123-4af26aae2522/go.mod h1:uCwt8khnwboRrH1RbNzJh9C/GEnXnnwkcB/bUoz8eJs=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand All @@ -62,15 +66,20 @@ go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnw
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.32.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
)
Expand All @@ -14,17 +14,17 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -24,8 +26,14 @@ github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJV
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nats.go v1.31.1-0.20231201130123-4af26aae2522 h1:h58TgHy2+kE410WY11okAQ6nlzG+bTaex3A0Uum03po=
github.com/nats-io/nats.go v1.31.1-0.20231201130123-4af26aae2522/go.mod h1:uCwt8khnwboRrH1RbNzJh9C/GEnXnnwkcB/bUoz8eJs=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand All @@ -45,10 +53,20 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
22 changes: 12 additions & 10 deletions pkg/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jetstream
import (
"context"
"fmt"
"sync"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand Down Expand Up @@ -116,29 +117,26 @@ func createOrUpdateConsumerWithCloser(ctx context.Context,
}, err
}

// consume manages callback-based consumption of data from the provided JetStream consumer

// consume manages callback-based consumption of data from the provided JetStream consumer
func consume(ctx context.Context,
closing chan struct{},
s *Subscriber,
consumer jetstream.Consumer,
pullConsumeOpts []jetstream.PullConsumeOpt,
cb handleFunc,
deferred func(),
) (chan *message.Message, error) {
output := make(chan *message.Message)

// TODO: this is the closest analog to callback based subscriptions in watermill-nats/pkg/nats
// add support for batching pull consumers using consumer.Fetch / FetchNoWait
cc, err := consumer.Consume(func(msg jetstream.Msg) {
cb(ctx, msg, output)
}, pullConsumeOpts...)
s.handleMsg(ctx, msg, output)
}, s.consumeOptions...)
if err != nil {
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
}

go monitor(ctx, closing, output, func() {
defer deferred()
cc.Stop()
})
go monitor(ctx, s.closing, output, cc, s.messagesWG, deferred)

return output, nil
}
Expand All @@ -147,13 +145,17 @@ func consume(ctx context.Context,
func monitor(ctx context.Context,
closing chan struct{},
output chan *message.Message,
consumeContext jetstream.ConsumeContext,
messageWg *sync.WaitGroup,
after func()) {
defer after()
select {
case <-closing:
//unblock
case <-ctx.Done():
//unblock
}
consumeContext.Drain()
messageWg.Wait()
close(output)
after()
}
5 changes: 5 additions & 0 deletions pkg/jetstream/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (s *Subscriber) handleMsg(ctx context.Context, msg jetstream.Msg, output ch
return
}

s.messagesLock.Lock()
s.messagesWG.Add(1)
s.messagesLock.Unlock()
defer s.messagesWG.Done()

select {
case <-s.closing:
s.logger.Trace("Closing, message discarded", messageLogFields)
Expand Down
29 changes: 20 additions & 9 deletions pkg/jetstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ type Subscriber struct {
closed bool
closing chan struct{}
ackWait time.Duration
outputsWg *sync.WaitGroup
outputsWG *sync.WaitGroup
messagesWG *sync.WaitGroup
messagesLock *sync.RWMutex
closeTimeout time.Duration
subsLock *sync.RWMutex
consumerBuilder ResourceInitializer
Expand Down Expand Up @@ -65,7 +67,9 @@ func newSubscriber(nc *nats.Conn, config *SubscriberConfig) (*Subscriber, error)
closing: make(chan struct{}),
logger: config.Logger,
ackWait: config.AckWaitTimeout,
outputsWg: &sync.WaitGroup{},
outputsWG: &sync.WaitGroup{},
messagesWG: &sync.WaitGroup{},
messagesLock: &sync.RWMutex{},
closeTimeout: 5 * time.Second,
subsLock: &sync.RWMutex{},
consumerBuilder: config.ResourceInitializer,
Expand Down Expand Up @@ -95,16 +99,16 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
if closer != nil {
defer closer(ctx, s.logger)
}
s.outputsWg.Done()
s.outputsWG.Done()
}

if err != nil {
return nil, fmt.Errorf("failed to initialize jetstream consumer: %w", err)
}

s.outputsWg.Add(1)
s.outputsWG.Add(1)

return consume(ctx, s.closing, consumer, s.consumeOptions, s.handleMsg, cleanup)
return consume(ctx, s, consumer, cleanup)
}

// Close closes the subscriber and signals to close any subscriptions it created along with the underlying connection.
Expand All @@ -116,15 +120,22 @@ func (s *Subscriber) Close() error {
return nil
}
s.closed = true
/*
s.messagesLock.Lock()
s.messagesWG.Wait()
s.messagesLock.Unlock()
*/

close(s.closing)

// TODO: if we support shared connections don't always close
if err := s.nc.Drain(); err != nil {
return fmt.Errorf("failed to drain connection: %w", err)
}
/*
if err := s.nc.Drain(); err != nil {
return fmt.Errorf("failed to drain connection: %w", err)
}
*/

if watermillSync.WaitGroupTimeout(s.outputsWg, s.closeTimeout) {
if watermillSync.WaitGroupTimeout(s.outputsWG, s.closeTimeout) {
return fmt.Errorf("output wait group did not finish within alloted %s", s.closeTimeout.String())
}

Expand Down