Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ type statzDescs struct {
accJszConsumerNumPending *prometheus.Desc
accJszConsumerAckFloorStreamSeq *prometheus.Desc
accJszConsumerAckFloorConsumerSeq *prometheus.Desc
accJszConsumerAckFloorLastActive *prometheus.Desc
accJszConsumerDeliveredLastActive *prometheus.Desc
}

// gatewayzDescs holds the gateway metric descriptions
Expand Down Expand Up @@ -246,6 +248,8 @@ type consumerStats struct {
consumerAckFloorStreamSeq float64
consumerAckFloorConsumerSeq float64
consumerRaftGroup string
consumerAckFloorLastActive float64
consumerDeliveredLastActive float64
}

type streamAccountStats struct {
Expand Down Expand Up @@ -525,6 +529,18 @@ func (sc *StatzCollector) buildDescs() {
consumerLabels,
nil,
)
sc.descs.accJszConsumerAckFloorLastActive = prometheus.NewDesc(
prometheus.BuildFQName("nats", "consumer", "ack_floor_last_active"),
"Unix timestamp of last ack floor activity from a consumer",
consumerLabels,
nil,
)
sc.descs.accJszConsumerDeliveredLastActive = prometheus.NewDesc(
prometheus.BuildFQName("nats", "consumer", "delivered_last_active"),
"Unix timestamp of last delivered message activity from a consumer",
consumerLabels,
nil,
)
}

// Surveyor
Expand Down Expand Up @@ -917,6 +933,16 @@ func (sc *StatzCollector) pollAccountInfo() error {
}
}
}

// Extract timestamp values, converting to Unix timestamp or 0 if nil
var ackFloorLastActive, deliveredLastActive float64
if consumer.AckFloor.Last != nil {
ackFloorLastActive = float64(consumer.AckFloor.Last.UnixMilli())
}
if consumer.Delivered.Last != nil {
deliveredLastActive = float64(consumer.Delivered.Last.UnixMilli())
}

cs := consumerStats{
consumerName: consumer.Name,
consumerLeader: consumerLeader,
Expand All @@ -929,6 +955,8 @@ func (sc *StatzCollector) pollAccountInfo() error {
consumerNumPending: float64(consumer.NumPending),
consumerAckFloorStreamSeq: float64(consumer.AckFloor.Stream),
consumerAckFloorConsumerSeq: float64(consumer.AckFloor.Consumer),
consumerAckFloorLastActive: ackFloorLastActive,
consumerDeliveredLastActive: deliveredLastActive,
}
sas.consumerStats = append(sas.consumerStats, &cs)
}
Expand Down Expand Up @@ -1300,6 +1328,8 @@ func (sc *StatzCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- sc.descs.accJszConsumerNumPending
ch <- sc.descs.accJszConsumerAckFloorStreamSeq
ch <- sc.descs.accJszConsumerAckFloorConsumerSeq
ch <- sc.descs.accJszConsumerAckFloorLastActive
ch <- sc.descs.accJszConsumerDeliveredLastActive
}
}

Expand Down Expand Up @@ -1639,6 +1669,24 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
),
)
}
if sc.jszFilterSet[ConsumerAckFloorLastActive] || !hasFilters {
metrics.newGaugeMetric(sc.descs.accJszConsumerAckFloorLastActive,
consumerStat.consumerAckFloorLastActive,
append(accLabels, streamStat.accountName,
streamStat.clusterName, raftGroup, streamStat.serverID, streamStat.serverName,
streamStat.streamName, streamStat.streamLeader, consumerStat.consumerName, consumerStat.consumerLeader,
),
)
}
if sc.jszFilterSet[ConsumerDeliveredLastActive] || !hasFilters {
metrics.newGaugeMetric(sc.descs.accJszConsumerDeliveredLastActive,
consumerStat.consumerDeliveredLastActive,
append(accLabels, streamStat.accountName,
streamStat.clusterName, raftGroup, streamStat.serverID, streamStat.serverName,
streamStat.streamName, streamStat.streamLeader, consumerStat.consumerName, consumerStat.consumerLeader,
),
)
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions surveyor/jsz_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
ConsumerNumPending
ConsumerNumRedelivered
ConsumerNumWaiting
ConsumerAckFloorLastActive
ConsumerDeliveredLastActive
)

// JszFilterIds maps enum values to their string representations
Expand All @@ -50,6 +52,8 @@ var JszFilterIds = map[JszFilter][]string{
ConsumerNumPending: {"consumer_num_pending"},
ConsumerNumRedelivered: {"consumer_num_redelivered"},
ConsumerNumWaiting: {"consumer_num_waiting"},
ConsumerAckFloorLastActive: {"consumer_ack_floor_last_active"},
ConsumerDeliveredLastActive: {"consumer_delivered_last_active"},
}

// JszFiltersToStringSlice converts a slice of JszFilter enums to their string representations
Expand Down
6 changes: 5 additions & 1 deletion surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,10 @@ func TestSurveyor_AccountJetStreamJszLeaderOnly(t *testing.T) {
regexp.MustCompile(`nats_stream_total_messages`),
regexp.MustCompile(`nats_consumer_ack_floor_consumer_seq`),
regexp.MustCompile(`nats_consumer_ack_floor_stream_seq`),
regexp.MustCompile(`nats_consumer_ack_floor_last_active`),
regexp.MustCompile(`nats_consumer_delivered_consumer_seq`),
regexp.MustCompile(`nats_consumer_delivered_stream_seq`),
regexp.MustCompile(`nats_consumer_delivered_last_active`),
regexp.MustCompile(`nats_consumer_num_ack_pending`),
regexp.MustCompile(`nats_consumer_num_pending`),
regexp.MustCompile(`nats_consumer_num_redelivered`),
Expand Down Expand Up @@ -984,7 +986,7 @@ func TestSurveyor_AccountJetStreamJszLeaderOnly(t *testing.T) {
totalConsumers++
}
}
expectedConsumerMetrics := 120
expectedConsumerMetrics := 150
if totalConsumers != expectedConsumerMetrics {
t.Errorf("Expected %v, got %v", expectedConsumerMetrics, totalConsumers)
}
Expand Down Expand Up @@ -1076,8 +1078,10 @@ func TestSurveyor_AccountJetStreamJszFilters(t *testing.T) {
notWanted := []*regexp.Regexp{
regexp.MustCompile(`nats_consumer_ack_floor_consumer_seq`),
regexp.MustCompile(`nats_consumer_ack_floor_stream_seq`),
regexp.MustCompile(`nats_consumer_ack_floor_last_active`),
regexp.MustCompile(`nats_consumer_delivered_consumer_seq`),
regexp.MustCompile(`nats_consumer_delivered_stream_seq`),
regexp.MustCompile(`nats_consumer_delivered_last_active`),
regexp.MustCompile(`nats_consumer_num_redelivered`),
regexp.MustCompile(`nats_consumer_num_waiting`),
}
Expand Down