Skip to content

Commit 9546e28

Browse files
authored
rate-limiting: propagate back-pressure from queue as HTTP 429's (#179)
Adds a proactive handler that rejects new requests with HTTP 429's when the queue has been blocking for more than 10 consecutive seconds, allowing back-pressure to propagate in advance of filling up the connection backlog queue.
1 parent 4d2aeeb commit 9546e28

File tree

10 files changed

+622
-9
lines changed

10 files changed

+622
-9
lines changed

.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ Gemfile.bak
44
.bundle
55
vendor
66
.idea
7+
.ci
8+
build/*
9+
.ci/*
10+
.gradle/*
11+
lib/logstash-input-http_jars.rb
12+
logstash-input-http.iml

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.10.0
2+
- add improved proactive rate-limiting, rejecting new requests when the queue has been actively blocking for more than 10 seconds [#179](https://github.com/logstash-plugins/logstash-input-http/pull/179)
3+
14
## 3.9.2
25
- Upgrade netty to 4.1.115 [#183](https://github.com/logstash-plugins/logstash-input-http/pull/183)
36

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.9.2
1+
3.10.0

spec/inputs/helpers.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,8 @@
33

44
def certificate_path(filename)
55
File.join(CERTS_DIR, filename)
6-
end
6+
end
7+
8+
RSpec.configure do |config|
9+
config.formatter = :documentation
10+
end

spec/inputs/http_spec.rb

+68-6
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,85 @@
5757
let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }
5858

5959
context "when sending more requests than queue slots" do
60-
it "should block when the queue is full" do
60+
it "rejects additional incoming requests with HTTP 429" do
6161
# these will queue and return 200
6262
logstash_queue_size.times.each do |i|
6363
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
6464
expect(response.code).to eq(200)
6565
end
6666

6767
# these will block
68-
(threads + max_pending_requests).times.each do |i|
69-
expect {
70-
client.post("http://127.0.0.1:#{port}", :body => '{}').call
71-
}.to raise_error(Manticore::SocketTimeout)
68+
blocked_calls = (threads + max_pending_requests).times.map do
69+
Thread.new do
70+
begin
71+
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
72+
rescue Manticore::SocketException, Manticore::SocketTimeout => e
73+
{:exception => e}
74+
end
75+
end
76+
end
77+
78+
sleep 1 # let those requests go, but not so long that our block-detector starts emitting 429's
79+
80+
# by now we should be rejecting with 429 since the backlog is full
81+
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
82+
expect(response.code).to eq(429)
83+
84+
# ensure that our blocked connections did block
85+
aggregate_failures do
86+
blocked_calls.map(&:value).each do |blocked|
87+
expect(blocked[:result]).to be_nil
88+
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
89+
end
90+
end
91+
end
92+
end
93+
end
94+
95+
describe "observing queue back-pressure" do
96+
let(:logstash_queue_size) { rand(10) + 1 }
97+
let(:max_pending_requests) { rand(5) + 1 }
98+
let(:threads) { rand(4) + 1 }
99+
let(:logstash_queue) { SizedQueue.new(logstash_queue_size) }
100+
let(:client_options) { {
101+
"request_timeout" => 0.1,
102+
"connect_timeout" => 3,
103+
"socket_timeout" => 0.1
104+
} }
105+
106+
let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }
107+
108+
context "when sending request to an input that has blocked connections" do
109+
it "rejects incoming requests with HTTP 429" do
110+
# these will queue and return 200
111+
logstash_queue_size.times.each do |i|
112+
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
113+
expect(response.code).to eq(200)
72114
end
73115

74-
# by now we should be rejecting with 429
116+
# these will block
117+
blocked_call = Thread.new do
118+
begin
119+
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
120+
rescue Manticore::SocketException, Manticore::SocketTimeout => e
121+
{:exception => e}
122+
end
123+
end
124+
125+
sleep 12 # let that request go, and ensure it is blocking long enough to be problematic
126+
127+
# by now we should be rejecting with 429 since at least one existing request is blocked
128+
# for more than 10s.
75129
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
76130
expect(response.code).to eq(429)
131+
132+
# ensure that our blocked connections did block
133+
aggregate_failures do
134+
blocked_call.value.tap do |blocked|
135+
expect(blocked[:result]).to be_nil
136+
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
137+
end
138+
end
77139
end
78140
end
79141
end

src/main/java/org/logstash/plugins/inputs/http/HttpInitializer.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
import io.netty.handler.codec.http.HttpResponseStatus;
99
import io.netty.handler.codec.http.HttpServerCodec;
1010
import io.netty.handler.ssl.SslHandler;
11+
import org.logstash.plugins.inputs.http.util.ExecutionObserver;
12+
import org.logstash.plugins.inputs.http.util.ExecutionObservingMessageHandler;
13+
import org.logstash.plugins.inputs.http.util.RejectWhenBlockedInboundHandler;
1114
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;
1215

16+
import java.time.Duration;
1317
import java.util.concurrent.ThreadPoolExecutor;
1418

1519
/**
@@ -22,9 +26,11 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
2226
private final HttpResponseStatus responseStatus;
2327
private final ThreadPoolExecutor executorGroup;
2428

29+
private final ExecutionObserver executionObserver = new ExecutionObserver();
30+
2531
public HttpInitializer(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
2632
int maxContentLength, HttpResponseStatus responseStatus) {
27-
this.messageHandler = messageHandler;
33+
this.messageHandler = new ExecutionObservingMessageHandler(executionObserver, messageHandler);
2834
this.executorGroup = executorGroup;
2935
this.maxContentLength = maxContentLength;
3036
this.responseStatus = responseStatus;
@@ -37,7 +43,9 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
3743
SslHandler sslHandler = sslHandlerProvider.getSslHandler(socketChannel.alloc());
3844
pipeline.addLast(sslHandler);
3945
}
46+
4047
pipeline.addLast(new HttpServerCodec());
48+
pipeline.addLast(new RejectWhenBlockedInboundHandler(executionObserver, Duration.ofSeconds(10)));
4149
pipeline.addLast(new HttpContentDecompressor());
4250
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
4351
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.logstash.plugins.inputs.http.util;
2+
3+
import java.time.Duration;
4+
import java.util.Optional;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
import java.util.function.LongSupplier;
7+
8+
/**
9+
* An {@code ExecutionObserver} observes possibly-concurrent execution, and provides information about the
10+
* longest-running observed execution.
11+
*
12+
* <p>
13+
* It is concurrency-safe and non-blocking, and uses plain memory access where practical.
14+
* </p>
15+
*/
16+
public class ExecutionObserver {
17+
private final AtomicReference<Execution> tail; // newest execution
18+
private final AtomicReference<Execution> head; // oldest execution
19+
20+
private final LongSupplier nanosSupplier;
21+
22+
public ExecutionObserver() {
23+
this(System::nanoTime);
24+
}
25+
26+
ExecutionObserver(final LongSupplier nanosSupplier) {
27+
this.nanosSupplier = nanosSupplier;
28+
final Execution anchor = new Execution(nanosSupplier.getAsLong(), true);
29+
this.tail = new AtomicReference<>(anchor);
30+
this.head = new AtomicReference<>(anchor);
31+
}
32+
33+
/**
34+
* @see ExecutionObserver#anyExecuting(Duration)
35+
* @return true if there are any active executions.
36+
*/
37+
public boolean anyExecuting() {
38+
return this.anyExecuting(Duration.ZERO);
39+
}
40+
41+
/**
42+
* @param minimumDuration a threshold to exclude young executions
43+
* @return true if any active execution has been running for at least the provided {@code Duration}
44+
*/
45+
public boolean anyExecuting(final Duration minimumDuration) {
46+
final Execution headExecution = compactHead();
47+
if (headExecution.isComplete) {
48+
return false;
49+
} else {
50+
return nanosSupplier.getAsLong() - headExecution.startNanos >= minimumDuration.toNanos();
51+
}
52+
}
53+
54+
// visible for test
55+
Optional<Duration> longestExecuting() {
56+
final Execution headExecution = compactHead();
57+
if (headExecution.isComplete) {
58+
return Optional.empty();
59+
} else {
60+
return Optional.of(Duration.ofNanos(nanosSupplier.getAsLong() - headExecution.startNanos));
61+
}
62+
}
63+
64+
// test inspections
65+
Stats stats() {
66+
int nodes = 0;
67+
int executing = 0;
68+
69+
Execution candidate = this.head.get();
70+
while (candidate != null) {
71+
nodes += 1;
72+
if (!candidate.isComplete) {
73+
executing += 1;
74+
}
75+
candidate = candidate.next.get();
76+
}
77+
return new Stats(nodes, executing);
78+
}
79+
80+
static class Stats {
81+
final int nodes;
82+
final int executing;
83+
84+
Stats(int nodes, int executing) {
85+
this.nodes = nodes;
86+
this.executing = executing;
87+
}
88+
}
89+
90+
@FunctionalInterface
91+
public interface ExceptionalSupplier<T, E extends Throwable> {
92+
T get() throws E;
93+
}
94+
95+
public <T,E extends Throwable> T observeExecution(final ExceptionalSupplier<T,E> supplier) throws E {
96+
final Execution execution = startExecution();
97+
try {
98+
return supplier.get();
99+
} finally {
100+
final boolean isCompact = execution.markComplete();
101+
if (!isCompact) {
102+
this.compactHead();
103+
}
104+
}
105+
}
106+
107+
@FunctionalInterface
108+
public interface ExceptionalRunnable<E extends Throwable> {
109+
void run() throws E;
110+
}
111+
112+
public <E extends Throwable> void observeExecution(final ExceptionalRunnable<E> runnable) throws E {
113+
observeExecution(() -> { runnable.run(); return null; });
114+
}
115+
116+
// visible for test
117+
Execution startExecution() {
118+
final Execution newTail = new Execution(nanosSupplier.getAsLong());
119+
120+
// atomically attach the new execution as a new (detached) tail
121+
final Execution oldTail = this.tail.getAndSet(newTail);
122+
// attach our new tail to the old one
123+
oldTail.linkNext(newTail);
124+
125+
return newTail;
126+
}
127+
128+
private Execution compactHead() {
129+
return this.head.updateAndGet(Execution::seekHead);
130+
}
131+
132+
static class Execution {
133+
134+
private final long startNanos;
135+
136+
private volatile boolean isComplete;
137+
private final AtomicReference<Execution> next = new AtomicReference<>();
138+
139+
Execution(long startNanos) {
140+
this(startNanos, false);
141+
}
142+
143+
Execution(final long startNanos,
144+
final boolean isComplete) {
145+
this.startNanos = startNanos;
146+
this.isComplete = isComplete;
147+
}
148+
149+
/**
150+
* marks this execution as complete
151+
* @return true if the completion resulted in a compaction
152+
*/
153+
boolean markComplete() {
154+
isComplete = true;
155+
156+
final Execution preCompletionNext = this.next.get();
157+
if (preCompletionNext != null) {
158+
final Execution result = this.next.updateAndGet(Execution::seekHead);
159+
return result != preCompletionNext;
160+
}
161+
162+
return false;
163+
}
164+
165+
private void linkNext(final Execution proposedNext) {
166+
final Execution result = next.updateAndGet((ex) -> ex == null ? proposedNext : ex);
167+
if (result != proposedNext) {
168+
throw new IllegalStateException();
169+
}
170+
}
171+
172+
/**
173+
* @return the first {@code Execution} that is either not yet complete
174+
* or is the current tail, possibly itself.
175+
*/
176+
private Execution seekHead() {
177+
Execution compactedHead = this;
178+
Execution candidate = this.next.get();
179+
while (candidate != null && compactedHead.isComplete) {
180+
compactedHead = candidate;
181+
candidate = candidate.next.get();
182+
}
183+
return compactedHead;
184+
}
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.logstash.plugins.inputs.http.util;
2+
3+
import org.logstash.plugins.inputs.http.IMessageHandler;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* An implementation of {@link IMessageHandler} that wraps another {@link IMessageHandler} with an
9+
* {@link ExecutionObserver}, ensuring that the delegate's {@link IMessageHandler#onNewMessage} is
10+
* observed.
11+
*/
12+
public class ExecutionObservingMessageHandler implements IMessageHandler {
13+
private final ExecutionObserver executionObserver;
14+
private final IMessageHandler delegate;
15+
16+
public ExecutionObservingMessageHandler(final ExecutionObserver executionObserver,
17+
final IMessageHandler delegate) {
18+
this.executionObserver = executionObserver;
19+
this.delegate = delegate;
20+
}
21+
22+
@Override
23+
public boolean onNewMessage(final String remoteAddress,
24+
final Map<String, String> headers,
25+
final String body) {
26+
return executionObserver.observeExecution(() -> delegate.onNewMessage(remoteAddress, headers, body));
27+
}
28+
29+
@Override
30+
public boolean validatesToken(final String token) {
31+
return delegate.validatesToken(token);
32+
}
33+
34+
@Override
35+
public boolean requiresToken() {
36+
return delegate.requiresToken();
37+
}
38+
39+
@Override
40+
public IMessageHandler copy() {
41+
return new ExecutionObservingMessageHandler(this.executionObserver, delegate.copy());
42+
}
43+
44+
@Override
45+
public Map<String, String> responseHeaders() {
46+
return delegate.responseHeaders();
47+
}
48+
}

0 commit comments

Comments
 (0)