From 1b4279cdc8852ac1c86241a8d9fe8573d520541b Mon Sep 17 00:00:00 2001 From: Edoardo Morandi Date: Mon, 24 Apr 2023 15:32:38 +0200 Subject: [PATCH 1/3] feat: allow custom StreamSource heartbeat Instead of using the default heartbeat for stream sources and mirror, allow more customizability. The mechanism of the health check keeps working in a linear fashion. Every time the heartbeat is not specified, things work just like before. --- server/stream.go | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/server/stream.go b/server/stream.go index f7b8f203ce0..123b41d17a2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -189,6 +189,7 @@ type StreamSource struct { SubjectTransformDest string `json:"subject_transform_dest,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` External *ExternalStream `json:"external,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` // Internal iname string // For indexing when stream names are the same for multiple sources. @@ -2081,7 +2082,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) // Signal the caller that we have captured the above fields. ready.Done() - t := time.NewTicker(sourceHealthCheckInterval) + t := time.NewTicker(mset.getMirrorHeartbeat()) defer t.Stop() for { @@ -2103,7 +2104,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) case <-t.C: mset.mu.RLock() isLeader := mset.isLeader() - stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval + stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*mset.getMirrorHeartbeat() mset.mu.RUnlock() // No longer leader. if !isLeader { @@ -2120,6 +2121,14 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) } } +func (mset *stream) getMirrorHeartbeat() time.Duration { + if mset.cfg.Mirror.Heartbeat == 0 { + return sourceHealthCheckInterval + } else { + return mset.cfg.Mirror.Heartbeat + } +} + // Checks that the message is from our current direct consumer. We can not depend on sub comparison // since cross account imports break. func (si *sourceInfo) isCurrentSub(reply string) bool { @@ -2426,7 +2435,7 @@ func (mset *stream) setupMirrorConsumer() error { AckPolicy: AckNone, AckWait: 22 * time.Hour, MaxDeliver: 1, - Heartbeat: sourceHealthCheckInterval, + Heartbeat: mset.getMirrorHeartbeat(), FlowControl: true, Direct: true, }, @@ -2761,7 +2770,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T AckPolicy: AckNone, AckWait: 22 * time.Hour, MaxDeliver: 1, - Heartbeat: sourceHealthCheckInterval, + Heartbeat: ssi.getStreamSourceHeartbeat(), FlowControl: true, Direct: true, }, @@ -2942,12 +2951,13 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) { msgs, qch, siqch, iname := si.msgs, mset.qch, si.qch, si.iname // Set the last seen as now so that we don't fail at the first check. si.last = time.Now() + heartbeat := mset.getSourceHeartbeat(iname) mset.mu.Unlock() // Signal the caller that we have captured the above fields. ready.Done() - t := time.NewTicker(sourceHealthCheckInterval) + t := time.NewTicker(heartbeat) defer t.Stop() for { @@ -2969,7 +2979,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) { case <-t.C: mset.mu.RLock() isLeader := mset.isLeader() - stalled := time.Since(si.last) > 3*sourceHealthCheckInterval + stalled := time.Since(si.last) > 3*heartbeat mset.mu.RUnlock() // No longer leader. if !isLeader { @@ -5647,3 +5657,20 @@ func (mset *stream) clearMonitorRunning() { defer mset.mu.Unlock() mset.inMonitor = false } + +func (mset *stream) getSourceHeartbeat(iname string) time.Duration { + ssi := mset.streamSource(iname) + if ssi == nil { + return sourceHealthCheckInterval + } + + return ssi.getStreamSourceHeartbeat() +} + +func (ssi *StreamSource) getStreamSourceHeartbeat() time.Duration { + if ssi.Heartbeat == 0 { + return sourceHealthCheckInterval + } else { + return ssi.Heartbeat + } +} From f7c9588f8285b5a41cf547d21b7512c7d836730d Mon Sep 17 00:00:00 2001 From: Edoardo Morandi Date: Wed, 5 Jul 2023 14:26:39 +0200 Subject: [PATCH 2/3] fix: (SQUASHME) data race --- server/stream.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index 123b41d17a2..f5b0927de3d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2077,12 +2077,13 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) msgs, qch, siqch := mirror.msgs, mset.qch, mirror.qch // Set the last seen as now so that we don't fail at the first check. mirror.last = time.Now() + heartbeat := mset.getMirrorHeartbeat() mset.mu.Unlock() // Signal the caller that we have captured the above fields. ready.Done() - t := time.NewTicker(mset.getMirrorHeartbeat()) + t := time.NewTicker(heartbeat) defer t.Stop() for { @@ -2104,7 +2105,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) case <-t.C: mset.mu.RLock() isLeader := mset.isLeader() - stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*mset.getMirrorHeartbeat() + stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*heartbeat mset.mu.RUnlock() // No longer leader. if !isLeader { From 02a59d97a55476f7f226e56bbfb537d4dca6f530 Mon Sep 17 00:00:00 2001 From: Edoardo Morandi Date: Mon, 10 Jul 2023 15:23:58 +0200 Subject: [PATCH 3/3] REMOVE ME: use external dependency for nats.go --- go.mod | 2 ++ go.sum | 2 ++ 2 files changed, 4 insertions(+) diff --git a/go.mod b/go.mod index 5247347502d..b7e292fe7ed 100644 --- a/go.mod +++ b/go.mod @@ -19,3 +19,5 @@ require ( github.com/golang/protobuf v1.4.2 // indirect google.golang.org/protobuf v1.23.0 // indirect ) + +replace github.com/nats-io/nats.go v1.28.0 => github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat diff --git a/go.sum b/go.sum index c72d2e7ee50..93737dbfb1a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat h1:1RTa1e65JlQqnvcDaFFCVwDSE4Cy1S3+b5qkz/OdQvM= +github.com/M4SS-Code/nats.go v1.28.1-stream-source-heartbeat/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=