Set connection: close header on shutdown #128025

Open. Wants to merge 3 commits into main.
6 changes: 6 additions & 0 deletions docs/changelog/128025.yaml
@@ -0,0 +1,6 @@
pr: 128025
summary: "Set `connection: close` header on shutdown"
area: Network
type: enhancement
issues:
- 127984
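
The change described above means that, while a node is shutting down, responses to in-flight requests on a kept-alive connection carry a `connection: close` header so the client stops reusing (or pipelining on) that connection. As an illustration only, and not part of this change, here is a minimal JDK HttpClient sketch of what a client would observe; the node address is an assumption:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectionCloseProbe {
    public static void main(String[] args) throws Exception {
        // The node address is an assumption; any REST route behaves the same way during shutdown.
        final var client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        final var request = HttpRequest.newBuilder(URI.create("http://localhost:9200/")).GET().build();
        final var response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // While the node is shutting down, responses to in-flight requests carry `connection: close`.
        System.out.println("connection header: " + response.headers().firstValue("connection").orElse("<absent>"));
    }
}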
Netty4PipeliningIT.java
@@ -9,11 +9,26 @@

package org.elasticsearch.http.netty4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCounted;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.node.NodeClient;
@@ -29,6 +44,8 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpServerTransport;
@@ -41,29 +58,42 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.EmptyResponseListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.netty4.NettyAllocator;
import org.elasticsearch.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.oneOf;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class Netty4PipeliningIT extends ESNetty4IntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.concatLists(List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class), super.nodePlugins());
return CollectionUtils.concatLists(
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
super.nodePlugins()
);
}

private static final int MAX_PIPELINE_EVENTS = 10;
@@ -142,6 +172,115 @@ private void runPipeliningTest(int expectedResponseCount, String... routes) thro
}
}

public void testSetCloseConnectionHeaderWhenShuttingDown() throws IOException {

// This test uses KeepPipeliningPlugin to keep an HTTP connection from becoming idle with a sequence of pipelined requests while
// the node shuts down. It verifies that these requests start to receive responses carrying `Connection: close`, and that the node
// does not shut down until every request has received a response.

final var victimNode = internalCluster().startNode();

final var releasables = new ArrayList<Releasable>(3);
try {
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
releasables.add(keepPipeliningRequest::release);

final var enoughResponsesToCloseLatch = new CountDownLatch(between(1, 5));
final var outstandingRequestsCounter = new AtomicInteger();
final var nodeShuttingDown = new AtomicBoolean();
final var stoppedPipelining = new AtomicBoolean();

final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
.group(eventLoopGroup)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {

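// Sometimes ignore the first few `connection: close` headers and keep pipelining anyway, covering a client that does not
// immediately stop using the connection after being asked to.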
private int closeHeadersToIgnore = between(0, 5);

private boolean ignoreCloseHeader() {
if (closeHeadersToIgnore == 0) {
return false;
} else {
closeHeadersToIgnore -= 1;
return true;
}
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
enoughResponsesToCloseLatch.countDown();
assertThat(
outstandingRequestsCounter.decrementAndGet(),
stoppedPipelining.get() ? oneOf(0, 1) : equalTo(1)
);

if ("close".equals(msg.headers().get("connection")) && ignoreCloseHeader() == false) {
assertTrue(nodeShuttingDown.get());
// send one more request with `?respond_immediately` to stop the pipelining
if (stoppedPipelining.compareAndSet(false, true)) {
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
ctx.writeAndFlush(
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.GET,
KeepPipeliningPlugin.ROUTE + "?" + KeepPipeliningPlugin.RESPOND_IMMEDIATELY
)
);
}
} else {
// still pipelining, send another request to trigger the next response
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
ctx.writeAndFlush(keepPipeliningRequest.retain());
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
}
});
}
});

final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, victimNode);
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();

// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());

// Send two pipelined requests so that we start to receive responses
assertTrue(outstandingRequestsCounter.compareAndSet(0, 2));
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());

// wait until we've started to receive responses
safeAwait(enoughResponsesToCloseLatch);

// Shut down the node
assertTrue(nodeShuttingDown.compareAndSet(false, true));
internalCluster().stopNode(victimNode);

// Wait for the pipelining channel to be closed, indicating that it stopped pipelining (because it received a response with
// `Connection: close`) and allowed the node to shut down
pipeliningChannel.closeFuture().syncUninterruptibly();

// The node did not finish shutting down until every request had received a response.
assertTrue(stoppedPipelining.get());
assertEquals(0, outstandingRequestsCounter.get());

} finally {
Collections.reverse(releasables);
Releasables.close(releasables);
}
}

private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
// check if opaque ids are monotonically increasing
int i = 0;
@@ -203,7 +342,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

/**
* Adds an HTTP route that waits for 3 concurrent executions before returning any of them
* Adds an HTTP route that starts to emit a chunked response and then fails before its completion.
*/
public static class ChunkAndFailPlugin extends Plugin implements ActionPlugin {

@@ -285,4 +424,54 @@ public String getResponseContentTypeString() {
});
}
}

/**
* Adds an HTTP route that responds to each request only when it starts to process the next one, ensuring that there is always at least
* one in-flight request in the pipeline, which keeps the connection from becoming idle.
*/
public static class KeepPipeliningPlugin extends Plugin implements ActionPlugin {

static final String ROUTE = "/_test/keep_pipelining";
static final String RESPOND_IMMEDIATELY = "respond_immediately";

@Override
public Collection<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new BaseRestHandler() {

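// Completing this trigger sends the response for the most recently received request; it is completed when the next request
// arrives, or immediately if `?respond_immediately` is set.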
private SubscribableListener<Void> lastRequestTrigger = new SubscribableListener<>();

@Override
public String getName() {
return ROUTE;
}

@Override
public List<Route> routes() {
return List.of(new Route(GET, ROUTE));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final var respondImmediately = request.paramAsBoolean(RESPOND_IMMEDIATELY, false);
return channel -> {
// all happens on a single thread in these tests, no need for concurrency protection
final var previousRequestTrigger = lastRequestTrigger;
lastRequestTrigger = respondImmediately ? SubscribableListener.nullSuccess() : new SubscribableListener<>();
lastRequestTrigger.addListener(new EmptyResponseListener(channel).map(ignored -> ActionResponse.Empty.INSTANCE));
previousRequestTrigger.onResponse(null);
};
}
});
}
}
}
AbstractHttpServerTransport.java
@@ -32,6 +32,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.AbstractRefCounted;
@@ -615,13 +616,27 @@ public ThreadPool getThreadPool() {
}

/**
* A {@link HttpChannel} that tracks number of requests via a {@link RefCounted}.
* A {@link HttpChannel} that tracks the number of in-flight requests via a {@link RefCounted}, allowing the channel to be put into a
* state where it will close when idle.
*/
private static class RequestTrackingHttpChannel implements HttpChannel {

/**
* Action which closes the inner channel exactly once, to avoid a double-close due to a natural {@link #close()} happening
* concurrently with the release of the last reference.
*/
private final Runnable closeOnce = new RunOnce(this::closeInner);

/**
* Whether the channel will close when it becomes idle (i.e. the node is shutting down).
*/
private volatile boolean closeWhenIdle;

/**
* Only counts down to zero via {@link #setCloseWhenIdle()}.
*/
final RefCounted refCounted = AbstractRefCounted.of(this::closeInner);
final RefCounted refCounted = AbstractRefCounted.of(closeOnce);

final HttpChannel inner;

RequestTrackingHttpChannel(HttpChannel inner) {
@@ -636,24 +651,21 @@ public void incomingRequest() throws IllegalStateException {
* Close the channel when there are no more requests in flight.
*/
public void setCloseWhenIdle() {
assert closeWhenIdle == false : "setCloseWhenIdle() already called";
closeWhenIdle = true;
refCounted.decRef();
}

@Override
public void close() {
closeInner();
closeOnce.run();
}

/**
* Synchronized to avoid double close due to a natural close and a close via {@link #setCloseWhenIdle()}
*/
private void closeInner() {
synchronized (inner) {
if (inner.isOpen()) {
inner.close();
} else {
logger.info("channel [{}] already closed", inner);
}
if (inner.isOpen()) {
inner.close();
} else {
logger.info("channel [{}] already closed", inner);
}
}

@@ -669,6 +681,12 @@ public boolean isOpen() {

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
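// Nothing upstream should have set a `Connection` header on this response; it is only added here, during shutdown.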
assert response.containsHeader(DefaultRestChannel.CONNECTION) == false;
if (closeWhenIdle) {
// We are shutting down, but will keep the connection open while there are still in-flight requests, and this could be an
// arbitrarily long wait if the client is pipelining, so tell the client it should stop using this connection:
response.addHeader(DefaultRestChannel.CONNECTION, DefaultRestChannel.CLOSE);
}
inner.sendResponse(
response,
listener != null ? ActionListener.runAfter(listener, refCounted::decRef) : ActionListener.running(refCounted::decRef)
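
To make the server-side mechanism easier to follow, here is a minimal, self-contained sketch of the reference-counting scheme that RequestTrackingHttpChannel relies on, using illustrative names and a plain AtomicInteger instead of the real RefCounted: the channel starts with one reference that only setCloseWhenIdle() releases, each in-flight request holds a reference until its response has been sent, and the underlying channel is closed once the count reaches zero.

import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the real RequestTrackingHttpChannel, for illustration only.
class IdleClosingChannel {
    // Starts at 1: this initial reference is released only by setCloseWhenIdle().
    private final AtomicInteger refs = new AtomicInteger(1);
    private final Runnable closeInner;

    IdleClosingChannel(Runnable closeInner) {
        this.closeInner = closeInner;
    }

    void incomingRequest() {
        refs.incrementAndGet(); // one reference per in-flight request
    }

    void responseSent() {
        release(); // response delivered, drop this request's reference
    }

    void setCloseWhenIdle() {
        release(); // drop the initial reference; the channel now closes as soon as it is idle
    }

    private void release() {
        if (refs.decrementAndGet() == 0) {
            closeInner.run(); // shutdown requested and no requests in flight: close the connection
        }
    }
}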