Skip to content

Commit 2eb55c2

Browse files
committed
Added feature flag named protect_direct_memory to control the usage of OOM checking or not. Enabled by default.
1 parent 173410f commit 2eb55c2

File tree

6 files changed

+39
-12
lines changed

6 files changed

+39
-12
lines changed

docs/index.asciidoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
221221
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
222222
| <<plugins-{type}s-{plugin}-include_codec_tag>> |<<boolean,boolean>>|__Deprecated__
223223
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|Yes
224+
| <<plugins-{type}s-{plugin}-protect_direct_memory>> |<<boolean,boolean>>|No
224225
| <<plugins-{type}s-{plugin}-ssl>> |<<boolean,boolean>>|__Deprecated__
225226
| <<plugins-{type}s-{plugin}-ssl_certificate>> |a valid filesystem path|No
226227
| <<plugins-{type}s-{plugin}-ssl_certificate_authorities>> |<<array,array>>|No
@@ -384,6 +385,17 @@ deprecated[6.5.0, Replaced by <<plugins-{type}s-{plugin}-enrich>>]
384385

385386
The port to listen on.
386387

388+
[id="plugins-{type}s-{plugin}-protect_direct_memory"]
389+
===== `protect_direct_memory`
390+
391+
* Value type is <<boolean,boolean>>
392+
* Default value is `true`
393+
394+
If enabled, actively check native memory used by network part to do parsing and avoid
395+
out of memory conditions. When the consumption of native memory used is close to
396+
the maximum limit, connections are being closed in undetermined order until the safe
397+
memory condition is reestablished.
398+
387399
[id="plugins-{type}s-{plugin}-ssl"]
388400
===== `ssl`
389401
deprecated[6.6.0, Replaced by <<plugins-{type}s-{plugin}-ssl_enabled>>]

lib/logstash/inputs/beats.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base
7474
# The port to listen on.
7575
config :port, :validate => :number, :required => true
7676

77+
# Proactive checks that keep the beats input active when the memory used by protocol parser and network
78+
# related operations is going to terminate.
79+
config :protect_direct_memory, :validate => :boolean, :default => true
80+
7781
# Events are by default sent in plain text. You can
7882
# enable encryption by setting `ssl` to true and configuring
7983
# the `ssl_certificate` and `ssl_key` options.
@@ -243,7 +247,7 @@ def register
243247
end # def register
244248

245249
def create_server
246-
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
250+
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, @protect_direct_memory)
247251
server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
248252
server
249253
end

spec/inputs/beats_spec.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
let(:port) { BeatsInputTest.random_port }
1515
let(:client_inactivity_timeout) { 400 }
1616
let(:threads) { 1 + rand(9) }
17+
let(:protect_direct_memory) { true }
1718
let(:queue) { Queue.new }
1819
let(:config) do
1920
{
@@ -36,7 +37,7 @@
3637
let(:port) { 9001 }
3738

3839
it "sends the required options to the server" do
39-
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads)
40+
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory)
4041
subject.register
4142
end
4243
end
@@ -529,8 +530,8 @@
529530
subject(:plugin) { LogStash::Inputs::Beats.new(config) }
530531

531532
before do
532-
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads)
533-
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads).and_return @server
533+
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads, protect_direct_memory)
534+
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory).and_return @server
534535
expect( @server ).to receive(:listen)
535536

536537
subject.register

src/main/java/org/logstash/beats/Runner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception {
1717
// Check for leaks.
1818
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
1919

20-
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors());
20+
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), true);
2121

2222
if(args.length > 0 && args[0].equals("ssl")) {
2323
logger.debug("Using SSL");

src/main/java/org/logstash/beats/Server.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@ public class Server {
2222
private final int port;
2323
private final String host;
2424
private final int beatsHeandlerThreadCount;
25+
private final boolean protectDirectMemory;
2526
private NioEventLoopGroup workGroup;
2627
private IMessageListener messageListener = new MessageListener();
2728
private SslHandlerProvider sslHandlerProvider;
2829
private BeatsInitializer beatsInitializer;
2930

3031
private final int clientInactivityTimeoutSeconds;
3132

32-
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) {
33+
// public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) {
34+
// this(host, p, clientInactivityTimeoutSeconds, threadCount, true);
35+
// }
36+
37+
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, boolean protectDirectMemory) {
3338
this.host = host;
3439
port = p;
3540
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
3641
beatsHeandlerThreadCount = threadCount;
42+
this.protectDirectMemory = protectDirectMemory;
3743
}
3844

3945
public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){
@@ -130,7 +136,9 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
130136

131137
public void initChannel(SocketChannel socket){
132138
ChannelPipeline pipeline = socket.pipeline();
133-
pipeline.addLast(new OOMConnectionCloser());
139+
if (protectDirectMemory) {
140+
pipeline.addLast(new OOMConnectionCloser());
141+
}
134142

135143
if (isSslEnabled()) {
136144
pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket));
@@ -139,8 +147,10 @@ public void initChannel(SocketChannel socket){
139147
new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds));
140148
pipeline.addLast(BEATS_ACKER, new AckEncoder());
141149
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
142-
pipeline.addLast(new FlowLimiterHandler());
143-
pipeline.addLast(new ThunderingGuardHandler());
150+
if (protectDirectMemory) {
151+
pipeline.addLast(new FlowLimiterHandler());
152+
pipeline.addLast(new ThunderingGuardHandler());
153+
}
144154
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser());
145155
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener));
146156
}

src/test/java/org/logstash/beats/ServerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte
5050

5151
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
5252

53-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
53+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, true);
5454
final AtomicBoolean otherCause = new AtomicBoolean(false);
5555
server.setMessageListener(new MessageListener() {
5656
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -114,7 +114,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt
114114

115115
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
116116
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
117-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
117+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, true);
118118
server.setMessageListener(new MessageListener() {
119119
@Override
120120
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -170,7 +170,7 @@ public void run() {
170170

171171
@Test
172172
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
173-
final Server server = new Server(host, randomPort, 30, threadCount);
173+
final Server server = new Server(host, randomPort, 30, threadCount, true);
174174
SpyListener listener = new SpyListener();
175175
server.setMessageListener(listener);
176176
Runnable serverTask = new Runnable() {

0 commit comments

Comments
 (0)