-
Notifications
You must be signed in to change notification settings - Fork 139
ConsumerGroup.Messages() #111
Description
i create lots of ConsumerGroup with JoinConsumerGroup, and fetch msg from kafka by
for event := range consumer.Messages() {
// Process event
log.Println(string(event.Value))
eventCount += 1
// Ack event
consumer.CommitUpto(event)
}
when i run my program, consumer.Messages() return nil. the fowllowing my code
`type KafkaConsumer struct {
Topic string
Kafka //kafka configure
GroupConsumer *consumergroup.ConsumerGroup
Msgs chan *sarama.ConsumerMessage //提供给外部程序使用
Exit chan bool
}
func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {
config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)
groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
return nil, err
}
msgchan := make(chan *sarama.ConsumerMessage, 30)
ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil
}
func (k *KafkaConsumer) Close() error {
if k.GroupConsumer != nil {
if err := k.GroupConsumer.Close(); err != nil {
log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n",
err.Error(), k.String())
return err
} else {
log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String())
}
}
k.Exit <- true
return nil
}
func (k *KafkaConsumer) String() string {
return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(),
k.Topic, k.GroupConsumer, k.Msgs)
}
func (k *KafkaConsumer) Dispatcher(mylog *Log) {
ticker := time.NewTicker(time.Second * 10)
log.Printf("topic:%s start a dispatcher\n", k.Topic)
for {
select {
case msg := <-k.GroupConsumer.Messages():
if msg != nil && msg.Value != nil && len(msg.Value) > 0 {
k.Msgs <- msg
k.GroupConsumer.CommitUpto(msg)
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan",
k.Alltopic, msg.Topic, len(msg.Value))
} else {
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v",
k.Alltopic, k.Topic, msg)
}
case err := <-k.GroupConsumer.Errors():
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err)
case <-k.Exit:
log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic)
ticker.Stop()
return
case <-ticker.C:
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d",
k.Alltopic, k.Topic, k)
}
}
}`
how to fix it?
best wishes.