Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
*/
private static final ByteSizeValue MTU = new ByteSizeValue(Long.parseLong(System.getProperty("opensearch.net.mtu", "1500")));

/**
* The size of the decompressor buffer for the http content
* that is going to be used with the {@link #createDecompressor()}.
*/
private static final int UNLIMITED_DECOMPRESSOR_BUFFER = 0;

private static final String SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = "http.netty.max_composite_buffer_components";

public static Setting<Integer> SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = new Setting<>(
Expand Down Expand Up @@ -310,8 +316,8 @@ protected void doStart() {
serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
}

serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.option(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.childOption(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator);

final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
Expand Down Expand Up @@ -575,7 +581,7 @@ protected ChannelInboundHandlerAdapter createHeaderVerifier() {
* Used in instances to conditionally decompress depending on the outcome from header verification
*/
protected ChannelInboundHandlerAdapter createDecompressor() {
return new HttpContentDecompressor();
return new HttpContentDecompressor(UNLIMITED_DECOMPRESSOR_BUFFER);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public class NettyAllocator {
maxOrder = 5;
}
}
int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
Expand All @@ -127,7 +126,6 @@ public class NettyAllocator {
0,
pageSize,
maxOrder,
tinyCacheSize,
smallCacheSize,
normalCacheSize,
useCacheForAllThreads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.transport;

import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -89,9 +91,10 @@ public synchronized SharedGroup getHttpGroup() {
return getGenericGroup();
} else {
if (dedicatedHttpGroup == null) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(
EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(
httpWorkerCount,
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX),
NioIoHandler.newFactory()
);
dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
Expand All @@ -101,9 +104,10 @@ public synchronized SharedGroup getHttpGroup() {

private SharedGroup getGenericGroup() {
if (genericGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(
EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(
workerCount,
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX)
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX),
NioIoHandler.newFactory()
);
this.genericGroup = new RefCountedGroup(eventLoopGroup);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGro
bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
}

bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
bootstrap.option(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator);

final boolean reuseAddress = TransportSettings.TCP_REUSE_ADDRESS.get(settings);
bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
Expand Down Expand Up @@ -288,8 +288,8 @@ private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupF
serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt()));
}

serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.option(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator);
serverBootstrap.childOption(ChannelOption.RECVBUF_ALLOCATOR, recvByteBufAllocator);

serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
Expand Down
Loading