Skip to content

WIP add config for max total thrift messages sizes #5550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: 2.1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ public enum Property {
+ " RPC. See also `instance.ssl.enabled`.",
"1.6.0"),
RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", Integer.toString(Integer.MAX_VALUE),
PropertyType.BYTES, "The maximum size of a message that can be received by a server.",
PropertyType.BYTES,
"The maximum size of an individual message that can be received by a server. Messages over this size will be rejected.",
"2.1.3"),
RPC_MAX_TOTAL_READ_SIZE("rpc.messages.total.size.max", Integer.toString(Integer.MAX_VALUE),
PropertyType.BYTES,
"The maximum size of the sum of all message sizes that a server will hold in memory at once. If a new message comes in "
+ "and its frame size plus the sum of all frames in memory would exceed this max then reading of that frame is deferred "
+ "until enough memory is free. Once over this limit messages are not rejected, they are not read off the network. TODO "
+ "document which server type support this.",
"2.1.4"),
RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT,
"Configures the TCP backlog for the server side sockets created by Thrift."
+ " This property is not used for SSL type server sockets. A value of zero"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.lang.reflect.Field;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
Expand All @@ -41,15 +43,26 @@ public class CustomNonBlockingServer extends THsHaServer {
private static final Logger log = LoggerFactory.getLogger(CustomNonBlockingServer.class);
private final Field selectAcceptThreadField;

private final AtomicLong readBufferBytesAllocated2;

public CustomNonBlockingServer(Args args) {
super(args);

try {
selectAcceptThreadField = TNonblockingServer.class.getDeclaredField("selectAcceptThread_");
selectAcceptThreadField.setAccessible(true);

Field rbbaField =
AbstractNonblockingServer.class.getDeclaredField("readBufferBytesAllocated");
rbbaField.setAccessible(true);
readBufferBytesAllocated2 = (AtomicLong) rbbaField.get(this);
Field mrbbField = AbstractNonblockingServer.class.getDeclaredField("MAX_READ_BUFFER_BYTES");
mrbbField.setAccessible(true);
log.debug("MAX_READ_BUFFER_BYTES={}", mrbbField.getLong(this));
} catch (Exception e) {
throw new RuntimeException("Failed to access required field in Thrift code.", e);
}

}

@Override
Expand Down Expand Up @@ -126,29 +139,47 @@ public void invoke() {
// On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere
// in the same thread that called invoke() on the buffer
TServerUtils.clientAddress.set(clientAddress);
log.trace("CustomFrameBuffer.invoke before client:{} total_buffers_size:{}", clientAddress,
readBufferBytesAllocated2.get());
super.invoke();
log.trace("CustomFrameBuffer.invoke after client:{} total_buffers_size:{}", clientAddress,
readBufferBytesAllocated2.get());
}

@Override
public boolean read() {
log.trace(
"CustomFrameBuffer.read before read client: {} buffer_size:{} state:{} total_buffers_size:{}",
clientAddress, buffer_.array().length, state_, readBufferBytesAllocated2.get());
boolean result = super.read();
if (!result) {
log.trace("CustomFrameBuffer.read returned false when reading data from client: {}",
clientAddress);
}
// if (!result) {
log.trace(
"CustomFrameBuffer.read returned {} when reading data from client: {} buffer_size:{} state:{} total_buffers_size:{}",
result, clientAddress, buffer_.array().length, state_, readBufferBytesAllocated2.get());
// }
return result;
}

@Override
public boolean write() {
boolean result = super.write();
if (!result) {
log.trace("CustomFrameBuffer.write returned false when writing data to client: {}",
clientAddress);
}
// if (!result) {
log.trace(
"CustomFrameBuffer.write returned {} when writing data to client: {} buffer_size:{} state:{}",
result, clientAddress, buffer_.array().length, state_);
// }
return result;
}

@Override
public void close() {
log.trace("CustomFrameBuffer.close before client:{} total_buffers_size:{}", clientAddress,
readBufferBytesAllocated2.get());
super.close();
log.trace("CustomFrameBuffer.close after client:{} total_buffers_size:{}", clientAddress,
readBufferBytesAllocated2.get());
}

/*
* Helper method used to capture the client address inside the CustomFrameBuffer constructor so
* that it can be referenced inside the read/write methods for logging purposes. It previously
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config,
public static ServerAddress startServer(ServerContext context, String hostname,
Property portHintProperty, TProcessor processor, String serverName, String threadName,
Property portSearchProperty, Property minThreadProperty, Property threadTimeOutProperty,
Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
throws UnknownHostException {
Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty,
Property maxTotalReadBufferProperty) throws UnknownHostException {
final AccumuloConfiguration config = context.getConfiguration();

final IntStream portHint = config.getPortStream(portHintProperty);
Expand All @@ -159,6 +159,11 @@ public static ServerAddress startServer(ServerContext context, String hostname,
maxMessageSize = config.getAsBytes(maxMessageSizeProperty);
}

long maxTotalReadBuffering = Integer.MAX_VALUE;
if (maxTotalReadBufferProperty != null) {
maxTotalReadBuffering = config.getAsBytes(maxTotalReadBufferProperty);
}

boolean portSearch = false;
if (portSearchProperty != null) {
portSearch = config.getBoolean(portSearchProperty);
Expand All @@ -181,8 +186,8 @@ public static ServerAddress startServer(ServerContext context, String hostname,
try {
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(),
backlog, portSearch, addresses);
maxTotalReadBuffering, context.getServerSslParams(), context.getSaslParams(),
context.getClientTimeoutInMillis(), backlog, portSearch, addresses);
} catch (TTransportException e) {
if (portSearch) {
// Build a list of reserved ports - as identified by properties of type PropertyType.PORT
Expand All @@ -206,7 +211,7 @@ public static ServerAddress startServer(ServerContext context, String hostname,
HostAndPort addr = HostAndPort.fromParts(hostname, port);
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(), context.getSaslParams(),
maxTotalReadBuffering, context.getServerSslParams(), context.getSaslParams(),
context.getClientTimeoutInMillis(), backlog, portSearch, addr);
} catch (TTransportException tte) {
log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
Expand Down Expand Up @@ -266,7 +271,7 @@ private static ServerAddress createThreadedSelectorServer(HostAndPort address,
private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, final String serverName, final int numThreads,
final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
long maxMessageSize, int backlog) throws TTransportException {
long maxMessageSize, long maxTotalBuffered, int backlog) throws TTransportException {

NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs()
.backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort()))
Expand All @@ -277,7 +282,7 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce

options.protocolFactory(protocolFactory);
options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
options.maxReadBufferBytes = maxMessageSize;
options.maxReadBufferBytes = maxTotalBuffered;
options.stopTimeoutVal(5);

// Create our own very special thread pool.
Expand Down Expand Up @@ -570,9 +575,9 @@ private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TPr
public static ServerAddress startTServer(final AccumuloConfiguration conf,
ThriftServerType serverType, TProcessor processor, String serverName, String threadName,
int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean portSearch,
HostAndPort... addresses) {
long maxTotalReadBuffering, SslConnectionParams sslParams,
SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog,
MetricsInfo metricsInfo, boolean portSearch, HostAndPort... addresses) {

if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
Expand All @@ -581,7 +586,8 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
try {
return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName,
threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize,
sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses);
maxTotalReadBuffering, sslParams, saslParams, serverSocketTimeout, backlog, portSearch,
addresses);
} catch (TTransportException e) {
throw new IllegalStateException(e);
}
Expand All @@ -597,9 +603,9 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
private static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
String serverName, String threadName, int numThreads, long threadTimeOut,
final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, int backlog, boolean portSearch, HostAndPort... addresses)
throws TTransportException {
long maxTotalReadBuffering, SslConnectionParams sslParams,
SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog,
boolean portSearch, HostAndPort... addresses) throws TTransportException {
TProtocolFactory protocolFactory = ThriftUtil.protocolFactory();
// This is presently not supported. It's hypothetically possible, I believe, to work, but it
// would require changes in how the transports
Expand Down Expand Up @@ -639,7 +645,8 @@ private static ServerAddress startTServer(ThriftServerType serverType, TimedProc
case CUSTOM_HS_HA:
log.debug("Instantiating unsecure custom half-async Thrift server");
serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog);
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize,
maxTotalReadBuffering, backlog);
break;
default:
throw new IllegalArgumentException("Unknown server type " + serverType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private ServerAddress startServer() throws Exception {
return TServerUtils.startServer(context, hostname, Property.TSERV_CLIENTPORT, processor,
"TServerUtilsTest", "TServerUtilsTestThread", Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK,
Property.RPC_MAX_MESSAGE_SIZE);
Property.RPC_MAX_MESSAGE_SIZE, Property.RPC_MAX_TOTAL_READ_SIZE);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ protected ServerAddress startCoordinatorClientService() throws UnknownHostExcept
"Thrift Client Server", Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH,
Property.COMPACTION_COORDINATOR_MINTHREADS,
Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT,
Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty);
Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty,
Property.RPC_MAX_TOTAL_READ_SIZE);
LOG.info("address = {}", sp.address);
return sp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ protected ServerAddress startCompactorClientService() throws UnknownHostExceptio
Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS,
Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK,
maxMessageSizeProperty);
maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE);
LOG.info("address = {}", sp.address);
return sp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ private HostAndPort startStatsService() {
var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
Property.GENERAL_MAX_MESSAGE_SIZE);
long maxMessageSize = getConfiguration().getAsBytes(maxMessageSizeProperty);
// TODO duplicated maxMessageSize
ServerAddress server = TServerUtils.startTServer(getConfiguration(),
getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
"GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
getContext().getServerSslParams(), getContext().getSaslParams(), 0,
maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0,
getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), false,
addresses);
log.debug("Starting garbage collector listening on " + server.address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,8 @@ public void run() {
Property.GENERAL_MAX_MESSAGE_SIZE);
sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor,
"Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS,
Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
maxMessageSizeProperty);
Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, maxMessageSizeProperty,
Property.RPC_MAX_TOTAL_READ_SIZE);
} catch (UnknownHostException e) {
throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
}
Expand Down Expand Up @@ -1613,7 +1613,8 @@ private TServer setupReplication()
ServerAddress replAddress = TServerUtils.startServer(context, getHostname(),
Property.MANAGER_REPLICATION_COORDINATOR_PORT, processor, "Manager Replication Coordinator",
"Replication Coordinator", null, Property.MANAGER_REPLICATION_COORDINATOR_MINTHREADS, null,
Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, maxMessageSizeProperty);
Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, maxMessageSizeProperty,
Property.RPC_MAX_TOTAL_READ_SIZE);

log.info("Started replication coordinator service at " + replAddress.address);
// Start the daemon to scan the replication table and make units of work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,11 @@ protected ServerAddress startScanServerClientService() throws UnknownHostExcepti
@SuppressWarnings("deprecation")
var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
ServerAddress sp =
TServerUtils.startServer(getContext(), getHostname(), Property.SSERV_CLIENTPORT, processor,
this.getClass().getSimpleName(), "Thrift Client Server", Property.SSERV_PORTSEARCH,
Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT,
Property.SSERV_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE);

LOG.info("address = {}", sp.address);
return sp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ private HostAndPort startServer(String address, TProcessor processor)
ServerAddress sp = TServerUtils.startServer(getContext(), address, Property.TSERV_CLIENTPORT,
processor, this.getClass().getSimpleName(), "Thrift Client Server",
Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT,
Property.TSERV_THREADCHECK, maxMessageSizeProperty);
Property.TSERV_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE);
this.server = sp.server;
return sp.address;
}
Expand Down Expand Up @@ -634,7 +634,7 @@ private void startReplicationService() throws UnknownHostException {
ServerAddress sp = TServerUtils.startServer(getContext(), clientAddress.getHost(),
Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler",
"Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS, null,
Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
Property.REPLICATION_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE);
this.replServer = sp.server;
log.info("Started replication service on {}", sp.address);

Expand Down
Loading