This repository was archived by the owner on May 13, 2019. It is now read-only.

Kafka consumer not restarting after broker failure #124

@bidesh

We had a connectivity problem between our consumers and the Kafka brokers. The problem lasted for a couple of hours. The log from that period is below:

2017-11-11 00:14:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:29 UTC | ERROR: kafka.go:518: Kafka consumer error: zk: could not connect to a server
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: Stopping partition consumer at offset 38214590710
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: Stopping partition consumer at offset 4394485987
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: Stopping partition consumer at offset 4397716347
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: Stopping partition consumer at offset 38266406728
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: Stopping partition consumer at offset 4380970705
2017-11-11 00:14:30 UTC | consumer/broker/8 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka8:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/0: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/2 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka2:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/3: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/1: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/1 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka1:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/2: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/3 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka3:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/4: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/0 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/1 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/3 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/2 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/4 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: FAILED to commit offset 4380970705 to Zookeeper. Last committed offset: 4380970443
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: FAILED to commit offset 38214590710 to Zookeeper. Last committed offset: 38214590466
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: FAILED to commit offset 4394485987 to Zookeeper. Last committed offset: 4394485910
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: FAILED to commit offset 38266406728 to Zookeeper. Last committed offset: 38266406473
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: FAILED to commit offset 4397716347 to Zookeeper. Last committed offset: 4397716273
2017-11-11 00:14:41 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:45 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:49 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:53 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:57 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:01 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | Closed connection to broker kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:05 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:09 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:15:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:32 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
.
.
. *attempts to commit offsets and failures to fetch metadata continue...*
.
.
2017-11-11 01:39:02 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 01:39:06 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:10 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:14 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 01:39:18 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:22 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:26 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 01:39:30 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 01:39:32 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 01:40:02 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 01:40:32 UTC | Failed to connect to broker kafka0:9092: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for all topics from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
.
.
. *no more offset-commit failure messages, but the brokers are still unreachable*
.
.
2017-11-11 04:55:08 UTC | client/metadata retrying after 250ms... (2 attempts remaining)
2017-11-11 04:55:09 UTC | client/metadata fetching metadata for all topics from broker kafka4:9092
2017-11-11 04:55:39 UTC | Failed to connect to broker kafka4:9092: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata fetching metadata for all topics from broker kafka3:9092
2017-11-11 04:56:09 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata fetching metadata for all topics from broker kafka10:9092
2017-11-11 04:56:39 UTC | Failed to connect to broker kafka10:9092: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata fetching metadata for all topics from broker kafka7:9092
2017-11-11 04:57:09 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 04:57:39 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 04:58:09 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 04:58:09 UTC | Connected to broker at kafka0:9092 (unregistered)
2017-11-11 04:58:09 UTC | client/brokers registered new broker #0 at kafka0:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #10 at kafka10:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #1 at kafka1:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #9 at kafka9:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #2 at kafka2:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #7 at kafka7:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #3 at kafka3:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #8 at kafka8:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #4 at kafka4:9092
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092

After the brokers were registered again, we were still not consuming messages. Also, after a while, even though the network problem persisted, offset commits were no longer attempted and no error messages were logged, which seems a bit strange to me.
And when the brokers were registered, there was no partition rebalancing or anything similar in the logs.

This is how I have implemented the consumer:

func (cons *KafkaConsumer) restartGroupWithZooKeeper() {
	time.Sleep(cons.persistTimeout)
	cons.readFromGroupWithZookeeper()
}

func (cons *KafkaConsumer) readFromGroupWithZookeeper() {
	cons.AddWorker()

	restartWithNewClient := false
	defer func() {
		// close the existing group client otherwise
		// there will be a stale client who had partition but is not reading.
		if (cons.consumerGroupClient != nil) && (!cons.consumerGroupClient.Closed()) {
			cons.consumerGroupClient.Close()
		}
		cons.WorkerDone()

		if restartWithNewClient {
			// mutual recursive function. Will call this function again.
			// so it is important to close the existing one above
			cons.restartGroupWithZooKeeper()
		}
	}()

	client, err := kzconsumergroup.JoinConsumerGroup(cons.group, []string{cons.topic}, cons.zookeeper, cons.consumerGroupConfig)
	cons.consumerGroupClient = client
	if err != nil {
		restartWithNewClient = true
		Log.Error.Printf("Restarting kafka consumer %s:%s - %s", cons.topic, cons.group, err.Error())
		return
	}

	spin := shared.NewSpinner(shared.SpinPriorityLow)
	for !cons.consumerGroupClient.Closed() {
		cons.WaitOnFuse()
		select {
		case event := <-cons.consumerGroupClient.Messages():
			cons.processMessage(event)
			cons.consumerGroupClient.CommitUpto(event)
		case err := <-cons.consumerGroupClient.Errors():
			restartWithNewClient = true
			Log.Error.Print("Kafka consumer error: ", err)
			return
		default:
			spin.Yield()
		}
	}
}
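
For comparison, the restart logic above could also be written as a plain loop rather than as two functions that call each other from a deferred closure, where every restart nests one more call frame. The following is only a minimal sketch under stated assumptions: `runWithRestart`, `errSessionFailed`, and the `session` callback are hypothetical names that do not come from kafka, Sarama, or this consumer. `session` stands in for one join-consume-close cycle, and `delay` plays the role of `cons.persistTimeout`. It does not by itself explain the missing rebalance described above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errSessionFailed is a hypothetical sentinel error used for illustration.
var errSessionFailed = errors.New("consumer session failed")

// runWithRestart calls session until it returns nil or until maxAttempts
// calls have been made, sleeping for delay between failed attempts.
// It returns the number of calls made and the error from the last call.
// The loop replaces recursion, so restarts do not grow the call stack.
func runWithRestart(session func() error, delay time.Duration, maxAttempts int) (int, error) {
	attempts := 0
	var err error
	for attempts < maxAttempts {
		attempts++
		err = session()
		if err == nil {
			return attempts, nil
		}
		if attempts < maxAttempts {
			time.Sleep(delay)
		}
	}
	return attempts, err
}

func main() {
	calls := 0
	// Simulated session: fails twice, then succeeds.
	session := func() error {
		calls++
		if calls < 3 {
			return errSessionFailed
		}
		return nil
	}
	attempts, err := runWithRestart(session, 10*time.Millisecond, 5)
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

In a real consumer the callback would join the group, read from the message and error channels, and close the client before returning, so a fresh client is created on each iteration.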

I saw that there are also multiple issues in Sarama related to connection problems. So I wanted to post here first to get some confirmation that this consumer is implemented correctly before creating an issue in Sarama directly.

@wvanbergen, if you have any ideas on this, I would be happy to create a PR to resolve it.
