Skip to content

Commit cce3784

Browse files
committed
experimental introduction of a bounded queue and rejection handler
1 parent 707ab46 commit cce3784

File tree

6 files changed

+71
-12
lines changed

6 files changed

+71
-12
lines changed

lib/logstash/inputs/beats.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def register
163163
end # def register
164164

165165
def create_server
166-
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
166+
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, 5)
167167
if @ssl
168168

169169
begin
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.logstash.beats;
2+
3+
import io.netty.util.concurrent.RejectedExecutionHandler;
4+
import io.netty.util.concurrent.SingleThreadEventExecutor;
5+
6+
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
7+
8+
@Override
9+
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
10+
System.out.println("Requeueing the message");
11+
try {
12+
Thread.sleep(100);
13+
} catch (InterruptedException e) {
14+
e.printStackTrace();
15+
}
16+
executor.execute(task);
17+
}
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.logstash.beats;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class DaemonThreadFactory implements ThreadFactory {
7+
8+
final ThreadGroup group;
9+
final AtomicInteger threadNumber = new AtomicInteger(1);
10+
final String namePrefix;
11+
12+
DaemonThreadFactory(String namePrefix) {
13+
this.namePrefix = namePrefix;
14+
group = Thread.currentThread().getThreadGroup();
15+
}
16+
17+
@Override
18+
public Thread newThread(Runnable r) {
19+
Thread t = new Thread(group, r,
20+
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
21+
0);
22+
t.setDaemon(true);
23+
return t;
24+
}
25+
26+
public static ThreadFactory daemonThreadFactory(String namePrefix) {
27+
return new DaemonThreadFactory(namePrefix);
28+
}
29+
30+
}

src/main/java/org/logstash/beats/Runner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ static public void main(String[] args) throws Exception {
1818
// Check for leaks.
1919
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
2020

21-
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors());
21+
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), 128);
2222

2323
if(args.length > 0 && args[0].equals("ssl")) {
2424
logger.debug("Using SSL");

src/main/java/org/logstash/beats/Server.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,33 @@
1616
import java.io.IOException;
1717
import java.security.NoSuchAlgorithmException;
1818
import java.security.cert.CertificateException;
19+
import java.util.concurrent.ArrayBlockingQueue;
20+
import java.util.concurrent.ThreadPoolExecutor;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import static org.logstash.beats.DaemonThreadFactory.daemonThreadFactory;
1924

2025
public class Server {
2126
private final static Logger logger = LogManager.getLogger(Server.class);
2227

2328
private final int port;
2429
private final String host;
25-
private final int beatsHeandlerThreadCount;
30+
private final int beatsHandlerThreadCount;
31+
private final int maxPendingRequests;
2632
private NioEventLoopGroup workGroup;
2733
private IMessageListener messageListener = new MessageListener();
2834
private SslSimpleBuilder sslBuilder;
2935
private BeatsInitializer beatsInitializer;
36+
private final int connectionBacklog = 128;
3037

3138
private final int clientInactivityTimeoutSeconds;
3239

33-
public Server(String host, int p, int timeout, int threadCount) {
40+
public Server(String host, int p, int timeout, int threadCount, int maxPendingRequests) {
3441
this.host = host;
3542
port = p;
3643
clientInactivityTimeoutSeconds = timeout;
37-
beatsHeandlerThreadCount = threadCount;
44+
beatsHandlerThreadCount = threadCount;
45+
this.maxPendingRequests = maxPendingRequests;
3846
}
3947

4048
public void enableSSL(SslSimpleBuilder builder) {
@@ -54,11 +62,12 @@ public Server listen() throws InterruptedException {
5462
try {
5563
logger.info("Starting server on port: {}", this.port);
5664

57-
beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);
65+
beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHandlerThreadCount, maxPendingRequests);
5866

5967
ServerBootstrap server = new ServerBootstrap();
6068
server.group(workGroup)
6169
.channel(NioServerSocketChannel.class)
70+
.option(ChannelOption.SO_BACKLOG, connectionBacklog)
6271
.childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket.
6372
.childHandler(beatsInitializer);
6473

@@ -114,14 +123,14 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
114123
private final int localClientInactivityTimeoutSeconds;
115124
private final boolean localEnableSSL;
116125

117-
BeatsInitializer(Boolean enableSSL, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
126+
BeatsInitializer(Boolean enableSSL, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread, int maxPendingRequests) {
118127
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
119128
this.localEnableSSL = enableSSL;
120129
this.localMessageListener = messageListener;
121130
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
122131
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
123-
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
124-
132+
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread, daemonThreadFactory("beats-input-handler-executor"), maxPendingRequests, new CustomRejectedExecutionHandler());
133+
//beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
125134
}
126135

127136
public void initChannel(SocketChannel socket) throws IOException, NoSuchAlgorithmException, CertificateException {
@@ -136,6 +145,7 @@ public void initChannel(SocketChannel socket) throws IOException, NoSuchAlgorith
136145
pipeline.addLast(BEATS_ACKER, new AckEncoder());
137146
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
138147
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
148+
139149
}
140150

141151
@Override

src/test/java/org/logstash/beats/ServerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ServerTest {
3434
private EventLoopGroup group;
3535
private final String host = "0.0.0.0";
3636
private final int threadCount = 10;
37+
private final int maxPendingRequests = 128;
3738

3839
@Before
3940
public void setUp() {
@@ -50,7 +51,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte
5051

5152
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
5253

53-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
54+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
5455
final AtomicBoolean otherCause = new AtomicBoolean(false);
5556
server.setMessageListener(new MessageListener() {
5657
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -114,7 +115,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt
114115

115116
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
116117
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
117-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
118+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
118119
server.setMessageListener(new MessageListener() {
119120
@Override
120121
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -170,7 +171,7 @@ public void run() {
170171

171172
@Test
172173
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
173-
final Server server = new Server(host, randomPort, 30, threadCount);
174+
final Server server = new Server(host, randomPort, 30, threadCount, maxPendingRequests);
174175
SpyListener listener = new SpyListener();
175176
server.setMessageListener(listener);
176177
Runnable serverTask = new Runnable() {

0 commit comments

Comments
 (0)