Skip to content

Commit 240850c

Browse files
支持kqueue
1 parent eafd98b commit 240850c

File tree

9 files changed

+131
-25
lines changed

9 files changed

+131
-25
lines changed

src/main/java/com/github/netty/core/AbstractNettyClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void setIoRatio(int ioRatio) {
181181
if (worker instanceof NioEventLoopGroup) {
182182
((NioEventLoopGroup) worker).setIoRatio(ioRatio);
183183
} else if (worker instanceof EpollEventLoopGroup) {
184-
// ((EpollEventLoopGroup) worker).setIoRatio(ioRatio);
184+
((EpollEventLoopGroup) worker).setIoRatio(ioRatio);
185185
}
186186
this.ioRatio = ioRatio;
187187
}

src/main/java/com/github/netty/core/AbstractNettyServer.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package com.github.netty.core;
22

3-
import com.github.netty.core.util.*;
3+
import com.github.netty.core.util.LoggerFactoryX;
4+
import com.github.netty.core.util.LoggerX;
5+
import com.github.netty.core.util.NamespaceUtil;
6+
import com.github.netty.core.util.ThreadFactoryX;
47
import io.netty.bootstrap.ChannelFactory;
58
import io.netty.bootstrap.ServerBootstrap;
69
import io.netty.buffer.ByteBufAllocator;
710
import io.netty.channel.*;
811
import io.netty.channel.epoll.Epoll;
912
import io.netty.channel.epoll.EpollEventLoopGroup;
1013
import io.netty.channel.epoll.EpollServerSocketChannel;
14+
import io.netty.channel.kqueue.KQueue;
15+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
16+
import io.netty.channel.kqueue.KQueueServerSocketChannel;
1117
import io.netty.channel.nio.NioEventLoopGroup;
1218
import io.netty.channel.socket.ServerSocketChannel;
1319
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -25,7 +31,25 @@
2531
* @author wangzihao
2632
*/
2733
public abstract class AbstractNettyServer implements Runnable {
34+
private static final boolean SUPPORT_KQUEUE;
35+
36+
static {
37+
boolean kqueue;
38+
try {
39+
ClassLoader classLoader = AbstractNettyServer.class.getClassLoader();
40+
if (classLoader == null) {
41+
classLoader = ClassLoader.getSystemClassLoader();
42+
}
43+
Class.forName("io.netty.channel.kqueue.KQueue", false, classLoader);
44+
kqueue = true;
45+
} catch (Throwable e) {
46+
kqueue = false;
47+
}
48+
SUPPORT_KQUEUE = kqueue;
49+
}
50+
2851
private final boolean enableEpoll;
52+
private final boolean enableKQueue;
2953
protected LoggerX logger = LoggerFactoryX.getLogger(getClass());
3054
private final String name;
3155
private ServerSocketChannel serverChannel;
@@ -50,11 +74,21 @@ public AbstractNettyServer(InetSocketAddress address) {
5074

5175
public AbstractNettyServer(String preName, InetSocketAddress address) {
5276
super();
53-
this.enableEpoll = Epoll.isAvailable();
77+
boolean enableEpoll = false;
78+
boolean enableKQueue = false;
79+
if (Epoll.isAvailable()) {
80+
enableEpoll = true;
81+
} else if (SUPPORT_KQUEUE && KQueue.isAvailable()) {
82+
enableKQueue = true;
83+
}
84+
this.enableEpoll = enableEpoll;
85+
this.enableKQueue = enableKQueue;
5486
this.serverAddress = address;
5587
this.name = NamespaceUtil.newIdName(preName, getClass());
5688
if (enableEpoll) {
5789
logger.info("enable epoll server = {}", this);
90+
} else if (enableKQueue) {
91+
logger.info("enable kqueue server = {}", this);
5892
}
5993
}
6094

@@ -66,7 +100,9 @@ public void setIoRatio(int ioRatio) {
66100
if (worker instanceof NioEventLoopGroup) {
67101
((NioEventLoopGroup) worker).setIoRatio(ioRatio);
68102
} else if (worker instanceof EpollEventLoopGroup) {
69-
// ((EpollEventLoopGroup) worker).setIoRatio(ioRatio);
103+
((EpollEventLoopGroup) worker).setIoRatio(ioRatio);
104+
} else if (worker instanceof KQueueEventLoopGroup) {
105+
((KQueueEventLoopGroup) worker).setIoRatio(ioRatio);
70106
}
71107
this.ioRatio = ioRatio;
72108
}
@@ -92,7 +128,13 @@ protected ServerBootstrap newServerBootstrap() {
92128
protected EventLoopGroup newWorkerEventLoopGroup() {
93129
EventLoopGroup worker;
94130
if (enableEpoll) {
95-
worker = new EpollEventLoopGroup(ioThreadCount, new ThreadFactoryX("Epoll", "Server-Worker", false));
131+
EpollEventLoopGroup epollWorker = new EpollEventLoopGroup(ioThreadCount, new ThreadFactoryX("Epoll", "Server-Worker", false));
132+
epollWorker.setIoRatio(ioRatio);
133+
worker = epollWorker;
134+
} else if (enableKQueue) {
135+
KQueueEventLoopGroup kqueueWorker = new KQueueEventLoopGroup(ioThreadCount, new ThreadFactoryX("Kqueue", "Server-Worker", false));
136+
kqueueWorker.setIoRatio(ioRatio);
137+
worker = kqueueWorker;
96138
} else {
97139
NioEventLoopGroup jdkWorker = new NioEventLoopGroup(ioThreadCount, new ThreadFactoryX("NIO", "Server-Worker", false));
98140
jdkWorker.setIoRatio(ioRatio);
@@ -104,13 +146,11 @@ protected EventLoopGroup newWorkerEventLoopGroup() {
104146
protected EventLoopGroup newBossEventLoopGroup() {
105147
EventLoopGroup boss;
106148
if (enableEpoll) {
107-
EpollEventLoopGroup epollBoss = new EpollEventLoopGroup(1, new ThreadFactoryX("Epoll", "Server-Boss", false));
108-
// epollBoss.setIoRatio(ioRatio);
109-
boss = epollBoss;
149+
boss = new EpollEventLoopGroup(1, new ThreadFactoryX("Epoll", "Server-Boss", false));
150+
} else if (enableKQueue) {
151+
boss = new KQueueEventLoopGroup(1, new ThreadFactoryX("Kqueue", "Server-Boss", false));
110152
} else {
111-
NioEventLoopGroup jdkBoss = new NioEventLoopGroup(1, new ThreadFactoryX("NIO", "Server-Boss", false));
112-
jdkBoss.setIoRatio(ioRatio);
113-
boss = jdkBoss;
153+
boss = new NioEventLoopGroup(1, new ThreadFactoryX("NIO", "Server-Boss", false));
114154
}
115155
return boss;
116156
}
@@ -127,6 +167,8 @@ protected ChannelFactory<? extends ServerChannel> newServerChannelFactory() {
127167
ChannelFactory<? extends ServerChannel> channelFactory;
128168
if (enableEpoll) {
129169
channelFactory = EpollServerSocketChannel::new;
170+
} else if (enableKQueue) {
171+
channelFactory = KQueueServerSocketChannel::new;
130172
} else {
131173
channelFactory = NioServerSocketChannel::new;
132174
}

src/main/java/com/github/netty/protocol/servlet/ServletContext.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@
4141
* 2018/7/14/014
4242
*/
4343
public class ServletContext implements javax.servlet.ServletContext {
44+
45+
private static final boolean SUPPORT_SET_BASE_DIR;
46+
47+
static {
48+
boolean supportSetBaseDir;
49+
try {
50+
DefaultHttpDataFactory.class.getDeclaredMethod("setBaseDir", String.class);
51+
supportSetBaseDir = true;
52+
} catch (Throwable e) {
53+
supportSetBaseDir = false;
54+
}
55+
SUPPORT_SET_BASE_DIR = supportSetBaseDir;
56+
}
57+
4458
public static final int MIN_FILE_SIZE_THRESHOLD = 16384;
4559
public static final String DEFAULT_UPLOAD_DIR = "/upload";
4660
public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE = "javax.websocket.server.ServerContainer";
@@ -277,8 +291,10 @@ public HttpDataFactory getHttpDataFactory(Charset charset) {
277291
Map<Charset, DefaultHttpDataFactory> httpDataFactoryMap = httpDataFactoryThreadLocal.get();
278292
return httpDataFactoryMap.computeIfAbsent(charset, c -> {
279293
DefaultHttpDataFactory factory = new DefaultHttpDataFactory(fileSizeThreshold, c);
280-
factory.setDeleteOnExit(true);
281-
factory.setBaseDir(resourceManager.mkdirs(DEFAULT_UPLOAD_DIR).toString());
294+
if (SUPPORT_SET_BASE_DIR) {
295+
factory.setDeleteOnExit(true);
296+
factory.setBaseDir(resourceManager.mkdirs(DEFAULT_UPLOAD_DIR).toString());
297+
}
282298
return factory;
283299
});
284300
}
@@ -419,7 +435,7 @@ public String getMimeType(String file) {
419435
return null;
420436
}
421437
String extension = file.substring(period + 1);
422-
if (extension.length() < 1) {
438+
if (extension.isEmpty()) {
423439
return null;
424440
}
425441
return mimeMappings.get(extension);

src/main/java/com/github/netty/protocol/servlet/ServletOutputStream.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,7 @@ private void blockIfNeed(ChannelProgressivePromise promise) throws IOException {
151151
return;
152152
}
153153
if (context.executor().inEventLoop()) {
154-
// 1 time slices
155-
Thread.yield();
156154
context.flush();
157-
Thread.yield();
158155
ChannelUtils.forceFlush(context.channel());
159156
} else {
160157
int bufferSize = exchange.getResponse().getBufferSize();

src/main/java/com/github/netty/protocol/servlet/SslContextBuilders.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.io.File;
88
import java.io.FileInputStream;
99
import java.io.IOException;
10+
import java.lang.reflect.Method;
1011
import java.nio.file.Files;
1112
import java.security.KeyStore;
1213
import java.security.KeyStoreException;
@@ -16,6 +17,19 @@
1617

1718
public class SslContextBuilders {
1819

20+
private static final boolean IS_ALPN_SUPPORTED;
21+
22+
static {
23+
boolean supportIsalpnsupported;
24+
try {
25+
Method isAlpnSupported = SslProvider.class.getDeclaredMethod("isAlpnSupported", SslProvider.class);
26+
supportIsalpnsupported = Boolean.TRUE.equals(isAlpnSupported.invoke(null, SslProvider.OPENSSL));
27+
} catch (Throwable e) {
28+
supportIsalpnsupported = false;
29+
}
30+
IS_ALPN_SUPPORTED = supportIsalpnsupported;
31+
}
32+
1933
public static SslContextBuilder newSslContextBuilderJks(File jksKeyFile, File jksPassword) throws IOException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException {
2034
String password = jksPassword == null ? null : new String(Files.readAllBytes(jksPassword.toPath()));
2135
return newSslContextBuilderJks(jksKeyFile, password);
@@ -38,9 +52,9 @@ public static SslContextBuilder newSslContextBuilderPem(File crtFile, File pemFi
3852
public static SslContext newSslContext(SslContextBuilder builder, boolean h2) throws SSLException {
3953
String[] protocols = h2 ? new String[]{ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1}
4054
: new String[]{ApplicationProtocolNames.HTTP_1_1};
41-
return builder.sslProvider(SslProvider.isAlpnSupported(SslProvider.OPENSSL) ?
42-
SslProvider.OPENSSL :
43-
SslProvider.JDK)
55+
return builder.sslProvider(IS_ALPN_SUPPORTED ?
56+
SslProvider.OPENSSL :
57+
SslProvider.JDK)
4458
.applicationProtocolConfig(new ApplicationProtocolConfig(
4559
ApplicationProtocolConfig.Protocol.ALPN,
4660
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package io.netty.channel;
22

33
import io.netty.channel.epoll.EpollUtils;
4+
import io.netty.channel.kqueue.KqueueUtils;
45
import io.netty.channel.nio.AbstractNioChannel;
56

67
public class ChannelUtils {
78

89
public static void forceFlush(Channel channel) {
910
Channel.Unsafe unsafe = channel.unsafe();
11+
if (EpollUtils.forceFlush(unsafe)) {
12+
return;
13+
}
14+
if (KqueueUtils.forceFlush(unsafe)) {
15+
return;
16+
}
1017
if (unsafe instanceof AbstractNioChannel.NioUnsafe) {
1118
((AbstractNioChannel.NioUnsafe) unsafe).forceFlush();
12-
} else {
13-
EpollUtils.forceFlush(unsafe);
1419
}
1520
}
1621
}

src/main/java/io/netty/channel/epoll/EpollUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
public class EpollUtils {
66

7-
public static void forceFlush(Channel.Unsafe unsafe) {
7+
public static boolean forceFlush(Channel.Unsafe unsafe) {
88
if (unsafe instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
99
AbstractEpollChannel.AbstractEpollUnsafe epollUnsafe = (AbstractEpollChannel.AbstractEpollUnsafe) unsafe;
1010
epollUnsafe.epollOutReady();
11+
return true;
12+
} else {
13+
return false;
1114
}
1215
}
1316
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.netty.channel.kqueue;
2+
3+
import io.netty.channel.Channel;
4+
5+
public class KqueueUtils {
6+
7+
public static boolean forceFlush(Channel.Unsafe unsafe) {
8+
if (unsafe instanceof AbstractKQueueChannel) {
9+
AbstractKQueueChannel.AbstractKQueueUnsafe epollUnsafe = (AbstractKQueueChannel.AbstractKQueueUnsafe) unsafe;
10+
epollUnsafe.writeReady();
11+
return true;
12+
} else {
13+
return false;
14+
}
15+
}
16+
}

src/main/java/io/netty/handler/codec/http2/HttpToHttp2FrameCodecConnectionHandlerBuilder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,19 @@
2121
@UnstableApi
2222
public final class HttpToHttp2FrameCodecConnectionHandlerBuilder extends
2323
AbstractHttp2ConnectionHandlerBuilder<Http2ConnectionHandler, HttpToHttp2FrameCodecConnectionHandlerBuilder> {
24+
private static final boolean SUPPORT_DECOUPLECLOSEANDGOAWAY;
25+
26+
static {
27+
boolean supportDecouplecloseandgoaway;
28+
try {
29+
Class<?> clazz = Class.forName("io.netty.handler.codec.http2.AbstractHttp2ConnectionHandlerBuilder");
30+
clazz.getDeclaredMethod("decoupleCloseAndGoAway");
31+
supportDecouplecloseandgoaway = true;
32+
} catch (Throwable e) {
33+
supportDecouplecloseandgoaway = false;
34+
}
35+
SUPPORT_DECOUPLECLOSEANDGOAWAY = supportDecouplecloseandgoaway;
36+
}
2437

2538
private boolean compressor = true;
2639

@@ -55,10 +68,10 @@ protected Http2ConnectionHandler build(Http2ConnectionDecoder decoder, Http2Conn
5568
if (compressor) {
5669
encoder = new CompressorHttp2ConnectionEncoder(encoder);
5770
}
58-
try {
71+
if (SUPPORT_DECOUPLECLOSEANDGOAWAY) {
5972
return new HttpToHttp2ConnectionHandler(decoder, encoder, initialSettings,
6073
decoupleCloseAndGoAway(), isValidateHeaders());
61-
} catch (Throwable e) {
74+
} else {
6275
// 兼容netty老版本
6376
return new HttpToHttp2ConnectionHandler(decoder, encoder, initialSettings, isValidateHeaders());
6477
}

0 commit comments

Comments
 (0)