@@ -88,7 +88,6 @@ func (consumer *TrieTreeConsumer) Cleanup(sarama.ConsumerGroupSession) error {
8888// ConsumeClaim 必须启动 ConsumerGroupClaim 的 Messages() 消费者循环。
8989// 一旦 Messages() 通道关闭,处理程序必须完成其处理循环并退出。
9090func (consumer * TrieTreeConsumer ) ConsumeClaim (session sarama.ConsumerGroupSession , claim sarama.ConsumerGroupClaim ) error {
91- // ctx := context.Background()
9291 gapTime := 2 * time .Minute
9392 for {
9493 select {
@@ -98,14 +97,12 @@ func (consumer *TrieTreeConsumer) ConsumeClaim(session sarama.ConsumerGroupSessi
9897 return nil
9998 }
10099 // 构建trie tree树
101- trie .GobalTrieTree .Insert (string (message .Value ))
102- // logs.LogrusObj.Infof("TrieTreeConsumer Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
100+ trie .GlobalTrieTree .Insert (string (message .Value ))
103101 session .MarkMessage (message , "" )
104- // https://github.com/IBM/sarama/issues/1192
105102
106103 case <- time .After (gapTime ):
107104 logs .LogrusObj .Infof ("ConsumeClaim starting store dict" )
108- // _ = storage.GlobalTrieDBs.StorageDict(trie.GobalTrieTree ) // TODO:后续看看能不能实现一个全局的triedb,每次都先读取存量进行初始化,再插入增量...
105+ // _ = storage.GlobalTrieDBs.StorageDict(trie.GlobalTrieTree ) // TODO:后续看看能不能实现一个全局的triedb,每次都先读取存量进行初始化,再插入增量...
109106 logs .LogrusObj .Infof ("ConsumeClaim ending store dict" )
110107
111108 case <- session .Context ().Done ():
@@ -116,12 +113,12 @@ func (consumer *TrieTreeConsumer) ConsumeClaim(session sarama.ConsumerGroupSessi
116113}
117114
118115// func mergeTrieTree(node string) {
119- // trie.GobalTrieTree .Insert(node)
116+ // trie.GlobalTrieTree .Insert(node)
120117// gapTime := 2 * time.Minute
121118// for {
122119// select {
123120// case <-time.After(gapTime):
124- // _ = storage.GlobalTrieDBs.StorageDict(trie.GobalTrieTree )
121+ // _ = storage.GlobalTrieDBs.StorageDict(trie.GlobalTrieTree )
125122// }
126123// }
127124// }
0 commit comments