Skip to content

Commit 0ff75db

Browse files
committed
Set connection: close header on shutdown
Lets clients using HTTP pipelining know to cease usage of connections to shutting-down nodes. Closes #127984
1 parent 41613e6 commit 0ff75db

File tree

2 files changed

+221
-14
lines changed

2 files changed

+221
-14
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

+191-2
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,26 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12+
import io.netty.bootstrap.Bootstrap;
13+
import io.netty.channel.ChannelHandlerContext;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.ChannelOption;
16+
import io.netty.channel.EventLoopGroup;
17+
import io.netty.channel.SimpleChannelInboundHandler;
18+
import io.netty.channel.nio.NioEventLoopGroup;
19+
import io.netty.channel.socket.SocketChannel;
20+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
21+
import io.netty.handler.codec.http.HttpClientCodec;
22+
import io.netty.handler.codec.http.HttpMethod;
23+
import io.netty.handler.codec.http.HttpResponse;
24+
import io.netty.handler.codec.http.HttpVersion;
1225
import io.netty.util.ReferenceCounted;
1326

1427
import org.apache.lucene.util.BytesRef;
1528
import org.elasticsearch.ESNetty4IntegTestCase;
29+
import org.elasticsearch.ExceptionsHelper;
1630
import org.elasticsearch.action.ActionListener;
31+
import org.elasticsearch.action.ActionResponse;
1732
import org.elasticsearch.action.support.CountDownActionListener;
1833
import org.elasticsearch.action.support.SubscribableListener;
1934
import org.elasticsearch.client.internal.node.NodeClient;
@@ -29,6 +44,8 @@
2944
import org.elasticsearch.common.settings.SettingsFilter;
3045
import org.elasticsearch.common.unit.ByteSizeUnit;
3146
import org.elasticsearch.common.util.CollectionUtils;
47+
import org.elasticsearch.core.Releasable;
48+
import org.elasticsearch.core.Releasables;
3249
import org.elasticsearch.core.Strings;
3350
import org.elasticsearch.features.NodeFeature;
3451
import org.elasticsearch.http.HttpServerTransport;
@@ -41,29 +58,42 @@
4158
import org.elasticsearch.rest.RestRequest;
4259
import org.elasticsearch.rest.RestResponse;
4360
import org.elasticsearch.rest.RestStatus;
61+
import org.elasticsearch.rest.action.EmptyResponseListener;
4462
import org.elasticsearch.rest.action.RestToXContentListener;
4563
import org.elasticsearch.test.ESIntegTestCase;
64+
import org.elasticsearch.transport.netty4.NettyAllocator;
4665
import org.elasticsearch.xcontent.ToXContentObject;
4766

4867
import java.io.IOException;
68+
import java.util.ArrayList;
4969
import java.util.Arrays;
5070
import java.util.Collection;
71+
import java.util.Collections;
5172
import java.util.List;
73+
import java.util.concurrent.CountDownLatch;
74+
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.atomic.AtomicBoolean;
76+
import java.util.concurrent.atomic.AtomicInteger;
5277
import java.util.function.Predicate;
5378
import java.util.function.Supplier;
5479

5580
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
5681
import static org.elasticsearch.rest.RestRequest.Method.GET;
82+
import static org.hamcrest.Matchers.equalTo;
5783
import static org.hamcrest.Matchers.hasSize;
5884
import static org.hamcrest.Matchers.is;
5985
import static org.hamcrest.Matchers.lessThanOrEqualTo;
86+
import static org.hamcrest.Matchers.oneOf;
6087

6188
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
6289
public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
6390

6491
@Override
6592
protected Collection<Class<? extends Plugin>> nodePlugins() {
66-
return CollectionUtils.concatLists(List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class), super.nodePlugins());
93+
return CollectionUtils.concatLists(
94+
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
95+
super.nodePlugins()
96+
);
6797
}
6898

6999
private static final int MAX_PIPELINE_EVENTS = 10;
@@ -142,6 +172,115 @@ private void runPipeliningTest(int expectedResponseCount, String... routes) thro
142172
}
143173
}
144174

175+
public void testSetCloseConnectionHeaderWhenShuttingDown() throws IOException {
176+
177+
// This test works using KeepPipeliningPlugin to keep a HTTP connection from becoming idle with a sequence of requests while the
178+
// node shuts down and ensures that these requests start to receive responses with `Connection: close` and that the node does not
179+
// shut down until all requests have received a response.
180+
181+
final var victimNode = internalCluster().startNode();
182+
183+
final var releasables = new ArrayList<Releasable>(3);
184+
try {
185+
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
186+
releasables.add(keepPipeliningRequest::release);
187+
188+
final var enoughResponsesToCloseLatch = new CountDownLatch(between(1, 5));
189+
final var outstandingRequestsCounter = new AtomicInteger();
190+
final var nodeShuttingDown = new AtomicBoolean();
191+
final var stoppedPipelining = new AtomicBoolean();
192+
193+
final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
194+
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
195+
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
196+
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
197+
.group(eventLoopGroup)
198+
.handler(new ChannelInitializer<SocketChannel>() {
199+
@Override
200+
protected void initChannel(SocketChannel ch) {
201+
ch.pipeline().addLast(new HttpClientCodec());
202+
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
203+
204+
private int closeHeadersToIgnore = between(0, 5);
205+
206+
private boolean ignoreCloseHeader() {
207+
if (closeHeadersToIgnore == 0) {
208+
return false;
209+
} else {
210+
closeHeadersToIgnore -= 1;
211+
return true;
212+
}
213+
}
214+
215+
@Override
216+
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
217+
enoughResponsesToCloseLatch.countDown();
218+
assertThat(
219+
outstandingRequestsCounter.decrementAndGet(),
220+
stoppedPipelining.get() ? oneOf(0, 1) : equalTo(1)
221+
);
222+
223+
if ("close".equals(msg.headers().get("connection")) && ignoreCloseHeader() == false) {
224+
assertTrue(nodeShuttingDown.get());
225+
// send one more request with `?respond_immediately` to stop the pipelining
226+
if (stoppedPipelining.compareAndSet(false, true)) {
227+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
228+
ctx.writeAndFlush(
229+
new DefaultFullHttpRequest(
230+
HttpVersion.HTTP_1_1,
231+
HttpMethod.GET,
232+
KeepPipeliningPlugin.ROUTE + "?" + KeepPipeliningPlugin.RESPOND_IMMEDIATELY
233+
)
234+
);
235+
}
236+
} else {
237+
// still pipelining, send another request to trigger the next response
238+
assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
239+
ctx.writeAndFlush(keepPipeliningRequest.retain());
240+
}
241+
}
242+
243+
@Override
244+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
245+
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
246+
}
247+
});
248+
}
249+
});
250+
251+
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, victimNode);
252+
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
253+
254+
// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
255+
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
256+
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
257+
258+
// Send two pipelined requests so that we start to receive responses
259+
assertTrue(outstandingRequestsCounter.compareAndSet(0, 2));
260+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
261+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
262+
263+
// wait until we've started to receive responses
264+
safeAwait(enoughResponsesToCloseLatch);
265+
266+
// Shut down the node
267+
assertTrue(nodeShuttingDown.compareAndSet(false, true));
268+
internalCluster().stopNode(victimNode);
269+
270+
// Wait for the pipelining channel to be closed, indicating that it stopped pipelining (because it received a response with
271+
// `Connection: close`) and allowed the node to shut down
272+
pipeliningChannel.closeFuture().syncUninterruptibly();
273+
274+
// The shutdown did not happen until all requests had had a response.
275+
assertTrue(stoppedPipelining.get());
276+
assertEquals(0, outstandingRequestsCounter.get());
277+
278+
} finally {
279+
Collections.reverse(releasables);
280+
Releasables.close(releasables);
281+
}
282+
}
283+
145284
private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
146285
// check if opaque ids are monotonically increasing
147286
int i = 0;
@@ -203,7 +342,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
203342
}
204343

205344
/**
206-
* Adds an HTTP route that waits for 3 concurrent executions before returning any of them
345+
* Adds an HTTP route that starts to emit a chunked response and then fails before its completion.
207346
*/
208347
public static class ChunkAndFailPlugin extends Plugin implements ActionPlugin {
209348

@@ -285,4 +424,54 @@ public String getResponseContentTypeString() {
285424
});
286425
}
287426
}
427+
428+
/**
429+
* Adds an HTTP route that only responds when starting to process a second request, ensuring that there is always at least one in-flight
430+
* request in the pipeline which keeps a connection from becoming idle.
431+
*/
432+
public static class KeepPipeliningPlugin extends Plugin implements ActionPlugin {
433+
434+
static final String ROUTE = "/_test/keep_pipelining";
435+
static final String RESPOND_IMMEDIATELY = "respond_immediately";
436+
437+
@Override
438+
public Collection<RestHandler> getRestHandlers(
439+
Settings settings,
440+
NamedWriteableRegistry namedWriteableRegistry,
441+
RestController restController,
442+
ClusterSettings clusterSettings,
443+
IndexScopedSettings indexScopedSettings,
444+
SettingsFilter settingsFilter,
445+
IndexNameExpressionResolver indexNameExpressionResolver,
446+
Supplier<DiscoveryNodes> nodesInCluster,
447+
Predicate<NodeFeature> clusterSupportsFeature
448+
) {
449+
return List.of(new BaseRestHandler() {
450+
451+
private SubscribableListener<Void> lastRequestTrigger = new SubscribableListener<>();
452+
453+
@Override
454+
public String getName() {
455+
return ROUTE;
456+
}
457+
458+
@Override
459+
public List<Route> routes() {
460+
return List.of(new Route(GET, ROUTE));
461+
}
462+
463+
@Override
464+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
465+
final var respondImmediately = request.paramAsBoolean(RESPOND_IMMEDIATELY, false);
466+
return channel -> {
467+
// all happens on a single thread in these tests, no need for concurrency protection
468+
final var previousRequestTrigger = lastRequestTrigger;
469+
lastRequestTrigger = respondImmediately ? SubscribableListener.nullSuccess() : new SubscribableListener<>();
470+
lastRequestTrigger.addListener(new EmptyResponseListener(channel).map(ignored -> ActionResponse.Empty.INSTANCE));
471+
previousRequestTrigger.onResponse(null);
472+
};
473+
}
474+
});
475+
}
476+
}
288477
}

server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

+30-12
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.unit.ByteSizeValue;
3333
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3434
import org.elasticsearch.common.util.concurrent.FutureUtils;
35+
import org.elasticsearch.common.util.concurrent.RunOnce;
3536
import org.elasticsearch.common.util.concurrent.ThreadContext;
3637
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
3738
import org.elasticsearch.core.AbstractRefCounted;
@@ -615,13 +616,27 @@ public ThreadPool getThreadPool() {
615616
}
616617

617618
/**
618-
* A {@link HttpChannel} that tracks number of requests via a {@link RefCounted}.
619+
* A {@link HttpChannel} that tracks the number of in-flight requests via a {@link RefCounted}, allowing the channel to be put into a
620+
* state where it will close when idle.
619621
*/
620622
private static class RequestTrackingHttpChannel implements HttpChannel {
623+
624+
/**
625+
* Action which closes the inner channel exactly once, to avoid a double-close due to a natural {@link #close()} happening
626+
* concurrently with the release of the last reference.
627+
*/
628+
private final Runnable closeOnce = new RunOnce(this::closeInner);
629+
630+
/**
631+
* Whether the channel will close when it becomes idle (i.e. the node is shutting down).
632+
*/
633+
private volatile boolean closeWhenIdle;
634+
621635
/**
622636
* Only counts down to zero via {@link #setCloseWhenIdle()}.
623637
*/
624-
final RefCounted refCounted = AbstractRefCounted.of(this::closeInner);
638+
final RefCounted refCounted = AbstractRefCounted.of(closeOnce);
639+
625640
final HttpChannel inner;
626641

627642
RequestTrackingHttpChannel(HttpChannel inner) {
@@ -636,24 +651,21 @@ public void incomingRequest() throws IllegalStateException {
636651
* Close the channel when there are no more requests in flight.
637652
*/
638653
public void setCloseWhenIdle() {
654+
assert closeWhenIdle = false : "setCloseWhenIdle() already called";
655+
closeWhenIdle = true;
639656
refCounted.decRef();
640657
}
641658

642659
@Override
643660
public void close() {
644-
closeInner();
661+
closeOnce.run();
645662
}
646663

647-
/**
648-
* Synchronized to avoid double close due to a natural close and a close via {@link #setCloseWhenIdle()}
649-
*/
650664
private void closeInner() {
651-
synchronized (inner) {
652-
if (inner.isOpen()) {
653-
inner.close();
654-
} else {
655-
logger.info("channel [{}] already closed", inner);
656-
}
665+
if (inner.isOpen()) {
666+
inner.close();
667+
} else {
668+
logger.info("channel [{}] already closed", inner);
657669
}
658670
}
659671

@@ -669,6 +681,12 @@ public boolean isOpen() {
669681

670682
@Override
671683
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
684+
assert response.containsHeader(DefaultRestChannel.CONNECTION) == false;
685+
if (closeWhenIdle) {
686+
// We are shutting down, but will keep the connection open while there are still in-flight requests, and this could be an
687+
// arbitrarily long wait if the client is pipelining, so tell the client it should stop using this connection:
688+
response.addHeader(DefaultRestChannel.CONNECTION, DefaultRestChannel.CLOSE);
689+
}
672690
inner.sendResponse(
673691
response,
674692
listener != null ? ActionListener.runAfter(listener, refCounted::decRef) : ActionListener.running(refCounted::decRef)

0 commit comments

Comments
 (0)