Skip to content

Commit deb9ee4

Browse files
committed
1 parent 5172a0d commit deb9ee4

File tree

1 file changed

+49
-2
lines changed

1 file changed

+49
-2
lines changed

src/java/org/apache/cassandra/gms/Gossiper.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.google.common.collect.Iterables;
5656
import com.google.common.collect.Sets;
5757
import com.google.common.util.concurrent.Uninterruptibles;
58+
import org.apache.cassandra.exceptions.RequestFailureReason;
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061

@@ -165,6 +166,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
165166
@VisibleForTesting
166167
final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>();
167168

169+
/* Inflight echo requests. */
170+
private final Set<InetAddressAndPort> inflightEcho = new ConcurrentSkipListSet<>();
171+
168172
/* unreachable member set */
169173
private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>();
170174

@@ -729,6 +733,7 @@ public void removeEndpoint(InetAddressAndPort endpoint)
729733
return;
730734

731735
liveEndpoints.remove(endpoint);
736+
inflightEcho.remove(endpoint);
732737
unreachableEndpoints.remove(endpoint);
733738
MessagingService.instance().versions.reset(endpoint);
734739
quarantineEndpoint(endpoint);
@@ -1430,12 +1435,53 @@ void notifyFailureDetector(InetAddressAndPort endpoint, EndpointState remoteEndp
14301435
private void markAlive(final InetAddressAndPort addr, final EndpointState localState)
14311436
{
14321437
localState.markDead();
1438+
if (!inflightEcho.add(addr))
1439+
{
1440+
return;
1441+
}
14331442

14341443
Message<NoPayload> echoMessage = Message.out(ECHO_REQ, noPayload);
14351444
logger.trace("Sending ECHO_REQ to {}", addr);
1436-
RequestCallback echoHandler = msg ->
1445+
RequestCallback echoHandler = new RequestCallback()
14371446
{
1438-
runInGossipStageBlocking(() -> realMarkAlive(addr, localState));
1447+
@Override
1448+
public void onResponse(Message msg)
1449+
{
1450+
// force processing of the echo response onto the gossip stage, as it comes in on the REQUEST_RESPONSE stage
1451+
runInGossipStageBlocking(() -> {
1452+
try
1453+
{
1454+
EndpointState localStatePtr = endpointStateMap.get(addr);
1455+
realMarkAlive(addr, localStatePtr);
1456+
}
1457+
finally
1458+
{
1459+
inflightEcho.remove(addr);
1460+
}
1461+
});
1462+
}
1463+
1464+
@Override
1465+
public boolean invokeOnFailure()
1466+
{
1467+
return true;
1468+
}
1469+
1470+
@Override
1471+
public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
1472+
{
1473+
if (isEnabled())
1474+
{
1475+
logger.trace("Resending ECHO_REQ to {}", addr);
1476+
Message<NoPayload> echoMessage = Message.out(ECHO_REQ, noPayload);
1477+
MessagingService.instance().sendWithCallback(echoMessage, addr, this);
1478+
}
1479+
else
1480+
{
1481+
logger.trace("Failed ECHO_REQ to {}, aborting due to disabled gossip", addr);
1482+
inflightEcho.remove(addr);
1483+
}
1484+
}
14391485
};
14401486

14411487
MessagingService.instance().sendWithCallback(echoMessage, addr, echoHandler);
@@ -1488,6 +1534,7 @@ public void markDead(InetAddressAndPort addr, EndpointState localState)
14881534
private void silentlyMarkDead(InetAddressAndPort addr, EndpointState localState)
14891535
{
14901536
localState.markDead();
1537+
inflightEcho.remove(addr);
14911538
if (!disableEndpointRemoval)
14921539
{
14931540
liveEndpoints.remove(addr);

0 commit comments

Comments
 (0)