Skip to content

Commit 7d2013d

Browse files
committed
feat: allow custom StreamSource heartbeat
Instead of using the default heartbeat for stream sources and mirror, allow more customizability. The mechanism of the health check keeps working in a linear fashion. Every time the heartbeat is not specified, things work just like before.
1 parent 20ce582 commit 7d2013d

File tree

1 file changed

+33
-6
lines changed

1 file changed

+33
-6
lines changed

server/stream.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ type StreamSource struct {
186186
FilterSubject string `json:"filter_subject,omitempty"`
187187
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
188188
External *ExternalStream `json:"external,omitempty"`
189+
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
189190

190191
// Internal
191192
iname string // For indexing when stream names are the same for multiple sources.
@@ -1898,7 +1899,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
18981899
// Signal the caller that we have captured the above fields.
18991900
ready.Done()
19001901

1901-
t := time.NewTicker(sourceHealthCheckInterval)
1902+
t := time.NewTicker(mset.getMirrorHeartbeat())
19021903
defer t.Stop()
19031904

19041905
for {
@@ -1920,7 +1921,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
19201921
case <-t.C:
19211922
mset.mu.RLock()
19221923
isLeader := mset.isLeader()
1923-
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
1924+
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*mset.getMirrorHeartbeat()
19241925
mset.mu.RUnlock()
19251926
// No longer leader.
19261927
if !isLeader {
@@ -1937,6 +1938,14 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
19371938
}
19381939
}
19391940

1941+
func (mset *stream) getMirrorHeartbeat() time.Duration {
1942+
if mset.cfg.Mirror.Heartbeat == 0 {
1943+
return sourceHealthCheckInterval
1944+
} else {
1945+
return mset.cfg.Mirror.Heartbeat
1946+
}
1947+
}
1948+
19401949
// Checks that the message is from our current direct consumer. We can not depend on sub comparison
19411950
// since cross account imports break.
19421951
func (si *sourceInfo) isCurrentSub(reply string) bool {
@@ -2226,7 +2235,7 @@ func (mset *stream) setupMirrorConsumer() error {
22262235
AckPolicy: AckNone,
22272236
AckWait: 22 * time.Hour,
22282237
MaxDeliver: 1,
2229-
Heartbeat: sourceHealthCheckInterval,
2238+
Heartbeat: mset.getMirrorHeartbeat(),
22302239
FlowControl: true,
22312240
Direct: true,
22322241
},
@@ -2537,7 +2546,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
25372546
AckPolicy: AckNone,
25382547
AckWait: 22 * time.Hour,
25392548
MaxDeliver: 1,
2540-
Heartbeat: sourceHealthCheckInterval,
2549+
Heartbeat: ssi.getStreamSourceHeartbeat(),
25412550
FlowControl: true,
25422551
Direct: true,
25432552
},
@@ -2712,12 +2721,13 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
27122721
msgs, qch, siqch, iname := si.msgs, mset.qch, si.qch, si.iname
27132722
// Set the last seen as now so that we don't fail at the first check.
27142723
si.last = time.Now()
2724+
heartbeat := mset.getSourceHeartbeat(iname)
27152725
mset.mu.Unlock()
27162726

27172727
// Signal the caller that we have captured the above fields.
27182728
ready.Done()
27192729

2720-
t := time.NewTicker(sourceHealthCheckInterval)
2730+
t := time.NewTicker(heartbeat)
27212731
defer t.Stop()
27222732

27232733
for {
@@ -2739,7 +2749,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) {
27392749
case <-t.C:
27402750
mset.mu.RLock()
27412751
isLeader := mset.isLeader()
2742-
stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
2752+
stalled := time.Since(si.last) > 3*heartbeat
27432753
mset.mu.RUnlock()
27442754
// No longer leader.
27452755
if !isLeader {
@@ -5385,3 +5395,20 @@ func (mset *stream) clearMonitorRunning() {
53855395
defer mset.mu.Unlock()
53865396
mset.inMonitor = false
53875397
}
5398+
5399+
func (mset *stream) getSourceHeartbeat(iname string) time.Duration {
5400+
ssi := mset.streamSource(iname)
5401+
if ssi == nil {
5402+
return sourceHealthCheckInterval
5403+
}
5404+
5405+
return ssi.getStreamSourceHeartbeat()
5406+
}
5407+
5408+
func (ssi *StreamSource) getStreamSourceHeartbeat() time.Duration {
5409+
if ssi.Heartbeat == 0 {
5410+
return sourceHealthCheckInterval
5411+
} else {
5412+
return ssi.Heartbeat
5413+
}
5414+
}

0 commit comments

Comments
 (0)