Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ require (
github.com/golang/protobuf v1.4.2 // indirect
google.golang.org/protobuf v1.23.0 // indirect
)

replace github.com/nats-io/nats.go v1.28.0 => github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat h1:1RTa1e65JlQqnvcDaFFCVwDSE4Cy1S3+b5qkz/OdQvM=
github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
Expand Down
40 changes: 34 additions & 6 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ type StreamSource struct {
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`

// Internal
iname string // For indexing when stream names are the same for multiple sources.
Expand Down Expand Up @@ -2076,12 +2077,13 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
msgs, qch, siqch := mirror.msgs, mset.qch, mirror.qch
// Set the last seen as now so that we don't fail at the first check.
mirror.last = time.Now()
heartbeat := mset.getMirrorHeartbeat()
mset.mu.Unlock()

// Signal the caller that we have captured the above fields.
ready.Done()

t := time.NewTicker(sourceHealthCheckInterval)
t := time.NewTicker(heartbeat)
defer t.Stop()

for {
Expand All @@ -2103,7 +2105,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
case <-t.C:
mset.mu.RLock()
isLeader := mset.isLeader()
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*heartbeat
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
Expand All @@ -2120,6 +2122,14 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
}
}

func (mset *stream) getMirrorHeartbeat() time.Duration {
if mset.cfg.Mirror.Heartbeat == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dodomorandi My guess is that the data race is in this method, since it reads mset fields without taking the read lock (unlike the other methods on mset that access its fields).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data race is now fixed. The only caveat is that I keep using the initial value of the heartbeat. AFAIK this should not be an issue, because I don't expect the value to change between locking blocks. Is my assumption correct, or would it be better to use the current value of the heartbeat here?

return sourceHealthCheckInterval
} else {
return mset.cfg.Mirror.Heartbeat
}
}

// Checks that the message is from our current direct consumer. We can not depend on sub comparison
// since cross account imports break.
func (si *sourceInfo) isCurrentSub(reply string) bool {
Expand Down Expand Up @@ -2426,7 +2436,7 @@ func (mset *stream) setupMirrorConsumer() error {
AckPolicy: AckNone,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
Heartbeat: mset.getMirrorHeartbeat(),
FlowControl: true,
Direct: true,
},
Expand Down Expand Up @@ -2761,7 +2771,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
AckPolicy: AckNone,
AckWait: 22 * time.Hour,
MaxDeliver: 1,
Heartbeat: sourceHealthCheckInterval,
Heartbeat: ssi.getStreamSourceHeartbeat(),
FlowControl: true,
Direct: true,
},
Expand Down Expand Up @@ -2942,12 +2952,13 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
msgs, qch, siqch, iname := si.msgs, mset.qch, si.qch, si.iname
// Set the last seen as now so that we don't fail at the first check.
si.last = time.Now()
heartbeat := mset.getSourceHeartbeat(iname)
mset.mu.Unlock()

// Signal the caller that we have captured the above fields.
ready.Done()

t := time.NewTicker(sourceHealthCheckInterval)
t := time.NewTicker(heartbeat)
defer t.Stop()

for {
Expand All @@ -2969,7 +2980,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
case <-t.C:
mset.mu.RLock()
isLeader := mset.isLeader()
stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
stalled := time.Since(si.last) > 3*heartbeat
mset.mu.RUnlock()
// No longer leader.
if !isLeader {
Expand Down Expand Up @@ -5647,3 +5658,20 @@ func (mset *stream) clearMonitorRunning() {
defer mset.mu.Unlock()
mset.inMonitor = false
}

// getSourceHeartbeat resolves the heartbeat interval for the source indexed by
// iname. When no such source is configured on the stream, the default
// sourceHealthCheckInterval is returned; otherwise the decision is delegated
// to the source's own getStreamSourceHeartbeat.
//
// NOTE(review): the visible caller (processSourceMsgs) invokes this while
// holding mset.mu — presumably the lock is required; confirm for new callers.
func (mset *stream) getSourceHeartbeat(iname string) time.Duration {
	if src := mset.streamSource(iname); src != nil {
		return src.getStreamSourceHeartbeat()
	}
	return sourceHealthCheckInterval
}

// getStreamSourceHeartbeat returns the idle heartbeat interval to use for this
// stream source.
//
// A nil receiver, an unset (zero) Heartbeat, or a negative Heartbeat all fall
// back to sourceHealthCheckInterval. Rejecting non-positive values matters
// because the result is handed to time.NewTicker, which panics when given a
// duration <= 0.
func (ssi *StreamSource) getStreamSourceHeartbeat() time.Duration {
	if ssi == nil || ssi.Heartbeat <= 0 {
		return sourceHealthCheckInterval
	}
	return ssi.Heartbeat
}