
Commit 6dad468

Alex Hughes (ajkh88) authored and Ray Mattingly committed
HBASE-29231 Throttles should support limits based on handler thread usage time (#7000)
Co-authored-by: Alex Hughes <[email protected]>
Signed-off-by: Ray Mattingly <[email protected]>
1 parent 3826e9a commit 6dad468

File tree: 18 files changed, +459 −53 lines changed

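For orientation before the per-file diffs: the new REQUEST_HANDLER_USAGE_MS throttle should be settable through the existing Admin quota API like any other ThrottleType. The sketch below is illustrative only and is not part of this commit; the user name and the 500 ms-per-second limit are made-up values.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.ThrottleType;

public class HandlerUsageQuotaExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // Limit 'example_user' to at most 500 ms of handler thread time per second.
      QuotaSettings quota = QuotaSettingsFactory.throttleUser("example_user",
        ThrottleType.REQUEST_HANDLER_USAGE_MS, 500, TimeUnit.SECONDS);
      admin.setQuota(quota);
    }
  }
}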

hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java

Lines changed: 4 additions & 0 deletions
@@ -180,6 +180,10 @@ protected static List<ThrottleSettings> fromThrottle(final String userName,
       settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
         ThrottleType.ATOMIC_REQUEST_NUMBER, throttle.getAtomicReqNum()));
     }
+    if (throttle.hasReqHandlerUsageMs()) {
+      settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
+        ThrottleType.REQUEST_HANDLER_USAGE_MS, throttle.getReqHandlerUsageMs()));
+    }
     return settings;
   }

hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java

Lines changed: 8 additions & 1 deletion
@@ -44,13 +44,15 @@ public enum Type {
     AtomicRequestNumberExceeded,
     AtomicReadSizeExceeded,
     AtomicWriteSizeExceeded,
+    RequestHandlerUsageTimeExceeded,
   }

   private static final String[] MSG_TYPE = new String[] { "number of requests exceeded",
     "request size limit exceeded", "number of read requests exceeded",
     "number of write requests exceeded", "write size limit exceeded", "read size limit exceeded",
     "request capacity unit exceeded", "read capacity unit exceeded", "write capacity unit exceeded",
-    "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded" };
+    "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded",
+    "request handler usage time exceeded" };

   private static final String MSG_WAIT = " - wait ";

@@ -145,6 +147,11 @@ public static void throwAtomicWriteSizeExceeded(final long waitInterval)
     throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval);
   }

+  public static void throwRequestHandlerUsageTimeExceeded(final long waitInterval)
+    throws RpcThrottlingException {
+    throwThrottlingException(Type.RequestHandlerUsageTimeExceeded, waitInterval);
+  }
+
   private static void throwThrottlingException(final Type type, final long waitInterval)
     throws RpcThrottlingException {
     String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + stringFromMillis(waitInterval);
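As a client-side illustration (an assumption, not part of this diff): when the server throws the new RequestHandlerUsageTimeExceeded type, a caller can back off for the advertised wait interval. Depending on retry configuration the exception may arrive wrapped in another IOException, so treat this purely as a sketch.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;

public final class ThrottleAwareGet {
  /** Retries once after waiting the interval carried by the throttling exception. */
  static Result getWithBackoff(Table table, byte[] row) throws Exception {
    try {
      return table.get(new Get(row));
    } catch (RpcThrottlingException e) {
      // Message is of the form "request handler usage time exceeded - wait <interval>".
      Thread.sleep(e.getWaitInterval());
      return table.get(new Get(row));
    }
  }
}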

hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java

Lines changed: 3 additions & 0 deletions
@@ -108,6 +108,9 @@ public String toString() {
         case WRITE_CAPACITY_UNIT:
           builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
           break;
+        case REQUEST_HANDLER_USAGE_MS:
+          builder.append(String.format("%dms", timedQuota.getSoftLimit()));
+          break;
         default:
       }
     } else if (timedQuota.hasShare()) {

hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java

Lines changed: 3 additions & 0 deletions
@@ -59,4 +59,7 @@ public enum ThrottleType {

   /** Throttling based on the size of atomic write requests */
   ATOMIC_WRITE_SIZE,
+
+  /** Throttling based on the handler thread time in milliseconds used */
+  REQUEST_HANDLER_USAGE_MS,
 }

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 4 additions & 0 deletions
@@ -2461,6 +2461,8 @@ public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto)
       return ThrottleType.ATOMIC_REQUEST_NUMBER;
     case ATOMIC_WRITE_SIZE:
       return ThrottleType.ATOMIC_WRITE_SIZE;
+    case REQUEST_HANDLER_USAGE_MS:
+      return ThrottleType.REQUEST_HANDLER_USAGE_MS;
     default:
       throw new RuntimeException("Invalid ThrottleType " + proto);
   }
@@ -2496,6 +2498,8 @@ public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType ty
       return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER;
     case ATOMIC_WRITE_SIZE:
       return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE;
+    case REQUEST_HANDLER_USAGE_MS:
+      return QuotaProtos.ThrottleType.REQUEST_HANDLER_USAGE_MS;
     default:
       throw new RuntimeException("Invalid ThrottleType " + type);
   }

hbase-protocol-shaded/src/main/protobuf/Quota.proto

Lines changed: 3 additions & 0 deletions
@@ -52,6 +52,7 @@ enum ThrottleType {
   ATOMIC_READ_SIZE = 10;
   ATOMIC_REQUEST_NUMBER = 11;
   ATOMIC_WRITE_SIZE = 12;
+  REQUEST_HANDLER_USAGE_MS = 13;
 }

 message Throttle {
@@ -71,6 +72,8 @@ message Throttle {
   optional TimedQuota atomic_read_size = 10;
   optional TimedQuota atomic_req_num = 11;
   optional TimedQuota atomic_write_size = 12;
+
+  optional TimedQuota req_handler_usage_ms = 13;
 }

 message ThrottleRequest {

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java

Lines changed: 49 additions & 7 deletions
@@ -21,10 +21,12 @@
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;

@@ -45,11 +47,18 @@ public class DefaultOperationQuota implements OperationQuota {

   // the available read/write quota size in bytes
   protected long readAvailable = 0;
+
+  // The estimated handler usage time in ms for a request based on
+  // the number of requests per second and the number of handler threads
+  private final long estimatedHandlerUsagePerReq;
+
   // estimated quota
   protected long writeConsumed = 0;
   protected long readConsumed = 0;
   protected long writeCapacityUnitConsumed = 0;
   protected long readCapacityUnitConsumed = 0;
+  protected long handlerUsageTimeConsumed = 0;
+
   // real consumed quota
   private final long[] operationSize;
   // difference between estimated quota and real consumed quota used in close method
@@ -59,14 +68,15 @@ public class DefaultOperationQuota implements OperationQuota {
   protected long readDiff = 0;
   protected long writeCapacityUnitDiff = 0;
   protected long readCapacityUnitDiff = 0;
+  protected long handlerUsageTimeDiff = 0;
   private boolean useResultSizeBytes;
   private long blockSizeBytes;
   private long maxScanEstimate;
   private boolean isAtomic = false;

   public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
-    final QuotaLimiter... limiters) {
-    this(conf, Arrays.asList(limiters));
+    final double requestsPerSecond, final QuotaLimiter... limiters) {
+    this(conf, requestsPerSecond, Arrays.asList(limiters));
     this.useResultSizeBytes =
       conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
     this.blockSizeBytes = blockSizeBytes;
@@ -78,15 +88,20 @@ public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
   /**
    * NOTE: The order matters. It should be something like [user, table, namespace, global]
    */
-  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
+  public DefaultOperationQuota(final Configuration conf, final double requestsPerSecond,
+    final List<QuotaLimiter> limiters) {
     this.writeCapacityUnit =
       conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
     this.readCapacityUnit =
       conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
     this.limiters = limiters;
+    int numHandlerThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    this.estimatedHandlerUsagePerReq =
+      calculateHandlerUsageTimeEstimate(requestsPerSecond, numHandlerThreads);
+
     int size = OperationType.values().length;
     operationSize = new long[size];
-
     for (int i = 0; i < size; ++i) {
       operationSize[i] = 0;
     }
@@ -128,13 +143,13 @@ private void checkQuota(long numWrites, long numReads, boolean isAtomic)
       limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
         Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
         Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
-        readCapacityUnitConsumed, isAtomic);
+        readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
     }

     for (final QuotaLimiter limiter : limiters) {
       limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
-        readCapacityUnitConsumed, isAtomic);
+        readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
     }
   }

@@ -152,12 +167,12 @@ public void close() {
         RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
       readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
     }
-
     writeCapacityUnitDiff =
       calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
     readCapacityUnitDiff = calculateReadCapacityUnitDiff(
       operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
       readConsumed);
+    handlerUsageTimeDiff = calculateHandlerUsageMsDiff();

     for (final QuotaLimiter limiter : limiters) {
       if (writeDiff != 0) {
@@ -166,6 +181,9 @@ public void close() {
       if (readDiff != 0) {
         limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic);
       }
+      if (handlerUsageTimeDiff != 0) {
+        limiter.consumeTime(handlerUsageTimeDiff);
+      }
     }
   }

@@ -216,6 +234,8 @@ protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {

     writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
     readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+
+    handlerUsageTimeConsumed = (numReads + numWrites) * estimatedHandlerUsagePerReq;
   }

   /**
@@ -238,6 +258,7 @@ protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanReque
     }

     readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
+    handlerUsageTimeConsumed = estimatedHandlerUsagePerReq;
   }

   protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
@@ -288,4 +309,25 @@ private long calculateWriteCapacityUnitDiff(final long actualSize, final long es
   private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
     return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
   }
+
+  private long calculateHandlerUsageTimeEstimate(final double requestsPerSecond,
+    final int numHandlerThreads) {
+    if (requestsPerSecond <= numHandlerThreads) {
+      // If less than 1 request per second per handler thread, then we use the number of handler
+      // threads as a baseline to avoid incorrect estimations when the number of requests is very
+      // low.
+      return numHandlerThreads;
+    } else {
+      double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000);
+      // We don't ever want zero here
+      return Math.max((long) requestsPerMillisecond, 1L);
+    }
+  }
+
+  private long calculateHandlerUsageMsDiff() {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    long startTime = RpcServer.getCurrentCall().map(RpcCall::getStartTime).orElse(currentTime);
+    long timeElapsed = currentTime - startTime;
+    return handlerUsageTimeConsumed - timeElapsed;
+  }
 }
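To make the new per-request estimate concrete, here is a standalone sketch whose estimate() method copies the calculateHandlerUsageTimeEstimate logic added above; the request rates and the handler count of 30 (the HBase default) in main() are only sample inputs.

public final class HandlerUsageEstimateDemo {
  // Mirrors DefaultOperationQuota#calculateHandlerUsageTimeEstimate from the diff above.
  static long estimate(double requestsPerSecond, int numHandlerThreads) {
    if (requestsPerSecond <= numHandlerThreads) {
      // Fewer than one request per second per handler thread: fall back to the handler count.
      return numHandlerThreads;
    } else {
      double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000);
      // Never return zero.
      return Math.max((long) requestsPerMillisecond, 1L);
    }
  }

  public static void main(String[] args) {
    int handlers = 30; // HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT
    System.out.println(estimate(10, handlers));     // prints 30 (low-traffic baseline)
    System.out.println(estimate(5_000, handlers));  // prints 5
    System.out.println(estimate(50_000, handlers)); // prints 50
  }
}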

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java

Lines changed: 9 additions & 5 deletions
@@ -43,8 +43,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
   private QuotaLimiter regionServerLimiter;

   public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
-    QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
-    super(conf, blockSizeBytes, limiters);
+    final double requestsPerSecond, QuotaLimiter regionServerLimiter,
+    final QuotaLimiter... limiters) {
+    super(conf, blockSizeBytes, requestsPerSecond, limiters);
     this.regionServerLimiter = regionServerLimiter;
   }

@@ -78,7 +79,7 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i
     estimateQuota.run();
     // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
     regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
-      writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic);
+      writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
     // 3. Check if other limiters are enough. If not, exceed other limiters because region server
     // limiter is enough.
     boolean exceed = false;
@@ -94,13 +95,13 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i
     // 4. Region server limiter is enough and grab estimated consume quota.
     readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable());
     regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
-      writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
+      writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed);
     if (exceed) {
       // 5. Other quota limiter is exceeded and has not been grabbed (because throw
       // RpcThrottlingException in Step 3), so grab it.
       for (final QuotaLimiter limiter : limiters) {
         limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
-          writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
+          writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic, 0L);
       }
     }
   }
@@ -115,6 +116,9 @@ public void close() {
     if (readDiff != 0) {
       regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false);
     }
+    if (handlerUsageTimeDiff != 0) {
+      regionServerLimiter.consumeTime(handlerUsageTimeDiff);
+    }
   }

   private interface CheckQuotaRunnable {

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java

Lines changed: 15 additions & 0 deletions
@@ -174,6 +174,11 @@ private boolean hasThrottle(QuotaProtos.ThrottleType quotaType,
           hasThrottle = true;
         }
         break;
+      case REQUEST_HANDLER_USAGE_MS:
+        if (throttleBuilder.hasReqHandlerUsageMs()) {
+          hasThrottle = true;
+        }
+        break;
       default:
     }
     return hasThrottle;
@@ -236,6 +241,9 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException
         case ATOMIC_WRITE_SIZE:
           throttleBuilder.clearAtomicWriteSize();
           break;
+        case REQUEST_HANDLER_USAGE_MS:
+          throttleBuilder.clearReqHandlerUsageMs();
+          break;
         default:
       }
       boolean hasThrottle = false;
@@ -295,6 +303,8 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException
         case ATOMIC_WRITE_SIZE:
           throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota());
           break;
+        case REQUEST_HANDLER_USAGE_MS:
+          throttleBuilder.setReqHandlerUsageMs(otherProto.getTimedQuota());
         default:
       }
     }
@@ -388,7 +398,12 @@ public String toString() {
         case READ_CAPACITY_UNIT:
         case WRITE_CAPACITY_UNIT:
           builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
+          break;
+        case REQUEST_HANDLER_USAGE_MS:
+          builder.append(String.format("%dms", timedQuota.getSoftLimit()));
+          break;
         default:
+          // no-op
       }
     } else if (timedQuota.hasShare()) {
       builder.append(String.format("%.2f%%", timedQuota.getShare()));

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java

Lines changed: 8 additions & 2 deletions
@@ -35,13 +35,14 @@ private NoopQuotaLimiter() {
   @Override
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
     long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit,
-    boolean isAtomic) throws RpcThrottlingException {
+    boolean isAtomic, long estimateHandlerThreadUsageMs) throws RpcThrottlingException {
     // no-op
   }

   @Override
   public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
-    long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) {
+    long writeCapacityUnit, long readCapacityUnit, boolean isAtomic,
+    long estimateHandlerThreadUsageMs) {
     // no-op
   }

@@ -55,6 +56,11 @@ public void consumeRead(final long size, long capacityUnit, boolean isAtomic) {
     // no-op
   }

+  @Override
+  public void consumeTime(final long handlerMillisUsed) {
+    // no-op
+  }
+
   @Override
   public boolean isBypass() {
     return true;
