@@ -52,7 +52,8 @@ public class GossipManager {
5252 private long executeGossipTime = 500 ;
5353 private boolean isWorking = false ;
5454 private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock ();
55- private ScheduledExecutorService doGossipExecotor = Executors .newScheduledThreadPool (1 );
55+ private ScheduledExecutorService doGossipExecutor = Executors .newScheduledThreadPool (1 );
56+ private ScheduledExecutorService clearExecutor = Executors .newSingleThreadScheduledExecutor ();
5657
5758 private Map <GossipMember , HeartbeatState > endpointMembers = new ConcurrentHashMap <>();
5859 private List <GossipMember > liveMembers = new ArrayList <>();
@@ -91,7 +92,7 @@ protected void start() {
9192 ));
9293 isWorking = true ;
9394 settings .getMsgService ().listen (getSelf ().getIpAddress (), getSelf ().getPort ());
94- doGossipExecotor .scheduleAtFixedRate (new GossipTask (), settings .getGossipInterval (), settings .getGossipInterval (), TimeUnit .MILLISECONDS );
95+ doGossipExecutor .scheduleAtFixedRate (new GossipTask (), settings .getGossipInterval (), settings .getGossipInterval (), TimeUnit .MILLISECONDS );
9596 }
9697
9798 public List <GossipMember > getLiveMembers () {
@@ -406,6 +407,11 @@ public void down(GossipMember member) {
406407 if (!deadMembers .contains (member )) {
407408 deadMembers .add (member );
408409 }
410+ clearExecutor .schedule (() -> {
411+ if (deadMembers .contains (member )) {
412+ deadMembers .remove (member );
413+ }
414+ }, getSettings ().getDeleteThreshold () * getSettings ().getGossipInterval (), TimeUnit .MILLISECONDS );
409415 fireGossipEvent (member , GossipState .DOWN );
410416 } finally {
411417 rwlock .writeLock ().unlock ();
@@ -420,7 +426,7 @@ private void up(GossipMember member) {
420426 if (!liveMembers .contains (member )) {
421427 liveMembers .add (member );
422428 }
423- if (candidateMembers .containsKey (member )){
429+ if (candidateMembers .containsKey (member )) {
424430 candidateMembers .remove (member );
425431 }
426432 if (deadMembers .contains (member )) {
@@ -433,24 +439,24 @@ private void up(GossipMember member) {
433439
434440 }
435441
436- private void downing (GossipMember member , HeartbeatState state ){
442+ private void downing (GossipMember member , HeartbeatState state ) {
437443 LOGGER .info ("downing ~~" );
438- if (candidateMembers .containsKey (member )){
444+ if (candidateMembers .containsKey (member )) {
439445 CandidateMemberState cState = candidateMembers .get (member );
440- if (state .getHeartbeatTime () == cState .getHeartbeatTime ()){
446+ if (state .getHeartbeatTime () == cState .getHeartbeatTime ()) {
441447 cState .updateCount ();
442- }else if (state .getHeartbeatTime () > cState .getHeartbeatTime ()){
448+ } else if (state .getHeartbeatTime () > cState .getHeartbeatTime ()) {
443449 candidateMembers .remove (member );
444450 }
445- }else {
451+ } else {
446452 candidateMembers .put (member , new CandidateMemberState (state .getHeartbeatTime ()));
447453 }
448454 }
449455
450- private void checkCandidate (){
456+ private void checkCandidate () {
451457 Set <GossipMember > keys = candidateMembers .keySet ();
452- for (GossipMember m : keys ){
453- if (candidateMembers .get (m ).getDowningCount ().get () >= getSettings ().getDeleteThreshold ()){
458+ for (GossipMember m : keys ) {
459+ if (candidateMembers .get (m ).getDowningCount ().get () >= getSettings ().getDeleteThreshold ()) {
454460 down (m );
455461 candidateMembers .remove (m );
456462 }
@@ -460,7 +466,7 @@ private void checkCandidate(){
460466
461467 protected void shutdown () {
462468 getSettings ().getMsgService ().unListen ();
463- doGossipExecotor .shutdown ();
469+ doGossipExecutor .shutdown ();
464470 try {
465471 Thread .sleep (getSettings ().getGossipInterval ());
466472 } catch (InterruptedException e ) {
0 commit comments