Skip to content

Commit f32a937

Browse files
HADOOP-19362. RPC metrics should be updated correctly when call is defered. (#7224). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]>
1 parent 6cb2e86 commit f32a937

File tree

4 files changed

+139
-24
lines changed

4 files changed

+139
-24
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.classification.InterfaceAudience;
2727
import org.apache.hadoop.classification.InterfaceStability;
2828
import org.apache.hadoop.classification.InterfaceStability.Unstable;
29+
import org.apache.hadoop.classification.VisibleForTesting;
2930
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.io.Writable;
3132
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -34,7 +35,6 @@
3435
import org.apache.hadoop.security.UserGroupInformation;
3536
import org.apache.hadoop.security.token.SecretManager;
3637
import org.apache.hadoop.security.token.TokenIdentifier;
37-
import org.apache.hadoop.classification.VisibleForTesting;
3838
import org.apache.hadoop.tracing.TraceScope;
3939
import org.apache.hadoop.tracing.Tracer;
4040
import org.apache.hadoop.util.Time;
@@ -393,28 +393,38 @@ static class ProtobufRpcEngineCallbackImpl
393393
private final RPC.Server server;
394394
private final Call call;
395395
private final String methodName;
396-
private final long setupTime;
397396

398397
public ProtobufRpcEngineCallbackImpl() {
399398
this.server = CURRENT_CALL_INFO.get().getServer();
400399
this.call = Server.getCurCall().get();
401400
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
402-
this.setupTime = Time.now();
401+
}
402+
403+
private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
404+
ProcessingDetails details = rpcCall.getProcessingDetails();
405+
rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos,
406+
TimeUnit.NANOSECONDS);
407+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
408+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
409+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
410+
details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
403411
}
404412

405413
@Override
406414
public void setResponse(Message message) {
407-
long processingTime = Time.now() - setupTime;
415+
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
416+
updateProcessingDetails(call, deltaNanos);
408417
call.setDeferredResponse(RpcWritable.wrap(message));
409-
server.updateDeferredMetrics(methodName, processingTime);
418+
server.updateDeferredMetrics(call, methodName, deltaNanos);
410419
}
411420

412421
@Override
413422
public void error(Throwable t) {
414-
long processingTime = Time.now() - setupTime;
415-
String detailedMetricsName = t.getClass().getSimpleName();
416-
server.updateDeferredMetrics(detailedMetricsName, processingTime);
423+
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
424+
updateProcessingDetails(call, deltaNanos);
417425
call.setDeferredError(t);
426+
String detailedMetricsName = t.getClass().getSimpleName();
427+
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
418428
}
419429
}
420430

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.io.Writable;
2626
import org.apache.hadoop.io.retry.RetryPolicy;
2727
import org.apache.hadoop.ipc.Client.ConnectionId;
28+
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
2829
import org.apache.hadoop.ipc.RPC.RpcInvoker;
2930
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
3031
import org.apache.hadoop.security.UserGroupInformation;
@@ -425,28 +426,37 @@ static class ProtobufRpcEngineCallbackImpl
425426
private final RPC.Server server;
426427
private final Call call;
427428
private final String methodName;
428-
private final long setupTime;
429429

430430
ProtobufRpcEngineCallbackImpl() {
431431
this.server = CURRENT_CALL_INFO.get().getServer();
432432
this.call = Server.getCurCall().get();
433433
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
434-
this.setupTime = Time.now();
434+
}
435+
436+
private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
437+
ProcessingDetails details = rpcCall.getProcessingDetails();
438+
rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
439+
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
440+
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
441+
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
442+
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
435443
}
436444

437445
@Override
438446
public void setResponse(Message message) {
439-
long processingTime = Time.now() - setupTime;
447+
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
448+
updateProcessingDetails(call, deltaNanos);
440449
call.setDeferredResponse(RpcWritable.wrap(message));
441-
server.updateDeferredMetrics(methodName, processingTime);
450+
server.updateDeferredMetrics(call, methodName, deltaNanos);
442451
}
443452

444453
@Override
445454
public void error(Throwable t) {
446-
long processingTime = Time.now() - setupTime;
447-
String detailedMetricsName = t.getClass().getSimpleName();
448-
server.updateDeferredMetrics(detailedMetricsName, processingTime);
455+
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
456+
updateProcessingDetails(call, deltaNanos);
449457
call.setDeferredError(t);
458+
String detailedMetricsName = t.getClass().getSimpleName();
459+
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
450460
}
451461
}
452462

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

+51-4
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,13 @@ public static Server get() {
351351
* after the call returns.
352352
*/
353353
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
354-
354+
355355
/** @return Get the current call. */
356356
@VisibleForTesting
357357
public static ThreadLocal<Call> getCurCall() {
358358
return CurCall;
359359
}
360-
360+
361361
/**
362362
* Returns the currently active RPC call's sequential ID number. A negative
363363
* call ID indicates an invalid value, such as if there is no currently active
@@ -638,7 +638,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
638638
rpcMetrics.addRpcQueueTime(queueTime);
639639

640640
if (call.isResponseDeferred() || connDropped) {
641-
// call was skipped; don't include it in processing metrics
641+
// The call was skipped; don't include it in processing metrics.
642+
// Will update metrics in method updateDeferredMetrics.
642643
return;
643644
}
644645

@@ -668,9 +669,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
668669
}
669670
}
670671

671-
void updateDeferredMetrics(String name, long processingTime) {
672+
/**
673+
* Update rpc metrics for defered calls.
674+
* @param call The Rpc Call
675+
* @param name Rpc method name
676+
* @param processingTime processing call in ms unit.
677+
*/
678+
void updateDeferredMetrics(Call call, String name, long processingTime) {
679+
long completionTimeNanos = Time.monotonicNowNanos();
680+
long arrivalTimeNanos = call.timestampNanos;
681+
682+
ProcessingDetails details = call.getProcessingDetails();
683+
long waitTime =
684+
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
685+
long responseTime =
686+
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
687+
rpcMetrics.addRpcLockWaitTime(waitTime);
688+
rpcMetrics.addRpcProcessingTime(processingTime);
689+
rpcMetrics.addRpcResponseTime(responseTime);
672690
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
673691
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
692+
// don't include lock wait for detailed metrics.
693+
processingTime -= waitTime;
694+
rpcDetailedMetrics.addProcessingTime(name, processingTime);
695+
696+
// Overall processing time is from arrival to completion.
697+
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
698+
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
699+
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
700+
callQueue.addResponseTime(name, call, details);
701+
if (isLogSlowRPC()) {
702+
logSlowRpcCalls(name, call, details);
703+
}
704+
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
705+
rpcMetrics.incrRpcCallSuccesses();
706+
}
674707
}
675708

676709
/**
@@ -963,6 +996,7 @@ public static class Call implements Schedulable,
963996
final int callId; // the client's call id
964997
final int retryCount; // the retry count of the call
965998
private final long timestampNanos; // time the call was received
999+
protected long startHandleTimestampNanos; // time the call was run
9661000
long responseTimestampNanos; // time the call was served
9671001
private AtomicInteger responseWaitCount = new AtomicInteger(1);
9681002
final RPC.RpcKind rpcKind;
@@ -1167,6 +1201,15 @@ public void setDeferredError(Throwable t) {
11671201
public long getTimestampNanos() {
11681202
return timestampNanos;
11691203
}
1204+
1205+
1206+
public long getStartHandleTimestampNanos() {
1207+
return startHandleTimestampNanos;
1208+
}
1209+
1210+
public void setStartHandleTimestampNanos(long startHandleTimestampNanos) {
1211+
this.startHandleTimestampNanos = startHandleTimestampNanos;
1212+
}
11701213
}
11711214

11721215
/** A RPC extended call queued for handling. */
@@ -1243,6 +1286,7 @@ public Void run() throws Exception {
12431286
}
12441287

12451288
long startNanos = Time.monotonicNowNanos();
1289+
this.setStartHandleTimestampNanos(startNanos);
12461290
Writable value = null;
12471291
ResponseParams responseParams = new ResponseParams();
12481292

@@ -1331,6 +1375,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
13311375
* Send a deferred response, ignoring errors.
13321376
*/
13331377
private void sendDeferedResponse() {
1378+
long startNanos = Time.monotonicNowNanos();
13341379
try {
13351380
connection.sendResponse(this);
13361381
} catch (Exception e) {
@@ -1342,6 +1387,8 @@ private void sendDeferedResponse() {
13421387
.currentThread().getName() + ", CallId="
13431388
+ callId + ", hostname=" + getHostAddress());
13441389
}
1390+
getProcessingDetails().set(Timing.RESPONSE,
1391+
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
13451392
}
13461393

13471394
@Override

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java

+53-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import java.io.IOException;
2122
import java.net.InetSocketAddress;
2223
import java.util.concurrent.Callable;
2324
import java.util.concurrent.CompletionService;
@@ -26,25 +27,35 @@
2627
import java.util.concurrent.Executors;
2728
import java.util.concurrent.Future;
2829

30+
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
2931
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
3032
import org.apache.hadoop.thirdparty.protobuf.RpcController;
3133
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
3234
import org.apache.hadoop.conf.Configuration;
3335
import org.apache.hadoop.ipc.protobuf.TestProtos;
3436
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
3537
import org.junit.Assert;
38+
import org.junit.Before;
3639
import org.junit.Test;
3740
import org.slf4j.Logger;
3841
import org.slf4j.LoggerFactory;
3942

43+
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
44+
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
45+
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
46+
4047
public class TestProtoBufRpcServerHandoff {
4148

4249
public static final Logger LOG =
4350
LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);
4451

45-
@Test(timeout = 20000)
46-
public void test() throws Exception {
47-
Configuration conf = new Configuration();
52+
private static Configuration conf = null;
53+
private static RPC.Server server = null;
54+
private static InetSocketAddress address = null;
55+
56+
@Before
57+
public void setUp() throws IOException {
58+
conf = new Configuration();
4859

4960
TestProtoBufRpcServerHandoffServer serverImpl =
5061
new TestProtoBufRpcServerHandoffServer();
@@ -53,18 +64,21 @@ public void test() throws Exception {
5364

5465
RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
5566
ProtobufRpcEngine2.class);
56-
RPC.Server server = new RPC.Builder(conf)
67+
server = new RPC.Builder(conf)
5768
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
5869
.setInstance(blockingService)
5970
.setVerbose(true)
6071
.setNumHandlers(1) // Num Handlers explicitly set to 1 for test.
6172
.build();
6273
server.start();
6374

64-
InetSocketAddress address = server.getListenerAddress();
75+
address = server.getListenerAddress();
6576
long serverStartTime = System.currentTimeMillis();
6677
LOG.info("Server started at: " + address + " at time: " + serverStartTime);
78+
}
6779

80+
@Test(timeout = 20000)
81+
public void test() throws Exception {
6882
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
6983
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
7084

@@ -93,6 +107,40 @@ public void test() throws Exception {
93107

94108
}
95109

110+
@Test(timeout = 20000)
111+
public void testHandoffMetrics() throws Exception {
112+
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
113+
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
114+
115+
ExecutorService executorService = Executors.newFixedThreadPool(2);
116+
CompletionService<ClientInvocationCallable> completionService =
117+
new ExecutorCompletionService<ClientInvocationCallable>(
118+
executorService);
119+
120+
completionService.submit(new ClientInvocationCallable(client, 5000L));
121+
completionService.submit(new ClientInvocationCallable(client, 5000L));
122+
123+
long submitTime = System.currentTimeMillis();
124+
Future<ClientInvocationCallable> future1 = completionService.take();
125+
Future<ClientInvocationCallable> future2 = completionService.take();
126+
127+
ClientInvocationCallable callable1 = future1.get();
128+
ClientInvocationCallable callable2 = future2.get();
129+
130+
LOG.info(callable1.toString());
131+
LOG.info(callable2.toString());
132+
133+
// Ensure the 5 second sleep responses are within a reasonable time of each
134+
// other.
135+
Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L);
136+
Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L);
137+
138+
// Check rpcMetrics
139+
MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
140+
assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb);
141+
assertCounter("RpcProcessingTimeNumOps", 2L, rb);
142+
}
143+
96144
private static class ClientInvocationCallable
97145
implements Callable<ClientInvocationCallable> {
98146
final TestProtoBufRpcServerHandoffProtocol client;

0 commit comments

Comments
 (0)