Skip to content

Commit e6bf332

Browse files
authored
fix: fix zookeeper collect blocking (#965)
1 parent 4de08ca commit e6bf332

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

inputs/zookeeper/zookeeper.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ func (ins *Instance) Gather(slist *types.SampleList) {
110110

111111
func (ins *Instance) gatherOneHost(wg *sync.WaitGroup, slist *types.SampleList, zkHost string) {
112112
defer wg.Done()
113+
defer func() {
114+
if r := recover(); r != nil {
115+
log.Println("E! Recovered in zookeeper gatherOneHost ", zkHost, r)
116+
}
117+
}()
113118

114119
tags := map[string]string{"zk_host": zkHost, "zk_cluster": ins.ClusterName}
115120
begun := time.Now()
@@ -120,27 +125,22 @@ func (ins *Instance) gatherOneHost(wg *sync.WaitGroup, slist *types.SampleList,
120125
slist.PushFront(types.NewSample("", "zk_scrape_use_seconds", use, tags))
121126
}(begun)
122127

123-
// zk_up
124-
mntrConn, err := ins.ZkConnect(zkHost)
125-
if err != nil {
126-
slist.PushFront(types.NewSample("", "zk_up", 0, tags))
127-
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
128-
return
129-
}
130-
131-
defer mntrConn.Close()
132-
ins.gatherMntrResult(mntrConn, slist, tags)
128+
// zk_up zk_ruok
129+
conn, err := ins.ZkConnect(zkHost)
133130

134-
// zk_ruok
135-
ruokConn, err := ins.ZkConnect(zkHost)
136131
if err != nil {
132+
slist.PushFront(types.NewSample("", "zk_up", 0, tags))
137133
slist.PushFront(types.NewSample("", "zk_ruok", 0, tags))
138134
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
139135
return
140136
}
137+
defer conn.Close()
138+
139+
// prevent blocking
140+
conn.SetDeadline(time.Now().Add(time.Duration(ins.Timeout) * time.Second))
141141

142-
defer ruokConn.Close()
143-
ins.gatherRuokResult(ruokConn, slist, tags)
142+
ins.gatherMntrResult(conn, slist, tags)
143+
ins.gatherRuokResult(conn, slist, tags)
144144
}
145145

146146
func (ins *Instance) gatherMntrResult(conn net.Conn, slist *types.SampleList, globalTags map[string]string) {
@@ -226,12 +226,14 @@ func (ins *Instance) gatherRuokResult(conn net.Conn, slist *types.SampleList, gl
226226
func sendZookeeperCmd(conn net.Conn, cmd string) string {
227227
_, err := conn.Write([]byte(cmd))
228228
if err != nil {
229-
log.Println("E! failed to exec Zookeeper command:", cmd)
229+
log.Printf("E! failed to exec Zookeeper command: %s response from '%s': %s", cmd, conn.RemoteAddr().String(), err)
230+
return ""
230231
}
231232

232233
res, err := ioutil.ReadAll(conn)
233234
if err != nil {
234235
log.Printf("E! failed read Zookeeper command: '%s' response from '%s': %s", cmd, conn.RemoteAddr().String(), err)
236+
return ""
235237
}
236238
return string(res)
237239
}

0 commit comments

Comments
 (0)