diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ca73765984d..498d743a381 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -55,8 +55,16 @@ public enum Property { + " RPC. See also `instance.ssl.enabled`.", "1.6.0"), RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", Integer.toString(Integer.MAX_VALUE), - PropertyType.BYTES, "The maximum size of a message that can be received by a server.", + PropertyType.BYTES, + "The maximum size of an individual message that can be received by a server. Messages over this size will be rejected.", "2.1.3"), + RPC_MAX_TOTAL_READ_SIZE("rpc.messages.total.size.max", Integer.toString(Integer.MAX_VALUE), + PropertyType.BYTES, + "The maximum size of the sum of all message sizes that a server will hold in memory at once. If a new message comes in " + + "and its frame size plus the sum of all frames in memory would exceed this max then reading of that frame is deferred " + + "until enough memory is free. Once over this limit messages are not rejected, they are not read off the network. TODO " + + "document which server type support this.", + "2.1.4"), RPC_BACKLOG("rpc.backlog", "50", PropertyType.COUNT, "Configures the TCP backlog for the server side sockets created by Thrift." + " This property is not used for SSL type server sockets. A value of zero" diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index 929d8900a56..46ff2c08b4e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -22,7 +22,9 @@ import java.lang.reflect.Field; import java.net.Socket; import java.nio.channels.SelectionKey; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.thrift.server.AbstractNonblockingServer; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -41,15 +43,26 @@ public class CustomNonBlockingServer extends THsHaServer { private static final Logger log = LoggerFactory.getLogger(CustomNonBlockingServer.class); private final Field selectAcceptThreadField; + private final AtomicLong readBufferBytesAllocated2; + public CustomNonBlockingServer(Args args) { super(args); try { selectAcceptThreadField = TNonblockingServer.class.getDeclaredField("selectAcceptThread_"); selectAcceptThreadField.setAccessible(true); + + Field rbbaField = + AbstractNonblockingServer.class.getDeclaredField("readBufferBytesAllocated"); + rbbaField.setAccessible(true); + readBufferBytesAllocated2 = (AtomicLong) rbbaField.get(this); + Field mrbbField = AbstractNonblockingServer.class.getDeclaredField("MAX_READ_BUFFER_BYTES"); + mrbbField.setAccessible(true); + log.debug("MAX_READ_BUFFER_BYTES={}", mrbbField.getLong(this)); } catch (Exception e) { throw new RuntimeException("Failed to access required field in Thrift code.", e); } + } @Override @@ -126,29 +139,47 @@ public void invoke() { // On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere // in the same thread that called invoke() on the buffer TServerUtils.clientAddress.set(clientAddress); + log.trace("CustomFrameBuffer.invoke before client:{} total_buffers_size:{}", clientAddress, + readBufferBytesAllocated2.get()); super.invoke(); + log.trace("CustomFrameBuffer.invoke after client:{} total_buffers_size:{}", clientAddress, + readBufferBytesAllocated2.get()); } @Override public boolean read() { + log.trace( + "CustomFrameBuffer.read before read client: {} buffer_size:{} state:{} total_buffers_size:{}", + clientAddress, buffer_.array().length, state_, readBufferBytesAllocated2.get()); boolean result = super.read(); - if (!result) { - log.trace("CustomFrameBuffer.read returned false when reading data from client: {}", - clientAddress); - } + // if (!result) { + log.trace( + "CustomFrameBuffer.read returned {} when reading data from client: {} buffer_size:{} state:{} total_buffers_size:{}", + result, clientAddress, buffer_.array().length, state_, readBufferBytesAllocated2.get()); + // } return result; } @Override public boolean write() { boolean result = super.write(); - if (!result) { - log.trace("CustomFrameBuffer.write returned false when writing data to client: {}", - clientAddress); - } + // if (!result) { + log.trace( + "CustomFrameBuffer.write returned {} when writing data to client: {} buffer_size:{} state:{}", + result, clientAddress, buffer_.array().length, state_); + // } return result; } + @Override + public void close() { + log.trace("CustomFrameBuffer.close before client:{} total_buffers_size:{}", clientAddress, + readBufferBytesAllocated2.get()); + super.close(); + log.trace("CustomFrameBuffer.close after client:{} total_buffers_size:{}", clientAddress, + readBufferBytesAllocated2.get()); + } + /* * Helper method used to capture the client address inside the CustomFrameBuffer constructor so * that it can be referenced inside the read/write methods for logging purposes. It previously diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 22a191a7a68..91b24ae202f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -133,8 +133,8 @@ static Map getReservedPorts(AccumuloConfiguration config, public static ServerAddress startServer(ServerContext context, String hostname, Property portHintProperty, TProcessor processor, String serverName, String threadName, Property portSearchProperty, Property minThreadProperty, Property threadTimeOutProperty, - Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty) - throws UnknownHostException { + Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty, + Property maxTotalReadBufferProperty) throws UnknownHostException { final AccumuloConfiguration config = context.getConfiguration(); final IntStream portHint = config.getPortStream(portHintProperty); @@ -159,6 +159,11 @@ public static ServerAddress startServer(ServerContext context, String hostname, maxMessageSize = config.getAsBytes(maxMessageSizeProperty); } + long maxTotalReadBuffering = Integer.MAX_VALUE; + if (maxTotalReadBufferProperty != null) { + maxTotalReadBuffering = config.getAsBytes(maxTotalReadBufferProperty); + } + boolean portSearch = false; if (portSearchProperty != null) { portSearch = config.getBoolean(portSearchProperty); @@ -181,8 +186,8 @@ public static ServerAddress startServer(ServerContext context, String hostname, try { return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, - context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), - backlog, portSearch, addresses); + maxTotalReadBuffering, context.getServerSslParams(), context.getSaslParams(), + context.getClientTimeoutInMillis(), backlog, portSearch, addresses); } catch (TTransportException e) { if (portSearch) { // Build a list of reserved ports - as identified by properties of type PropertyType.PORT @@ -206,7 +211,7 @@ public static ServerAddress startServer(ServerContext context, String hostname, HostAndPort addr = HostAndPort.fromParts(hostname, port); return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, - context.getServerSslParams(), context.getSaslParams(), + maxTotalReadBuffering, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), backlog, portSearch, addr); } catch (TTransportException tte) { log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName); @@ -266,7 +271,7 @@ private static ServerAddress createThreadedSelectorServer(HostAndPort address, private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName, final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, - long maxMessageSize, int backlog) throws TTransportException { + long maxMessageSize, long maxTotalBuffered, int backlog) throws TTransportException { NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs() .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort())) @@ -277,7 +282,7 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce options.protocolFactory(protocolFactory); options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); - options.maxReadBufferBytes = maxMessageSize; + options.maxReadBufferBytes = maxTotalBuffered; options.stopTimeoutVal(5); // Create our own very special thread pool. @@ -570,9 +575,9 @@ private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TPr public static ServerAddress startTServer(final AccumuloConfiguration conf, ThriftServerType serverType, TProcessor processor, String serverName, String threadName, int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, - SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean portSearch, - HostAndPort... addresses) { + long maxTotalReadBuffering, SslConnectionParams sslParams, + SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog, + MetricsInfo metricsInfo, boolean portSearch, HostAndPort... addresses) { if (serverType == ThriftServerType.SASL) { processor = updateSaslProcessor(serverType, processor); @@ -581,7 +586,8 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf, try { return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, - sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses); + maxTotalReadBuffering, sslParams, saslParams, serverSocketTimeout, backlog, portSearch, + addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } @@ -597,9 +603,9 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf, private static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, - SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, int backlog, boolean portSearch, HostAndPort... addresses) - throws TTransportException { + long maxTotalReadBuffering, SslConnectionParams sslParams, + SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog, + boolean portSearch, HostAndPort... addresses) throws TTransportException { TProtocolFactory protocolFactory = ThriftUtil.protocolFactory(); // This is presently not supported. It's hypothetically possible, I believe, to work, but it // would require changes in how the transports @@ -639,7 +645,8 @@ private static ServerAddress startTServer(ThriftServerType serverType, TimedProc case CUSTOM_HS_HA: log.debug("Instantiating unsecure custom half-async Thrift server"); serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, + maxTotalReadBuffering, backlog); break; default: throw new IllegalArgumentException("Unknown server type " + serverType); diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 81e9cfa49ce..54789019f1e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -306,7 +306,7 @@ private ServerAddress startServer() throws Exception { return TServerUtils.startServer(context, hostname, Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK, - Property.RPC_MAX_MESSAGE_SIZE); + Property.RPC_MAX_MESSAGE_SIZE, Property.RPC_MAX_TOTAL_READ_SIZE); } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 2079bfe0da1..ccba30373e8 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -255,7 +255,8 @@ protected ServerAddress startCoordinatorClientService() throws UnknownHostExcept "Thrift Client Server", Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, Property.COMPACTION_COORDINATOR_MINTHREADS, Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT, - Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty); + Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty, + Property.RPC_MAX_TOTAL_READ_SIZE); LOG.info("address = {}", sp.address); return sp; } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 9d95a734290..d259b8feff8 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -327,7 +327,7 @@ protected ServerAddress startCompactorClientService() throws UnknownHostExceptio Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK, - maxMessageSizeProperty); + maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE); LOG.info("address = {}", sp.address); return sp; } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 179f97cce99..ee8b1866434 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -428,10 +428,11 @@ private HostAndPort startStatsService() { var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE); long maxMessageSize = getConfiguration().getAsBytes(maxMessageSizeProperty); + // TODO duplicated maxMessageSize ServerAddress server = TServerUtils.startTServer(getConfiguration(), getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, - getContext().getServerSslParams(), getContext().getSaslParams(), 0, + maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0, getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), false, addresses); log.debug("Starting garbage collector listening on " + server.address); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index ee9314d958d..3e6f7e8a726 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1238,8 +1238,8 @@ public void run() { Property.GENERAL_MAX_MESSAGE_SIZE); sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, - Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, - maxMessageSizeProperty); + Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, maxMessageSizeProperty, + Property.RPC_MAX_TOTAL_READ_SIZE); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } @@ -1613,7 +1613,8 @@ private TServer setupReplication() ServerAddress replAddress = TServerUtils.startServer(context, getHostname(), Property.MANAGER_REPLICATION_COORDINATOR_PORT, processor, "Manager Replication Coordinator", "Replication Coordinator", null, Property.MANAGER_REPLICATION_COORDINATOR_MINTHREADS, null, - Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, maxMessageSizeProperty); + Property.MANAGER_REPLICATION_COORDINATOR_THREADCHECK, maxMessageSizeProperty, + Property.RPC_MAX_TOTAL_READ_SIZE); log.info("Started replication coordinator service at " + replAddress.address); // Start the daemon to scan the replication table and make units of work diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 19aade0113e..5155eec29a5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -311,10 +311,11 @@ protected ServerAddress startScanServerClientService() throws UnknownHostExcepti @SuppressWarnings("deprecation") var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), - Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, - Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty); + ServerAddress sp = + TServerUtils.startServer(getContext(), getHostname(), Property.SSERV_CLIENTPORT, processor, + this.getClass().getSimpleName(), "Thrift Client Server", Property.SSERV_PORTSEARCH, + Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT, + Property.SSERV_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE); LOG.info("address = {}", sp.address); return sp; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index c70f5f48e43..f471795a800 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -558,7 +558,7 @@ private HostAndPort startServer(String address, TProcessor processor) ServerAddress sp = TServerUtils.startServer(getContext(), address, Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, - Property.TSERV_THREADCHECK, maxMessageSizeProperty); + Property.TSERV_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE); this.server = sp.server; return sp.address; } @@ -634,7 +634,7 @@ private void startReplicationService() throws UnknownHostException { ServerAddress sp = TServerUtils.startServer(getContext(), clientAddress.getHost(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler", "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS, null, - Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + Property.REPLICATION_THREADCHECK, maxMessageSizeProperty, Property.RPC_MAX_TOTAL_READ_SIZE); this.replServer = sp.server; log.info("Started replication service on {}", sp.address); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSize2IT.java b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSize2IT.java new file mode 100644 index 00000000000..8a0f7e0940b --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSize2IT.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag(MINI_CLUSTER_ONLY) +public class ThriftMaxFrameSize2IT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.RPC_MAX_MESSAGE_SIZE, "256K"); + cfg.setProperty(Property.RPC_MAX_TOTAL_READ_SIZE, "1M"); + } + + @Test + public void testMaxTotalMaxMessages() throws Exception { + String table = getUniqueNames(1)[0]; + var executor = Executors.newCachedThreadPool(); + try (var client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + + List> futures = new ArrayList<>(); + + for (int i = 0; i < 500; i++) { + String row = "bigvalue" + i; + var future = executor.submit(() -> { + try (var writer = client.createBatchWriter(table)) { + Mutation m = new Mutation("bigvalue"); + m.at().family("data").qualifier("1").put(new byte[128_000]); + writer.addMutation(m); + } + + return null; + }); + futures.add(future); + } + + for (var future : futures) { + future.get(); + } + /* + * try(var writer = client.createBatchWriter(table)) { Mutation m = new Mutation("bigvalue"); + * m.at().family("data").qualifier("1").put(new byte[512_000]); // TODO this write will fail + * and retry forever because it exceeds the size of individual message, can we detect this and + * percolate an exception back up? writer.addMutation(m); } + */ + } + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 0b755ba0a1a..5b9fc94c6f8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -124,9 +124,9 @@ public static void main(String[] args) throws Exception { ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2, - ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, - context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, - HostAndPort.fromParts("0.0.0.0", port)); + ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, 10 * 1024 * 1024, null, null, + -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), + false, HostAndPort.fromParts("0.0.0.0", port)); String addressString = serverPort.address.toString(); var zLockPath = diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index e470d1ff48a..333c804da46 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -348,8 +348,9 @@ public static void main(String[] args) throws Exception { TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, - 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), - context.getMetricsInfo(), false, HostAndPort.fromParts("0.0.0.0", opts.port)); + 10 * 1024 * 1024, 10 * 1024 * 1024, null, null, -1, + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, + HostAndPort.fromParts("0.0.0.0", opts.port)); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties index d6343c31546..213a3f413b3 100644 --- a/test/src/main/resources/log4j2-test.properties +++ b/test/src/main/resources/log4j2-test.properties @@ -155,6 +155,9 @@ logger.39.level = trace logger.40.name = org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles logger.40.level = trace +logger.41.name = org.apache.accumulo.server.rpc.CustomNonBlockingServer +logger.41.level = trace + rootLogger.level = debug rootLogger.appenderRef.console.ref = STDOUT