Skip to content

Commit f6182e5

Browse files
authored
HADOOP-19864. (Followup) Migrate MiniRPCBenchmark to ProtobufRpcEngine2 (apache#8455)
Reviewed-by: Steve Loughran <stevel@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 38f48fb commit f6182e5

2 files changed

Lines changed: 126 additions & 47 deletions

File tree

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java

Lines changed: 72 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030

3131
import org.apache.hadoop.conf.Configuration;
3232
import org.apache.hadoop.io.Text;
33+
import org.apache.hadoop.ipc.protobuf.MiniRPCBenchmarkProtos.MiniDelegationTokenProto;
34+
import org.apache.hadoop.ipc.protobuf.MiniRPCBenchmarkProtos.MiniGetDelegationTokenRequestProto;
35+
import org.apache.hadoop.ipc.protobuf.MiniRPCBenchmarkProtos.MiniGetDelegationTokenResponseProto;
36+
import org.apache.hadoop.ipc.protobuf.MiniRPCBenchmarkProtos.MiniProtocolService;
3337
import org.apache.hadoop.net.NetUtils;
3438
import org.apache.hadoop.security.KerberosInfo;
3539
import org.apache.hadoop.security.SecurityUtil;
@@ -40,6 +44,10 @@
4044
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
4145
import org.apache.hadoop.security.token.delegation.TestDelegationToken.TestDelegationTokenIdentifier;
4246
import org.apache.hadoop.security.token.delegation.TestDelegationToken.TestDelegationTokenSecretManager;
47+
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
48+
import org.apache.hadoop.thirdparty.protobuf.ByteString;
49+
import org.apache.hadoop.thirdparty.protobuf.RpcController;
50+
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
4351
import org.apache.hadoop.util.Time;
4452
import org.slf4j.event.Level;
4553

@@ -101,18 +109,19 @@ protected TestDelegationTokenSelector() {
101109
super(new Text("MY KIND"));
102110
}
103111
}
104-
105-
@KerberosInfo(
106-
serverPrincipal=USER_NAME_KEY)
107-
@TokenInfo(TestDelegationTokenSelector.class)
108-
public static interface MiniProtocol extends VersionedProtocol {
109-
public static final long versionID = 1L;
110112

111-
/**
112-
* Get a Delegation Token.
113-
*/
114-
public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer)
115-
throws IOException;
113+
/**
114+
* Protobuf based MiniProtocol used by {@link MiniRPCBenchmark}.
115+
* Replaces the legacy Writable based protocol now that
116+
* {@code WritableRpcEngine} has been removed.
117+
*/
118+
@KerberosInfo(serverPrincipal = USER_NAME_KEY)
119+
@TokenInfo(TestDelegationTokenSelector.class)
120+
@ProtocolInfo(
121+
protocolName = "org.apache.hadoop.ipc.MiniRPCBenchmark$MiniProtocol",
122+
protocolVersion = 1)
123+
public interface MiniProtocol extends MiniProtocolService.BlockingInterface {
124+
long versionID = 1L;
116125
}
117126

118127
/**
@@ -125,34 +134,32 @@ static class MiniServer implements MiniProtocol {
125134
private TestDelegationTokenSecretManager secretManager;
126135
private Server rpcServer;
127136

128-
@Override // VersionedProtocol
129-
public long getProtocolVersion(String protocol,
130-
long clientVersion) throws IOException {
131-
if (protocol.equals(MiniProtocol.class.getName()))
132-
return versionID;
133-
throw new IOException("Unknown protocol: " + protocol);
134-
}
135-
136-
@Override // VersionedProtocol
137-
public ProtocolSignature getProtocolSignature(String protocol,
138-
long clientVersion,
139-
int clientMethodsHashCode) throws IOException {
140-
if (protocol.equals(MiniProtocol.class.getName()))
141-
return new ProtocolSignature(versionID, null);
142-
throw new IOException("Unknown protocol: " + protocol);
143-
}
144-
145137
@Override // MiniProtocol
146-
public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer)
147-
throws IOException {
148-
String owner = UserGroupInformation.getCurrentUser().getUserName();
149-
String realUser =
150-
UserGroupInformation.getCurrentUser().getRealUser() == null ? "":
151-
UserGroupInformation.getCurrentUser().getRealUser().getUserName();
152-
TestDelegationTokenIdentifier tokenId =
153-
new TestDelegationTokenIdentifier(
154-
new Text(owner), renewer, new Text(realUser));
155-
return new Token<TestDelegationTokenIdentifier>(tokenId, secretManager);
138+
public MiniGetDelegationTokenResponseProto getDelegationToken(
139+
RpcController controller,
140+
MiniGetDelegationTokenRequestProto request) throws ServiceException {
141+
try {
142+
Text renewer = new Text(request.getRenewer());
143+
String owner = UserGroupInformation.getCurrentUser().getUserName();
144+
String realUser =
145+
UserGroupInformation.getCurrentUser().getRealUser() == null ? "" :
146+
UserGroupInformation.getCurrentUser().getRealUser().getUserName();
147+
TestDelegationTokenIdentifier tokenId =
148+
new TestDelegationTokenIdentifier(
149+
new Text(owner), renewer, new Text(realUser));
150+
Token<TestDelegationTokenIdentifier> token =
151+
new Token<>(tokenId, secretManager);
152+
return MiniGetDelegationTokenResponseProto.newBuilder()
153+
.setToken(MiniDelegationTokenProto.newBuilder()
154+
.setIdentifier(ByteString.copyFrom(token.getIdentifier()))
155+
.setPassword(ByteString.copyFrom(token.getPassword()))
156+
.setKind(token.getKind().toString())
157+
.setService(token.getService().toString())
158+
.build())
159+
.build();
160+
} catch (IOException ioe) {
161+
throw new ServiceException(ioe);
162+
}
156163
}
157164

158165
/** Start RPC server */
@@ -164,8 +171,11 @@ public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer)
164171
new TestDelegationTokenSecretManager(24*60*60*1000,
165172
7*24*60*60*1000,24*60*60*1000,3600000);
166173
secretManager.startThreads();
174+
RPC.setProtocolEngine(conf, MiniProtocol.class, ProtobufRpcEngine2.class);
175+
BlockingService service =
176+
MiniProtocolService.newReflectiveBlockingService(this);
167177
rpcServer = new RPC.Builder(conf).setProtocol(MiniProtocol.class)
168-
.setInstance(this).setBindAddress(DEFAULT_SERVER_ADDRESS).setPort(0)
178+
.setInstance(service).setBindAddress(DEFAULT_SERVER_ADDRESS).setPort(0)
169179
.setNumHandlers(1).setVerbose(false).setSecretManager(secretManager)
170180
.build();
171181
rpcServer.start();
@@ -189,8 +199,8 @@ long connectToServer(Configuration conf, InetSocketAddress addr)
189199
MiniProtocol client = null;
190200
try {
191201
long start = Time.now();
192-
client = RPC.getProxy(MiniProtocol.class,
193-
MiniProtocol.versionID, addr, conf);
202+
RPC.setProtocolEngine(conf, MiniProtocol.class, ProtobufRpcEngine2.class);
203+
client = RPC.getProxy(MiniProtocol.class, MiniProtocol.versionID, addr, conf);
194204
long end = Time.now();
195205
return end - start;
196206
} finally {
@@ -211,14 +221,28 @@ void connectToServerAndGetDelegationToken(
211221
client = proxyUserUgi.doAs(new PrivilegedExceptionAction<MiniProtocol>() {
212222
@Override
213223
public MiniProtocol run() throws IOException {
224+
RPC.setProtocolEngine(conf, MiniProtocol.class,
225+
ProtobufRpcEngine2.class);
214226
MiniProtocol p = RPC.getProxy(MiniProtocol.class,
215227
MiniProtocol.versionID, addr, conf);
216-
Token<TestDelegationTokenIdentifier> token;
217-
token = p.getDelegationToken(new Text(RENEWER));
218-
currentUgi = UserGroupInformation.createUserForTesting(MINI_USER,
219-
GROUP_NAMES);
220-
SecurityUtil.setTokenService(token, addr);
221-
currentUgi.addToken(token);
228+
try {
229+
MiniGetDelegationTokenResponseProto response =
230+
p.getDelegationToken(null,
231+
MiniGetDelegationTokenRequestProto.newBuilder()
232+
.setRenewer(RENEWER).build());
233+
MiniDelegationTokenProto tokenProto = response.getToken();
234+
Token<TestDelegationTokenIdentifier> token = new Token<>(
235+
tokenProto.getIdentifier().toByteArray(),
236+
tokenProto.getPassword().toByteArray(),
237+
new Text(tokenProto.getKind()),
238+
new Text(tokenProto.getService()));
239+
currentUgi = UserGroupInformation.createUserForTesting(MINI_USER,
240+
GROUP_NAMES);
241+
SecurityUtil.setTokenService(token, addr);
242+
currentUgi.addToken(token);
243+
} catch (ServiceException se) {
244+
throw new IOException(se);
245+
}
222246
return p;
223247
}
224248
});
@@ -239,6 +263,7 @@ long connectToServerUsingDelegationToken(
239263
client = currentUgi.doAs(new PrivilegedExceptionAction<MiniProtocol>() {
240264
@Override
241265
public MiniProtocol run() throws IOException {
266+
RPC.setProtocolEngine(conf, MiniProtocol.class, ProtobufRpcEngine2.class);
242267
return RPC.getProxy(MiniProtocol.class,
243268
MiniProtocol.versionID, addr, conf);
244269
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
syntax = "proto2";
19+
option java_package = "org.apache.hadoop.ipc.protobuf";
20+
option java_outer_classname = "MiniRPCBenchmarkProtos";
21+
option java_generic_services = true;
22+
option java_generate_equals_and_hash = true;
23+
package hadoop.common;
24+
25+
/**
26+
* Protobuf service used by {@code MiniRPCBenchmark}.
27+
*
28+
* Mirror the legacy WritableRPCEngine-based MiniProtocol so the benchmark
29+
* can exercise RPC connection establishment over ProtobufRpcEngine2.
30+
*
31+
* Messages are inlined (rather than importing Security.proto) to keep
32+
* the test proto self contained and avoid additional proto path setup
33+
* in the build configuration.
34+
*/
35+
36+
message MiniGetDelegationTokenRequestProto {
37+
required string renewer = 1;
38+
}
39+
40+
message MiniDelegationTokenProto {
41+
required bytes identifier = 1;
42+
required bytes password = 2;
43+
required string kind = 3;
44+
required string service = 4;
45+
}
46+
47+
message MiniGetDelegationTokenResponseProto {
48+
optional MiniDelegationTokenProto token = 1;
49+
}
50+
51+
service MiniProtocolService {
52+
rpc getDelegationToken(MiniGetDelegationTokenRequestProto)
53+
returns (MiniGetDelegationTokenResponseProto);
54+
}

0 commit comments

Comments
 (0)