Skip to content

Commit 5a4b3ec

Browse files
author
Martin Schneppenheim
committed
Only expose metrics if topic has been consumed
1 parent 8212c61 commit 5a4b3ec

File tree

5 files changed

+132
-3
lines changed

5 files changed

+132
-3
lines changed

collector/collector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ func (e *Collector) Describe(ch chan<- *prometheus.Desc) {
108108
// Collect is triggered by the Prometheus registry when the metrics endpoint has been invoked
109109
func (e *Collector) Collect(ch chan<- prometheus.Metric) {
110110
log.Debug("Collector's collect has been invoked")
111+
112+
if e.storage.IsConsumed() == false {
113+
log.Info("Offets topic has not yet been consumed until the end")
114+
return
115+
}
116+
111117
consumerOffsets := e.storage.ConsumerOffsets()
112118
partitionLowWaterMarks := e.storage.PartitionLowWaterMarks()
113119
partitionHighWaterMarks := e.storage.PartitionHighWaterMarks()

kafka/cluster.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ type PartitionWaterMark struct {
2929
Timestamp int64
3030
}
3131

32+
type consumerOffsetTopic struct {
33+
Lock sync.RWMutex
34+
PartitionsByID map[int32]consumerOffsetPartition
35+
}
36+
37+
// consumerOffsetPartition holds the high water mark of a single partition of the consumer
// offsets topic.
type consumerOffsetPartition struct {
	// PartitionID identifies the partition within the consumer offsets topic.
	PartitionID int32

	// HighWaterMark is the newest offset reported by the broker for this partition.
	HighWaterMark int64
}
41+
42+
var (
43+
offsetWaterMarks = consumerOffsetTopic{
44+
Lock: sync.RWMutex{},
45+
PartitionsByID: make(map[int32]consumerOffsetPartition),
46+
}
47+
)
48+
3249
// NewCluster creates a new cluster module and tries to connect to the kafka cluster
3350
// If it cannot connect to the cluster it will panic
3451
func NewCluster(opts *options.Options, partitionWaterMarksCh chan *StorageRequest) *Cluster {
@@ -190,9 +207,6 @@ func (module *Cluster) getHighWaterMarks(wg *sync.WaitGroup, broker *sarama.Brok
190207
}
191208
ts := time.Now().Unix() * 1000
192209
for topicName, responseBlock := range response.Blocks {
193-
if !module.isTopicAllowed(topicName) {
194-
continue
195-
}
196210

197211
for partitionID, offsetResponse := range responseBlock {
198212
if offsetResponse.Err != sarama.ErrNoError {
@@ -204,6 +218,19 @@ func (module *Cluster) getHighWaterMarks(wg *sync.WaitGroup, broker *sarama.Brok
204218
continue
205219
}
206220

221+
if topicName == module.options.ConsumerOffsetsTopicName {
222+
offsetWaterMarks.Lock.Lock()
223+
offsetWaterMarks.PartitionsByID[partitionID] = consumerOffsetPartition{
224+
PartitionID: partitionID,
225+
HighWaterMark: offsetResponse.Offsets[0],
226+
}
227+
offsetWaterMarks.Lock.Unlock()
228+
}
229+
230+
// Skip disallowed topics in this inner loop (rather than the outer one) because the water marks of the consumer offsets topic must still be recorded above
231+
if !module.isTopicAllowed(topicName) {
232+
continue
233+
}
207234
logger.WithFields(log.Fields{
208235
"topic": topicName,
209236
"partition": partitionID,

kafka/offset_consumer.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,17 @@ import (
66
"github.com/Shopify/sarama"
77
"github.com/google-cloud-tools/kafka-minion/options"
88
log "github.com/sirupsen/logrus"
9+
"math"
910
"strings"
1011
"sync"
12+
"time"
1113
)
1214

15+
// consumerStatus describes how far a partition consumer of the consumer offsets topic has
// progressed.
type consumerStatus struct {
	// PartitionLag is the lag of the partition consumer on its partition.
	PartitionLag int64

	// IsReady indicates whether a partition consumer has caught up the partition lag or not.
	IsReady bool
}
19+
1320
// OffsetConsumer is a consumer module which reads consumer group information from the offsets topic in a Kafka cluster.
1421
// The offsets topic is typically named __consumer_offsets. All messages in this topic are binary and therefore they
1522
// must first be decoded to access the information. This module consumes and processes all messages in the offsets topic.
@@ -81,6 +88,8 @@ func (module *OffsetConsumer) Start() {
8188
}).Infof("Starting '%d' partition consumers", len(partitions))
8289
for _, partition := range partitions {
8390
module.wg.Add(1)
91+
registerPartitionRequest := newRegisterOffsetPartition(partition)
92+
module.storageChannel <- registerPartitionRequest
8493
go module.partitionConsumer(consumer, partition)
8594
}
8695
log.WithFields(log.Fields{
@@ -105,18 +114,38 @@ func (module *OffsetConsumer) partitionConsumer(consumer sarama.Consumer, partit
105114
log.Debugf("Started consumer %d", partitionID)
106115
defer pconsumer.AsyncClose()
107116

117+
ticker := time.NewTicker(5 * time.Second)
118+
var consumedOffset int64
119+
108120
for {
109121
select {
110122
case msg := <-pconsumer.Messages():
111123
messagesInSuccess.WithLabelValues(msg.Topic).Add(1)
112124
module.processMessage(msg)
125+
consumedOffset = msg.Offset
113126
case err := <-pconsumer.Errors():
114127
messagesInFailed.WithLabelValues(err.Topic).Add(1)
115128
log.WithFields(log.Fields{
116129
"error": err.Error(),
117130
"topic": err.Topic,
118131
"partition": err.Partition,
119132
}).Errorf("partition consume error")
133+
case <-ticker.C:
134+
var highWaterMark int64
135+
highWaterMark = math.MaxInt64
136+
offsetWaterMarks.Lock.RLock()
137+
if val, exists := offsetWaterMarks.PartitionsByID[partitionID]; exists {
138+
// The -1 is presumably needed because the high water mark is the offset of the next message to be written (last message offset + 1); without it lots of partition consumers report a remaining lag of 1
139+
highWaterMark = val.HighWaterMark - 1
140+
}
141+
offsetWaterMarks.Lock.RUnlock()
142+
if consumedOffset >= highWaterMark {
143+
request := newMarkOffsetPartitionReady(partitionID)
144+
module.storageChannel <- request
145+
ticker.Stop()
146+
} else {
147+
log.Debugf("Not ready, Lag is: %v (consumed: %v)", highWaterMark-consumedOffset, consumedOffset)
148+
}
120149
}
121150
}
122151
}

kafka/storage_request.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ const (
1515

1616
// StorageDeleteConsumerGroup is the request type to remove an offset commit for a topic:group:partition combination
1717
StorageDeleteConsumerGroup StorageRequestType = 3
18+
19+
// StorageRegisterOffsetPartition is the request type to make the storage module aware that a partition consumer for
20+
// the consumer offsets partition exists and that it should await its ready signal before exposing metrics
21+
StorageRegisterOffsetPartition StorageRequestType = 4
22+
23+
// StorageMarkOffsetPartitionReady is the request type to mark a partition consumer of the consumer offsets topic
24+
// as ready (=caught up partition lag)
25+
StorageMarkOffsetPartitionReady StorageRequestType = 5
1826
)
1927

2028
// StorageRequest is an entity to send requests to the storage module
@@ -56,3 +64,17 @@ func newDeleteConsumerGroupRequest(group string, topic string, partitionID int32
5664
PartitionID: partitionID,
5765
}
5866
}
67+
68+
func newRegisterOffsetPartition(partitionID int32) *StorageRequest {
69+
return &StorageRequest{
70+
RequestType: StorageRegisterOffsetPartition,
71+
PartitionID: partitionID,
72+
}
73+
}
74+
75+
func newMarkOffsetPartitionReady(partitionID int32) *StorageRequest {
76+
return &StorageRequest{
77+
RequestType: StorageMarkOffsetPartitionReady,
78+
PartitionID: partitionID,
79+
}
80+
}

storage/offset.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ type OffsetStorage struct {
1313
consumerOffsetCh chan *kafka.StorageRequest
1414
clusterCh chan *kafka.StorageRequest
1515

16+
notReadyPartitionConsumers int32
17+
offsetTopicConsumed bool
18+
19+
consumerStatusLock sync.RWMutex // Lock is being used if you either access notReadyPartitionConsumers or offsetTopicConsumed
1620
consumerOffsetsLock sync.RWMutex
1721
partitionHighWaterMarksLock sync.RWMutex
1822
partitionLowWaterMarksLock sync.RWMutex
@@ -22,6 +26,8 @@ type OffsetStorage struct {
2226
consumerOffsets map[string]ConsumerPartitionOffsetMetric
2327
partitionHighWaterMarks map[string]kafka.PartitionWaterMark
2428
partitionLowWaterMarks map[string]kafka.PartitionWaterMark
29+
30+
logger *log.Entry
2531
}
2632

2733
// ConsumerPartitionOffsetMetric represents an offset commit but is extended by further fields which can be
@@ -41,13 +47,20 @@ func NewOffsetStorage(consumerOffsetCh chan *kafka.StorageRequest, clusterCh cha
4147
consumerOffsetCh: consumerOffsetCh,
4248
clusterCh: clusterCh,
4349

50+
notReadyPartitionConsumers: 0,
51+
offsetTopicConsumed: false,
52+
4453
consumerOffsetsLock: sync.RWMutex{},
4554
partitionHighWaterMarksLock: sync.RWMutex{},
4655
partitionLowWaterMarksLock: sync.RWMutex{},
4756

4857
consumerOffsets: make(map[string]ConsumerPartitionOffsetMetric),
4958
partitionHighWaterMarks: make(map[string]kafka.PartitionWaterMark),
5059
partitionLowWaterMarks: make(map[string]kafka.PartitionWaterMark),
60+
61+
logger: log.WithFields(log.Fields{
62+
"module": "storage",
63+
}),
5164
}
5265
}
5366

@@ -64,6 +77,11 @@ func (module *OffsetStorage) consumerOffsetWorker() {
6477
module.storeOffsetEntry(request.ConsumerOffset)
6578
case kafka.StorageDeleteConsumerGroup:
6679
module.deleteOffsetEntry(request.ConsumerGroupName, request.TopicName, request.PartitionID)
80+
case kafka.StorageRegisterOffsetPartition:
81+
module.registerOffsetPartition(request.PartitionID)
82+
case kafka.StorageMarkOffsetPartitionReady:
83+
module.markOffsetPartitionReady(request.PartitionID)
84+
6785
default:
6886
log.WithFields(log.Fields{
6987
"request_type": request.RequestType,
@@ -98,6 +116,24 @@ func (module *OffsetStorage) storePartitionHighWaterMark(offset *kafka.Partition
98116
module.partitionHighWaterMarksLock.Unlock()
99117
}
100118

119+
func (module *OffsetStorage) registerOffsetPartition(partitionID int32) {
120+
module.consumerOffsetsLock.Lock()
121+
defer module.consumerOffsetsLock.Unlock()
122+
123+
module.notReadyPartitionConsumers++
124+
}
125+
126+
func (module *OffsetStorage) markOffsetPartitionReady(partitionID int32) {
127+
module.consumerOffsetsLock.Lock()
128+
defer module.consumerOffsetsLock.Unlock()
129+
130+
module.notReadyPartitionConsumers--
131+
if module.notReadyPartitionConsumers == 0 {
132+
module.logger.Info("Offset topic has been consumed")
133+
module.offsetTopicConsumed = true
134+
}
135+
}
136+
101137
func (module *OffsetStorage) storePartitionLowWaterMark(offset *kafka.PartitionWaterMark) {
102138
key := fmt.Sprintf("%v:%v", offset.TopicName, offset.PartitionID)
103139
module.partitionLowWaterMarksLock.Lock()
@@ -170,3 +206,12 @@ func (module *OffsetStorage) PartitionLowWaterMarks() map[string]kafka.Partition
170206

171207
return mapCopy
172208
}
209+
210+
// IsConsumed indicates whether the consumer offsets topic lag has been caught up and therefore
211+
// the metrics reported by this module are accurate or not
212+
func (module *OffsetStorage) IsConsumed() bool {
213+
module.consumerStatusLock.RLock()
214+
defer module.consumerStatusLock.RUnlock()
215+
216+
return module.offsetTopicConsumed
217+
}

0 commit comments

Comments
 (0)