Skip to content

Commit 7d7a145

Browse files
committed
experimental introduction of a bounded queue and rejection handler
1 parent af61930 commit 7d7a145

File tree

6 files changed

+69
-11
lines changed

6 files changed

+69
-11
lines changed

lib/logstash/inputs/beats.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def register
162162
end # def register
163163

164164
def create_server
165-
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
165+
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, 5)
166166
if @ssl
167167
ssl_context_builder = new_ssl_context_builder
168168
if client_authentification?
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.logstash.beats;
2+
3+
import io.netty.util.concurrent.RejectedExecutionHandler;
4+
import io.netty.util.concurrent.SingleThreadEventExecutor;
5+
6+
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
7+
8+
@Override
9+
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
10+
System.out.println("Requeueing the message");
11+
try {
12+
Thread.sleep(100);
13+
} catch (InterruptedException e) {
14+
e.printStackTrace();
15+
}
16+
executor.execute(task);
17+
}
18+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.logstash.beats;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class DaemonThreadFactory implements ThreadFactory {
7+
8+
final ThreadGroup group;
9+
final AtomicInteger threadNumber = new AtomicInteger(1);
10+
final String namePrefix;
11+
12+
DaemonThreadFactory(String namePrefix) {
13+
this.namePrefix = namePrefix;
14+
group = Thread.currentThread().getThreadGroup();
15+
}
16+
17+
@Override
18+
public Thread newThread(Runnable r) {
19+
Thread t = new Thread(group, r,
20+
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
21+
0);
22+
t.setDaemon(true);
23+
return t;
24+
}
25+
26+
public static ThreadFactory daemonThreadFactory(String namePrefix) {
27+
return new DaemonThreadFactory(namePrefix);
28+
}
29+
30+
}

src/main/java/org/logstash/beats/Runner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ static public void main(String[] args) throws Exception {
2020
// Check for leaks.
2121
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
2222

23-
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors());
23+
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), 128);
2424

2525
if(args.length > 0 && args[0].equals("ssl")) {
2626
logger.debug("Using SSL");

src/main/java/org/logstash/beats/Server.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,30 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.logstash.netty.SslHandlerProvider;
1414

15+
16+
import static org.logstash.beats.DaemonThreadFactory.daemonThreadFactory;
17+
1518
public class Server {
1619
private final static Logger logger = LogManager.getLogger(Server.class);
1720

1821
private final int port;
1922
private final String host;
20-
private final int beatsHeandlerThreadCount;
23+
private final int beatsHandlerThreadCount;
24+
private final int maxPendingRequests;
2125
private NioEventLoopGroup workGroup;
2226
private IMessageListener messageListener = new MessageListener();
2327
private SslHandlerProvider sslHandlerProvider;
2428
private BeatsInitializer beatsInitializer;
29+
private final int connectionBacklog = 128;
2530

2631
private final int clientInactivityTimeoutSeconds;
2732

28-
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) {
33+
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, int maxPendingRequests) {
2934
this.host = host;
3035
port = p;
3136
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
32-
beatsHeandlerThreadCount = threadCount;
37+
beatsHandlerThreadCount = threadCount;
38+
this.maxPendingRequests = maxPendingRequests;
3339
}
3440

3541
public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){
@@ -49,11 +55,12 @@ public Server listen() throws InterruptedException {
4955
try {
5056
logger.info("Starting server on port: {}", this.port);
5157

52-
beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);
58+
beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, beatsHandlerThreadCount, maxPendingRequests);
5359

5460
ServerBootstrap server = new ServerBootstrap();
5561
server.group(workGroup)
5662
.channel(NioServerSocketChannel.class)
63+
.option(ChannelOption.SO_BACKLOG, connectionBacklog)
5764
.childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket.
5865
.childHandler(beatsInitializer);
5966

@@ -108,12 +115,13 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
108115
private final IMessageListener localMessageListener;
109116
private final int localClientInactivityTimeoutSeconds;
110117

111-
BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
118+
BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread, int maxPendingRequests) {
112119
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
113120
this.localMessageListener = messageListener;
114121
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
115122
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
116-
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
123+
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread, daemonThreadFactory("beats-input-handler-executor"), maxPendingRequests, new CustomRejectedExecutionHandler());
124+
//beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
117125
}
118126

119127
public void initChannel(SocketChannel socket){
@@ -127,6 +135,7 @@ public void initChannel(SocketChannel socket){
127135
pipeline.addLast(BEATS_ACKER, new AckEncoder());
128136
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
129137
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
138+
130139
}
131140

132141

src/test/java/org/logstash/beats/ServerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ServerTest {
3434
private EventLoopGroup group;
3535
private final String host = "0.0.0.0";
3636
private final int threadCount = 10;
37+
private final int maxPendingRequests = 128;
3738

3839
@Before
3940
public void setUp() {
@@ -50,7 +51,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte
5051

5152
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
5253

53-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
54+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
5455
final AtomicBoolean otherCause = new AtomicBoolean(false);
5556
server.setMessageListener(new MessageListener() {
5657
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -114,7 +115,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt
114115

115116
final CountDownLatch latch = new CountDownLatch(concurrentConnections);
116117
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
117-
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
118+
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
118119
server.setMessageListener(new MessageListener() {
119120
@Override
120121
public void onNewConnection(ChannelHandlerContext ctx) {
@@ -170,7 +171,7 @@ public void run() {
170171

171172
@Test
172173
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
173-
final Server server = new Server(host, randomPort, 30, threadCount);
174+
final Server server = new Server(host, randomPort, 30, threadCount, maxPendingRequests);
174175
SpyListener listener = new SpyListener();
175176
server.setMessageListener(listener);
176177
Runnable serverTask = new Runnable() {

0 commit comments

Comments
 (0)