Skip to content

transport: pass network channel exceptions to close listeners #127895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportLogger;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@ESIntegTestCase.ClusterScope(numDataNodes = 2, scope = ESIntegTestCase.Scope.TEST)
public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
Expand Down Expand Up @@ -96,7 +103,7 @@ public void testConnectionLogging() throws IOException {
"close connection log",
TcpTransport.class.getCanonicalName(),
Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*"
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\]$"
)
);

Expand All @@ -105,4 +112,46 @@ public void testConnectionLogging() throws IOException {

mockLog.assertAllExpectationsMatched();
}

/**
 * Verifies that a connection closed because of an exception is logged by {@link TcpTransport} at DEBUG level
 * with the ", exception:" suffix.
 * <p>
 * The test starts an extra node, hooks a {@link TransportConnectionListener} into the master node's
 * {@link ConnectionManager}, and restarts the extra node until a connection close listener is completed
 * through {@code onFailure} (or the attempt counter is exhausted), then checks the log expectation.
 */
@TestLogging(
value = "org.elasticsearch.transport.TcpTransport:DEBUG",
reason = "to ensure we log exception disconnect events on DEBUG level"
)
public void testExceptionalDisconnectLogging() throws Exception {
// Expect the "closed transport connection ..." DEBUG message in its exceptional form.
mockLog.addExpectation(
new MockLog.PatternSeenEventExpectation(
"exceptional close connection log",
TcpTransport.class.getCanonicalName(),
Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*"
)
);

final String nodeName = internalCluster().startNode();

// Counted down once a closed connection reports its close through onFailure.
final CountDownLatch latch = new CountDownLatch(1);
String masterNode = internalCluster().getMasterName();
ConnectionManager connManager = internalCluster().getInstance(TransportService.class, masterNode).getConnectionManager();
connManager.addListener(new TransportConnectionListener() {
@Override
public void onConnectionClosed(Transport.Connection conn) {
conn.addCloseListener(new ActionListener<>() {
// A graceful close is not what this test is waiting for, so it is ignored.
@Override
public void onResponse(Void ignored) {}

@Override
public void onFailure(Exception e) {
latch.countDown();
}
});
}
});

// Restart the node and wait up to 500ms for the latch; repeat while it has not been counted down
// and the attempt counter is still below its limit.
int failAttempts = 0;
do {
internalCluster().restartNode(nodeName);
} while (latch.await(500, TimeUnit.MILLISECONDS) == false && failAttempts++ < 10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you encounter cases where we don't get an exceptional close on the first try?


mockLog.assertAllExpectationsMatched();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,33 @@ public class Netty4TcpChannel implements TcpChannel {
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;
/**
* Exception causing a close, reported to the {@link #closeContext} listener
*/
private volatile Exception closeException = null;

/**
 * Wraps a netty {@link Channel} and forwards its connect and close futures to this channel's listeners.
 * <p>
 * When the netty close future completes, {@code closeContext} is completed exactly once: with the exception
 * recorded in {@code closeException} if one was set, otherwise successfully.
 *
 * @param channel       the underlying netty channel
 * @param isServer      value stored in the {@code isServer} field
 * @param profile       value stored in the {@code profile} field
 * @param rstOnClose    value stored in the {@code rstOnClose} field
 * @param connectFuture future whose outcome is forwarded to {@code connectContext}
 */
Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
    this.channel = channel;
    this.isServer = isServer;
    this.profile = profile;
    this.connectContext = new ListenableFuture<>();
    this.rstOnClose = rstOnClose;
    addListener(connectFuture, connectContext);
    // closeContext is deliberately NOT registered on the close future directly: doing so would complete it
    // before the listener below runs and the recorded close exception would never reach close listeners.
    addListener(this.channel.closeFuture(), new ActionListener<>() {
        @Override
        public void onResponse(Void ignored) {
            // Read the volatile field once so the null check and the value passed on are consistent.
            final Exception cause = closeException;
            if (cause != null) {
                closeContext.onFailure(cause);
            } else {
                closeContext.onResponse(null);
            }
        }

        @Override
        public void onFailure(Exception e) {
            assert false : new AssertionError("netty channel closeFuture should never report a failure", e);
            // With assertions disabled, still complete closeContext so close listeners are not left waiting.
            closeContext.onFailure(e);
        }
    });
}

@Override
Expand Down Expand Up @@ -95,6 +113,11 @@ public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(listener);
}

/**
 * Records the exception that is causing this channel to close, by storing it in the
 * {@code closeException} field.
 *
 * @param e the close-causing exception
 */
@Override
public void setCloseException(Exception e) {
    this.closeException = e;
}

@Override
public ChannelStats getChannelStats() {
return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Map;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;

Expand Down Expand Up @@ -308,18 +307,27 @@ protected void stopInternal() {
}, serverBootstraps::clear, () -> clientBootstrap = null);
}

/**
 * Converts an arbitrary {@link Throwable} into an {@link Exception}.
 * <p>
 * An {@link Exception} is returned unchanged. Any other {@link Throwable} (an {@link Error}, or a direct
 * {@code Throwable} subclass) is wrapped in a new {@link Exception} with the original as its cause, so the
 * conversion never fails with a {@link ClassCastException}.
 *
 * @param cause the throwable to convert; {@code null} is returned as {@code null}
 * @return {@code cause} itself if it is an {@link Exception}, otherwise an {@link Exception} wrapping it
 */
static Exception exceptionFromThrowable(Throwable cause) {
    if (cause == null || cause instanceof Exception) {
        return (Exception) cause;
    }
    // Errors and bare Throwables cannot be cast to Exception, so wrap them and keep the original as cause.
    return new Exception(cause);
}

protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

/**
 * Initialises a newly created client channel: registers the closed-exception logger, applies the
 * keep-alive configuration to the underlying java channel, and sets up the pipeline in client mode
 * (the second argument to {@code setupPipeline} is {@code false}).
 *
 * @param ch the channel being initialised; asserted to be a {@code Netty4NioSocketChannel}
 */
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
setupPipeline(ch, false);
}

/**
 * Records the exception on the channel's {@link Netty4TcpChannel} (when one is attached) so it can be
 * reported as the close cause, then applies the usual fatal-error handling and delegates to the superclass.
 *
 * @param ctx   the handler context of the channel on which the exception was raised
 * @param cause the throwable that was caught
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
    // The attribute is unset (null) if the failure happens before a Netty4TcpChannel was attached;
    // guard against it so an NPE here does not mask the original cause.
    if (channel != null) {
        channel.setCloseException(exceptionFromThrowable(cause));
    }
    ExceptionsHelper.maybeDieOnAnotherThread(cause);
    super.exceptionCaught(ctx, cause);
}
Expand All @@ -337,7 +345,6 @@ protected ServerChannelInitializer(String name) {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
Expand All @@ -348,6 +355,8 @@ protected void initChannel(Channel ch) throws Exception {

/**
 * Records the exception on the channel's {@link Netty4TcpChannel} (when one is attached) so it can be
 * reported as the close cause, then applies the usual fatal-error handling and delegates to the superclass.
 *
 * @param ctx   the handler context of the channel on which the exception was raised
 * @param cause the throwable that was caught
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
    // initChannel may fail before the Netty4TcpChannel attribute has been set; in that case there is
    // nothing to record the exception on, and dereferencing null would hide the original cause.
    if (channel != null) {
        channel.setCloseException(exceptionFromThrowable(cause));
    }
    ExceptionsHelper.maybeDieOnAnotherThread(cause);
    super.exceptionCaught(ctx, cause);
}
Expand Down Expand Up @@ -383,26 +392,14 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
);
}

/**
 * Attaches a listener to the channel's close future that logs, at DEBUG level, the cause of an
 * unsuccessful close together with the channel it happened on.
 *
 * @param channel the channel whose close future is observed
 */
private static void addClosedExceptionLogger(Channel channel) {
    Netty4Utils.addListener(channel.closeFuture(), closeResult -> {
        if (closeResult.isSuccess()) {
            return;
        }
        if (logger.isDebugEnabled() == false) {
            return;
        }
        logger.debug(format("exception while closing channel: %s", closeResult.channel()), closeResult.cause());
    });
}

/**
 * Sharable inbound handler for server channels that forwards any caught throwable to
 * {@code onServerException} for the {@code Netty4TcpServerChannel} attached to the netty channel.
 */
@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter {

    /**
     * Applies fatal-error handling, then reports the throwable exactly once, converted to an
     * {@link Exception} via {@code exceptionFromThrowable}.
     *
     * @param ctx   the handler context of the server channel
     * @param cause the throwable that was caught
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
        // Single notification: the inline Error/Exception branching duplicated this call.
        onServerException(serverChannel, exceptionFromThrowable(cause));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* When the close completes but an exception prompted the closure, the exception will be passed to the
* listener's onFailure method.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<Void> listener);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
return curr;
});
if (tracker.registered.compareAndSet(false, true)) {
channel.addCloseListener(ActionListener.wrap(r -> {
channel.addCloseListener(ActionListener.running(() -> {
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
assert removedTracker == tracker;
onChannelClosed(tracker);
}, e -> { assert false : new AssertionError("must not be here", e); }));
}));
}
return tracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public void close() {
}
}

/**
 * Marks this instance as closed and completes {@code closeContext} with the given exception.
 * Only the call that flips the {@code closed} flag from {@code false} to {@code true} has any effect.
 *
 * @param e the exception handed to {@code closeContext.onFailure}
 */
public void closeAndFail(Exception e) {
    final boolean firstClose = closed.compareAndSet(false, true);
    if (firstClose == false) {
        return;
    }
    closeContext.onFailure(e);
}

@Override
public void onRemoved() {
if (removed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
e
);
channel.setCloseException(e);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void sendResponse(
),
ex
);
channel.setCloseException(ex);
channel.close();
} else {
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
Expand Down Expand Up @@ -204,6 +205,7 @@ void sendErrorResponse(
} catch (Exception sendException) {
sendException.addSuppressed(error);
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
channel.setCloseException(sendException);
channel.close();
}
}
Expand Down Expand Up @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
}
});
} catch (RuntimeException ex) {
channel.setCloseException(ex);
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
*/
void addConnectListener(ActionListener<Void> listener);

/**
 * Report a close-causing exception on this channel.
 * <p>
 * NOTE(review): call sites visible in this change invoke this immediately before closing the channel;
 * presumably implementations hand the exception to close listeners via {@code onFailure} — confirm
 * for each implementation.
 *
 * @param e the exception that prompted the close
 */
void setCloseException(Exception e);

/**
* Returns stats about this channel
*/
Expand Down
39 changes: 36 additions & 3 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) {

/**
 * Closes without a failure cause by delegating to {@code handleClose(null)}.
 */
@Override
public void close() {
handleClose(null);
}

/**
 * Closes with the given exception as the cause by delegating to {@code handleClose(e)}.
 *
 * @param e the exception that prompted the close
 */
@Override
public void closeAndFail(Exception e) {
handleClose(e);
}

/**
 * Closes all channels once and then triggers the superclass close handling along exactly one path:
 * {@code super.close()} when no exception is given, {@code super.closeAndFail(e)} otherwise.
 * Only the first caller to flip {@code isClosing} performs any work.
 *
 * @param e the exception that prompted the close, or {@code null} for a close without a failure cause
 */
private void handleClose(Exception e) {
    if (isClosing.compareAndSet(false, true)) {
        try {
            // Only block on channel close when the transport is stopped and we are not on a transport thread.
            boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
            CloseableChannel.closeChannels(channels, block);
        } finally {
            // Call exactly one super method to trigger listeners; calling super.close() unconditionally
            // first would take the graceful path even when an exception prompted the close.
            if (e == null) {
                super.close();
            } else {
                super.closeAndFail(e);
            }
        }
    }
}
Expand Down Expand Up @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
}
} finally {
if (closeChannel) {
channel.setCloseException(e);
CloseableChannel.closeChannel(channel);
}
}
Expand Down Expand Up @@ -1120,7 +1134,17 @@ public void onResponse(Void v) {
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.running(nodeChannels::close));
ch.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
nodeChannels.close();
}

@Override
public void onFailure(Exception e) {
nodeChannels.closeAndFail(e);
}
});
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
Expand Down Expand Up @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) {

/**
 * Logs, at DEBUG level, that the connection was closed because of an exception, including the
 * connection id, the node, and the connection age computed from {@code openTimeMillis}.
 *
 * @param e the exception that prompted the close; attached to the log event
 */
@Override
public void onFailure(Exception e) {
    // No "never called" assertion here: this path is reached when the connection is closed with an exception.
    long closeTimeMillis = threadPool.relativeTimeInMillis();
    logger.debug(
        () -> format(
            "closed transport connection [%d] to [%s] with age [%dms], exception:",
            connectionId,
            node,
            closeTimeMillis - openTimeMillis
        ),
        e
    );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp
TransportException;

/**
* The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method
* will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called
* when the connection has closed, but an exception has prompted the close.
*
* @param listener to be called
*/
Expand Down
Loading