@@ -11,9 +11,9 @@ import (
1111// PartitionWaterMarks represents a map of PartitionWaterMarks grouped by PartitionID
1212type PartitionWaterMarks = map [int32 ]kafka.PartitionWaterMark
1313
14- // OffsetStorage stores the latest committed offsets for each group, topic, partition combination and offers an interface
14+ // MemoryStorage stores the latest committed offsets for each group, topic, partition combination and offers an interface
1515// to access these information
16- type OffsetStorage struct {
16+ type MemoryStorage struct {
1717 logger * log.Entry
1818
1919 // Channels for receiving storage requests
@@ -66,8 +66,8 @@ type ConsumerPartitionOffsetMetric struct {
6666 TotalCommitCount float64
6767}
6868
69- // NewOffsetStorage creates a new storage and preinitializes the required maps which store the PartitionOffset information
70- func NewOffsetStorage (consumerOffsetCh <- chan * kafka.StorageRequest , clusterCh <- chan * kafka.StorageRequest ) * OffsetStorage {
69+ // NewMemoryStorage creates a new storage and preinitializes the required maps which store the PartitionOffset information
70+ func NewMemoryStorage (consumerOffsetCh <- chan * kafka.StorageRequest , clusterCh <- chan * kafka.StorageRequest ) * MemoryStorage {
7171 groups := & consumerGroup {
7272 Offsets : make (map [string ]ConsumerPartitionOffsetMetric ),
7373 Metadata : make (map [string ]kafka.ConsumerGroupMetadata ),
@@ -87,7 +87,7 @@ func NewOffsetStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <
8787 Configs : make (map [string ]kafka.TopicConfiguration ),
8888 }
8989
90- return & OffsetStorage {
90+ return & MemoryStorage {
9191 logger : log .WithFields (log.Fields {
9292 "module" : "storage" ,
9393 }),
@@ -103,12 +103,12 @@ func NewOffsetStorage(consumerOffsetCh <-chan *kafka.StorageRequest, clusterCh <
103103}
104104
105105// Start starts listening for incoming offset entries on the input channel so that they can be stored
106- func (module * OffsetStorage ) Start () {
106+ func (module * MemoryStorage ) Start () {
107107 go module .consumerOffsetWorker ()
108108 go module .clusterWorker ()
109109}
110110
111- func (module * OffsetStorage ) consumerOffsetWorker () {
111+ func (module * MemoryStorage ) consumerOffsetWorker () {
112112 for request := range module .consumerOffsetCh {
113113 switch request .RequestType {
114114 case kafka .StorageAddConsumerOffset :
@@ -132,7 +132,7 @@ func (module *OffsetStorage) consumerOffsetWorker() {
132132 log .Panic ("Group offset storage channel closed" )
133133}
134134
135- func (module * OffsetStorage ) clusterWorker () {
135+ func (module * MemoryStorage ) clusterWorker () {
136136 for request := range module .clusterCh {
137137 switch request .RequestType {
138138 case kafka .StorageAddPartitionLowWaterMark :
@@ -154,7 +154,7 @@ func (module *OffsetStorage) clusterWorker() {
154154 log .Panic ("Partition Offset storage channel closed" )
155155}
156156
157- func (module * OffsetStorage ) deleteTopic (topicName string ) {
157+ func (module * MemoryStorage ) deleteTopic (topicName string ) {
158158 module .topics .ConfigsLock .Lock ()
159159 module .partitions .LowWaterMarksLock .Lock ()
160160 module .partitions .HighWaterMarksLock .Lock ()
@@ -167,7 +167,7 @@ func (module *OffsetStorage) deleteTopic(topicName string) {
167167 delete (module .topics .Configs , topicName )
168168}
169169
170- func (module * OffsetStorage ) storePartitionHighWaterMark (offset * kafka.PartitionWaterMark ) {
170+ func (module * MemoryStorage ) storePartitionHighWaterMark (offset * kafka.PartitionWaterMark ) {
171171 module .partitions .HighWaterMarksLock .Lock ()
172172 defer module .partitions .HighWaterMarksLock .Unlock ()
173173
@@ -179,7 +179,7 @@ func (module *OffsetStorage) storePartitionHighWaterMark(offset *kafka.Partition
179179 module .partitions .HighWaterMarks [offset .TopicName ][offset .PartitionID ] = * offset
180180}
181181
182- func (module * OffsetStorage ) storePartitionLowWaterMark (offset * kafka.PartitionWaterMark ) {
182+ func (module * MemoryStorage ) storePartitionLowWaterMark (offset * kafka.PartitionWaterMark ) {
183183 module .partitions .LowWaterMarksLock .Lock ()
184184 defer module .partitions .LowWaterMarksLock .Unlock ()
185185
@@ -191,29 +191,29 @@ func (module *OffsetStorage) storePartitionLowWaterMark(offset *kafka.PartitionW
191191 module .partitions .LowWaterMarks [offset .TopicName ][offset .PartitionID ] = * offset
192192}
193193
194- func (module * OffsetStorage ) storeGroupMetadata (metadata * kafka.ConsumerGroupMetadata ) {
194+ func (module * MemoryStorage ) storeGroupMetadata (metadata * kafka.ConsumerGroupMetadata ) {
195195 module .groups .MetadataLock .Lock ()
196196 defer module .groups .MetadataLock .Unlock ()
197197
198198 module .groups .Metadata [metadata .Group ] = * metadata
199199}
200200
201- func (module * OffsetStorage ) storeTopicConfig (config * kafka.TopicConfiguration ) {
201+ func (module * MemoryStorage ) storeTopicConfig (config * kafka.TopicConfiguration ) {
202202 module .topics .ConfigsLock .Lock ()
203203 defer module .topics .ConfigsLock .Unlock ()
204204
205205 module .topics .Configs [config .TopicName ] = * config
206206}
207207
208- func (module * OffsetStorage ) registerOffsetPartitions (partitionCount int ) {
208+ func (module * MemoryStorage ) registerOffsetPartitions (partitionCount int ) {
209209 module .status .Lock .Lock ()
210210 defer module .status .Lock .Unlock ()
211211
212212 module .logger .Infof ("Registered %v __consumer_offsets partitions which have to be consumed before metrics can be exposed" , partitionCount )
213213 module .status .NotReadyPartitionConsumers = partitionCount
214214}
215215
216- func (module * OffsetStorage ) markOffsetPartitionReady (partitionID int32 ) {
216+ func (module * MemoryStorage ) markOffsetPartitionReady (partitionID int32 ) {
217217 module .status .Lock .Lock ()
218218 defer module .status .Lock .Unlock ()
219219
@@ -224,7 +224,7 @@ func (module *OffsetStorage) markOffsetPartitionReady(partitionID int32) {
224224 }
225225}
226226
227- func (module * OffsetStorage ) storeOffsetEntry (offset * kafka.ConsumerPartitionOffset ) {
227+ func (module * MemoryStorage ) storeOffsetEntry (offset * kafka.ConsumerPartitionOffset ) {
228228 module .groups .OffsetsLock .Lock ()
229229 defer module .groups .OffsetsLock .Unlock ()
230230
@@ -244,7 +244,7 @@ func (module *OffsetStorage) storeOffsetEntry(offset *kafka.ConsumerPartitionOff
244244 }
245245}
246246
247- func (module * OffsetStorage ) deleteOffsetEntry (consumerGroupName string , topicName string , partitionID int32 ) {
247+ func (module * MemoryStorage ) deleteOffsetEntry (consumerGroupName string , topicName string , partitionID int32 ) {
248248 key := fmt .Sprintf ("%v:%v:%v" , consumerGroupName , topicName , partitionID )
249249 module .groups .OffsetsLock .Lock ()
250250 defer module .groups .OffsetsLock .Unlock ()
@@ -254,7 +254,7 @@ func (module *OffsetStorage) deleteOffsetEntry(consumerGroupName string, topicNa
254254
255255// ConsumerOffsets returns a copy of the currently known consumer group offsets, so that they can safely be processed
256256// in another go routine
257- func (module * OffsetStorage ) ConsumerOffsets () map [string ]ConsumerPartitionOffsetMetric {
257+ func (module * MemoryStorage ) ConsumerOffsets () map [string ]ConsumerPartitionOffsetMetric {
258258 module .groups .OffsetsLock .RLock ()
259259 defer module .groups .OffsetsLock .RUnlock ()
260260
@@ -267,7 +267,7 @@ func (module *OffsetStorage) ConsumerOffsets() map[string]ConsumerPartitionOffse
267267}
268268
269269// GroupMetadata returns a copy of the currently known group metadata
270- func (module * OffsetStorage ) GroupMetadata () map [string ]kafka.ConsumerGroupMetadata {
270+ func (module * MemoryStorage ) GroupMetadata () map [string ]kafka.ConsumerGroupMetadata {
271271 module .groups .MetadataLock .RLock ()
272272 defer module .groups .MetadataLock .RUnlock ()
273273
@@ -281,7 +281,7 @@ func (module *OffsetStorage) GroupMetadata() map[string]kafka.ConsumerGroupMetad
281281
282282// TopicConfigs returns all topic configurations in a copied map, so that it
283283// is safe to process in another go routine
284- func (module * OffsetStorage ) TopicConfigs () map [string ]kafka.TopicConfiguration {
284+ func (module * MemoryStorage ) TopicConfigs () map [string ]kafka.TopicConfiguration {
285285 module .topics .ConfigsLock .RLock ()
286286 defer module .topics .ConfigsLock .RUnlock ()
287287
@@ -295,7 +295,7 @@ func (module *OffsetStorage) TopicConfigs() map[string]kafka.TopicConfiguration
295295
296296// PartitionHighWaterMarks returns all partition high water marks in a copied map, so that it
297297// is safe to process in another go routine
298- func (module * OffsetStorage ) PartitionHighWaterMarks () map [string ]PartitionWaterMarks {
298+ func (module * MemoryStorage ) PartitionHighWaterMarks () map [string ]PartitionWaterMarks {
299299 module .partitions .HighWaterMarksLock .RLock ()
300300 defer module .partitions .HighWaterMarksLock .RUnlock ()
301301
@@ -309,7 +309,7 @@ func (module *OffsetStorage) PartitionHighWaterMarks() map[string]PartitionWater
309309
310310// PartitionLowWaterMarks returns all partition low water marks in a copied map, so that it
311311// is safe to process in another go routine
312- func (module * OffsetStorage ) PartitionLowWaterMarks () map [string ]PartitionWaterMarks {
312+ func (module * MemoryStorage ) PartitionLowWaterMarks () map [string ]PartitionWaterMarks {
313313 module .partitions .LowWaterMarksLock .RLock ()
314314 defer module .partitions .LowWaterMarksLock .RUnlock ()
315315
@@ -323,7 +323,7 @@ func (module *OffsetStorage) PartitionLowWaterMarks() map[string]PartitionWaterM
323323
324324// IsConsumed indicates whether the consumer offsets topic lag has been caught up and therefore
325325// the metrics reported by this module are accurate or not
326- func (module * OffsetStorage ) IsConsumed () bool {
326+ func (module * MemoryStorage ) IsConsumed () bool {
327327 module .status .Lock .RLock ()
328328 defer module .status .Lock .RUnlock ()
329329
0 commit comments