@@ -91,6 +91,18 @@ func (p *Pools) GetAll() []*Pool {
9191 return pool
9292}
9393
94+ // RemoveAll removes all pool entries.
95+ func (p * Pools ) RemoveAll () {
96+ p .Lock ()
97+ defer p .Unlock ()
98+
99+ for k , pool := range p .all {
100+ glog .Warningf ("CONN: Disconnecting from %s\n " , k )
101+ delete (p .all , k )
102+ pool .shutdown ()
103+ }
104+ }
105+
94106// RemoveInvalid removes invalid nodes from the list of pools.
95107func (p * Pools ) RemoveInvalid (state * pb.MembershipState ) {
96108 // Keeps track of valid IP addresses, assigned to active nodes. We do this
@@ -241,11 +253,10 @@ func (p *Pool) listenToHeartbeat() error {
241253 }()
242254
243255 threshold := time .Now ().Add (10 * time .Second )
244- ticker := time .NewTicker (time .Second )
245- defer ticker .Stop ()
256+ ticker := time .Tick (time .Second )
246257 for {
247258 select {
248- case <- ticker . C :
259+ case <- ticker :
249260 // Don't check before at least 10s since start.
250261 if time .Now ().Before (threshold ) {
251262 continue
@@ -277,11 +288,19 @@ func (p *Pool) MonitorHealth() {
277288
278289 // We might have lost connection to the destination. In that case, re-dial
279290 // the connection.
280- reconnect := func () {
291+ // Returns true, if reconnection was successful
292+ reconnect := func () bool {
293+ reconnectionTicker := time .Tick (time .Second )
281294 for {
282- time .Sleep (time .Second )
295+ select {
296+ case <- p .closer .HasBeenClosed ():
297+ glog .Infof ("CONN: Returning from MonitorHealth for %s" , p .Addr )
298+ return false
299+ case <- reconnectionTicker :
300+ }
301+
283302 if err := p .closer .Ctx ().Err (); err != nil {
284- return
303+ return false
285304 }
286305 ctx , cancel := context .WithTimeout (p .closer .Ctx (), 10 * time .Second )
287306 conn , err := grpc .NewClient (p .Addr , p .dialOpts ... )
@@ -298,7 +317,7 @@ func (p *Pool) MonitorHealth() {
298317 }
299318 p .conn = conn
300319 p .Unlock ()
301- return
320+ return true
302321 }
303322 glog .Errorf ("CONN: Unable to connect with %s : %s\n " , p .Addr , err )
304323 if conn != nil {
@@ -309,19 +328,20 @@ func (p *Pool) MonitorHealth() {
309328 }
310329 }
311330
331+ ticker := time .Tick (time .Second )
312332 for {
313333 select {
314334 case <- p .closer .HasBeenClosed ():
315335 glog .Infof ("CONN: Returning from MonitorHealth for %s" , p .Addr )
316336 return
317- default :
318- err := p .listenToHeartbeat ()
319- if err != nil {
320- reconnect ()
337+ case <- ticker :
338+ }
339+
340+ err := p .listenToHeartbeat ()
341+ if err != nil {
342+ if reconnect () {
321343 glog .Infof ("CONN: Re-established connection with %s.\n " , p .Addr )
322344 }
323- // Sleep for a bit before retrying.
324- time .Sleep (echoDuration )
325345 }
326346 }
327347}
0 commit comments