Skip to content

Commit 440f014

Browse files
committed
fix(zero): make zero shutdown cleanly
1 parent d28f8fa commit 440f014

File tree

3 files changed

+38
-14
lines changed

3 files changed

+38
-14
lines changed

conn/pool.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
9191
return pool
9292
}
9393

94+
// RemoveAll removes all pool entries.
95+
func (p *Pools) RemoveAll() {
96+
p.Lock()
97+
defer p.Unlock()
98+
99+
for k, pool := range p.all {
100+
glog.Warningf("CONN: Disconnecting from %s\n", k)
101+
delete(p.all, k)
102+
pool.shutdown()
103+
}
104+
}
105+
94106
// RemoveInvalid removes invalid nodes from the list of pools.
95107
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
96108
// Keeps track of valid IP addresses, assigned to active nodes. We do this
@@ -241,11 +253,10 @@ func (p *Pool) listenToHeartbeat() error {
241253
}()
242254

243255
threshold := time.Now().Add(10 * time.Second)
244-
ticker := time.NewTicker(time.Second)
245-
defer ticker.Stop()
256+
ticker := time.Tick(time.Second)
246257
for {
247258
select {
248-
case <-ticker.C:
259+
case <-ticker:
249260
// Don't check before at least 10s since start.
250261
if time.Now().Before(threshold) {
251262
continue
@@ -277,11 +288,19 @@ func (p *Pool) MonitorHealth() {
277288

278289
// We might have lost connection to the destination. In that case, re-dial
279290
// the connection.
280-
reconnect := func() {
291+
// Returns true if reconnection was successful.
292+
reconnect := func() bool {
293+
reconnectionTicker := time.Tick(time.Second)
281294
for {
282-
time.Sleep(time.Second)
295+
select {
296+
case <-p.closer.HasBeenClosed():
297+
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
298+
return false
299+
case <-reconnectionTicker:
300+
}
301+
283302
if err := p.closer.Ctx().Err(); err != nil {
284-
return
303+
return false
285304
}
286305
ctx, cancel := context.WithTimeout(p.closer.Ctx(), 10*time.Second)
287306
conn, err := grpc.NewClient(p.Addr, p.dialOpts...)
@@ -298,7 +317,7 @@ func (p *Pool) MonitorHealth() {
298317
}
299318
p.conn = conn
300319
p.Unlock()
301-
return
320+
return true
302321
}
303322
glog.Errorf("CONN: Unable to connect with %s : %s\n", p.Addr, err)
304323
if conn != nil {
@@ -309,19 +328,20 @@ func (p *Pool) MonitorHealth() {
309328
}
310329
}
311330

331+
ticker := time.Tick(time.Second)
312332
for {
313333
select {
314334
case <-p.closer.HasBeenClosed():
315335
glog.Infof("CONN: Returning from MonitorHealth for %s", p.Addr)
316336
return
317-
default:
318-
err := p.listenToHeartbeat()
319-
if err != nil {
320-
reconnect()
337+
case <-ticker:
338+
}
339+
340+
err := p.listenToHeartbeat()
341+
if err != nil {
342+
if reconnect() {
321343
glog.Infof("CONN: Re-established connection with %s.\n", p.Addr)
322344
}
323-
// Sleep for a bit before retrying.
324-
time.Sleep(echoDuration)
325345
}
326346
}
327347
}

dgraph/cmd/zero/raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ func (n *node) Run() {
879879
// snapshot can cause select loop to block while deleting entries, so run
880880
// it in goroutine
881881
readStateCh := make(chan raft.ReadState, 100)
882-
closer := z.NewCloser(5)
882+
closer := z.NewCloser(4)
883883
defer func() {
884884
closer.SignalAndWait()
885885
n.closer.Done()

dgraph/cmd/zero/run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,10 @@ func run() {
346346
st.node.closer.SignalAndWait()
347347
// Stop all internal requests.
348348
_ = grpcListener.Close()
349+
// Stop all pools
350+
if pools := conn.GetPools(); pools != nil {
351+
pools.RemoveAll()
352+
}
349353
}()
350354

351355
st.zero.closer.AddRunning(2)

0 commit comments

Comments (0)