Skip to content

Commit e29a6a5

Browse files
committed
HADOOP-9194. RPC Support for QoS. (Junping Du via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461370 13f79535-47bb-0310-9956-ffa450edef68
1 parent d8877af commit e29a6a5

File tree

5 files changed

+122
-15
lines changed

5 files changed

+122
-15
lines changed

hadoop-common-project/hadoop-common/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Trunk (Unreleased)
1717

1818
HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
1919

20+
HADOOP-9194. RPC Support for QoS. (Junping Du via llu)
21+
2022
NEW FEATURES
2123

2224
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ private class Connection extends Thread {
257257
private final ConnectionId remoteId; // connection id
258258
private AuthMethod authMethod; // authentication method
259259
private Token<? extends TokenIdentifier> token;
260+
private int serviceClass;
260261
private SaslRpcClient saslRpcClient;
261262

262263
private Socket socket = null; // connected socket
@@ -279,7 +280,7 @@ private class Connection extends Thread {
279280

280281
private final Object sendRpcRequestLock = new Object();
281282

282-
public Connection(ConnectionId remoteId) throws IOException {
283+
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
283284
this.remoteId = remoteId;
284285
this.server = remoteId.getAddress();
285286
if (server.isUnresolved()) {
@@ -296,6 +297,7 @@ public Connection(ConnectionId remoteId) throws IOException {
296297
this.tcpNoDelay = remoteId.getTcpNoDelay();
297298
this.doPing = remoteId.getDoPing();
298299
this.pingInterval = remoteId.getPingInterval();
300+
this.serviceClass = serviceClass;
299301
if (LOG.isDebugEnabled()) {
300302
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
301303
}
@@ -747,7 +749,9 @@ private void handleConnectionFailure(int curRetries, IOException ioe
747749
* +----------------------------------+
748750
* | "hrpc" 4 bytes |
749751
* +----------------------------------+
750-
* | Version (1 bytes) |
752+
* | Version (1 byte) |
753+
* +----------------------------------+
754+
* | Service Class (1 byte) |
751755
* +----------------------------------+
752756
* | Authmethod (1 byte) |
753757
* +----------------------------------+
@@ -760,6 +764,7 @@ private void writeConnectionHeader(OutputStream outStream)
760764
// Write out the header, version and authentication method
761765
out.write(Server.HEADER.array());
762766
out.write(Server.CURRENT_VERSION);
767+
out.write(serviceClass);
763768
authMethod.write(out);
764769
Server.IpcSerializationType.PROTOBUF.write(out);
765770
out.flush();
@@ -1179,19 +1184,33 @@ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr
11791184

11801185

11811186
/**
1182-
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
1187+
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
11831188
* Class, UserGroupInformation, int, Configuration)}
11841189
* except that rpcKind is writable.
11851190
*/
1186-
public Writable call(Writable param, InetSocketAddress addr,
1191+
public Writable call(Writable param, InetSocketAddress addr,
11871192
Class<?> protocol, UserGroupInformation ticket,
1188-
int rpcTimeout, Configuration conf)
1193+
int rpcTimeout, Configuration conf)
11891194
throws InterruptedException, IOException {
1190-
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
1195+
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
11911196
ticket, rpcTimeout, conf);
11921197
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
11931198
}
11941199

1200+
/**
1201+
* Same as {@link #call(Writable, InetSocketAddress,
1202+
* Class, UserGroupInformation, int, Configuration)}
1203+
* except that specifying serviceClass.
1204+
*/
1205+
public Writable call(Writable param, InetSocketAddress addr,
1206+
Class<?> protocol, UserGroupInformation ticket,
1207+
int rpcTimeout, int serviceClass, Configuration conf)
1208+
throws InterruptedException, IOException {
1209+
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
1210+
ticket, rpcTimeout, conf);
1211+
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
1212+
}
1213+
11951214
/**
11961215
* Make a call, passing <code>param</code>, to the IPC server running at
11971216
* <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1218,21 +1237,39 @@ public Writable call(Writable param, ConnectionId remoteId)
12181237
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
12191238
}
12201239

1240+
/**
1241+
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
1242+
* <code>remoteId</code>, returning the rpc respond.
1243+
*
1244+
* @param rpcKind
1245+
* @param rpcRequest - contains serialized method and method parameters
1246+
* @param remoteId - the target rpc server
1247+
* @returns the rpc response
1248+
* Throws exceptions if there are network problems or if the remote code
1249+
* threw an exception.
1250+
*/
1251+
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
1252+
ConnectionId remoteId) throws InterruptedException, IOException {
1253+
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
1254+
}
1255+
12211256
/**
12221257
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
12231258
* <code>remoteId</code>, returning the rpc respond.
12241259
*
12251260
* @param rpcKind
12261261
* @param rpcRequest - contains serialized method and method parameters
12271262
* @param remoteId - the target rpc server
1263+
* @param serviceClass - service class for RPC
12281264
* @returns the rpc response
12291265
* Throws exceptions if there are network problems or if the remote code
12301266
* threw an exception.
12311267
*/
12321268
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
1233-
ConnectionId remoteId) throws InterruptedException, IOException {
1269+
ConnectionId remoteId, int serviceClass)
1270+
throws InterruptedException, IOException {
12341271
Call call = new Call(rpcKind, rpcRequest);
1235-
Connection connection = getConnection(remoteId, call);
1272+
Connection connection = getConnection(remoteId, call, serviceClass);
12361273
try {
12371274
connection.sendRpcRequest(call); // send the rpc request
12381275
} catch (RejectedExecutionException e) {
@@ -1289,7 +1326,7 @@ Set<ConnectionId> getConnectionIds() {
12891326
/** Get a connection from the pool, or create a new one and add it to the
12901327
* pool. Connections to a given ConnectionId are reused. */
12911328
private Connection getConnection(ConnectionId remoteId,
1292-
Call call)
1329+
Call call, int serviceClass)
12931330
throws IOException, InterruptedException {
12941331
if (!running.get()) {
12951332
// the client is stopped
@@ -1304,7 +1341,7 @@ private Connection getConnection(ConnectionId remoteId,
13041341
synchronized (connections) {
13051342
connection = connections.get(remoteId);
13061343
if (connection == null) {
1307-
connection = new Connection(remoteId);
1344+
connection = new Connection(remoteId, serviceClass);
13081345
connections.put(remoteId, connection);
13091346
}
13101347
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,12 @@
7777
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
7878
@InterfaceStability.Evolving
7979
public class RPC {
80+
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
8081
public enum RpcKind {
8182
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
8283
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
8384
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
8485
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
85-
private static final short FIRST_INDEX = RPC_BUILTIN.value;
8686
public final short value; //TODO make it private
8787

8888
RpcKind(short val) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@ Iterable<? extends Thread> getHandlers() {
438438
return Arrays.asList(handlers);
439439
}
440440

441+
@VisibleForTesting
442+
List<Connection> getConnections() {
443+
return connectionList;
444+
}
445+
441446
/**
442447
* Refresh the service authorization ACL for the service handled by this server.
443448
*/
@@ -1104,6 +1109,7 @@ public class Connection {
11041109
private ByteBuffer connectionHeaderBuf = null;
11051110
private ByteBuffer unwrappedData;
11061111
private ByteBuffer unwrappedDataLengthBuffer;
1112+
private int serviceClass;
11071113

11081114
UserGroupInformation user = null;
11091115
public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1314,14 +1320,17 @@ public int readAndProcess() throws IOException, InterruptedException {
13141320
if (!connectionHeaderRead) {
13151321
//Every connection is expected to send the header.
13161322
if (connectionHeaderBuf == null) {
1317-
connectionHeaderBuf = ByteBuffer.allocate(3);
1323+
connectionHeaderBuf = ByteBuffer.allocate(4);
13181324
}
13191325
count = channelRead(channel, connectionHeaderBuf);
13201326
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
13211327
return count;
13221328
}
13231329
int version = connectionHeaderBuf.get(0);
1324-
byte[] method = new byte[] {connectionHeaderBuf.get(1)};
1330+
// TODO we should add handler for service class later
1331+
this.setServiceClass(connectionHeaderBuf.get(1));
1332+
1333+
byte[] method = new byte[] {connectionHeaderBuf.get(2)};
13251334
authMethod = AuthMethod.read(new DataInputStream(
13261335
new ByteArrayInputStream(method)));
13271336
dataLengthBuffer.flip();
@@ -1345,7 +1354,7 @@ public int readAndProcess() throws IOException, InterruptedException {
13451354
}
13461355

13471356
IpcSerializationType serializationType = IpcSerializationType
1348-
.fromByte(connectionHeaderBuf.get(2));
1357+
.fromByte(connectionHeaderBuf.get(3));
13491358
if (serializationType != IpcSerializationType.PROTOBUF) {
13501359
respondUnsupportedSerialization(serializationType);
13511360
return -1;
@@ -1735,6 +1744,22 @@ private boolean authorizeConnection() throws IOException {
17351744
return true;
17361745
}
17371746

1747+
/**
1748+
* Get service class for connection
1749+
* @return the serviceClass
1750+
*/
1751+
public int getServiceClass() {
1752+
return serviceClass;
1753+
}
1754+
1755+
/**
1756+
* Set service class for connection
1757+
* @param serviceClass the serviceClass to set
1758+
*/
1759+
public void setServiceClass(int serviceClass) {
1760+
this.serviceClass = serviceClass;
1761+
}
1762+
17381763
private synchronized void close() throws IOException {
17391764
disposeSasl();
17401765
data = null;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.io.IntWritable;
2626
import org.apache.hadoop.io.Writable;
2727
import org.apache.hadoop.io.LongWritable;
28+
import org.apache.hadoop.ipc.Server.Connection;
2829
import org.apache.hadoop.util.StringUtils;
2930
import org.apache.hadoop.net.ConnectTimeoutException;
3031
import org.apache.hadoop.net.NetUtils;
@@ -520,11 +521,53 @@ public void testIpcConnectTimeout() throws Exception {
520521
}
521522
}
522523

524+
/**
525+
* Check service class byte in IPC header is correct on wire.
526+
*/
527+
@Test(timeout=60000)
528+
public void testIpcWithServiceClass() throws Exception {
529+
// start server
530+
Server server = new TestServer(5, false);
531+
InetSocketAddress addr = NetUtils.getConnectAddress(server);
532+
server.start();
533+
534+
// start client
535+
Client.setConnectTimeout(conf, 10000);
536+
537+
callAndVerify(server, addr, 0, true);
538+
// Service Class is low to -128 as byte on wire.
539+
// -128 shouldn't be casted on wire but -129 should.
540+
callAndVerify(server, addr, -128, true);
541+
callAndVerify(server, addr, -129, false);
542+
543+
// Service Class is up to 127.
544+
// 127 shouldn't be casted on wire but 128 should.
545+
callAndVerify(server, addr, 127, true);
546+
callAndVerify(server, addr, 128, false);
547+
548+
server.stop();
549+
}
550+
551+
/**
552+
* Make a call from a client and verify if header info is changed in server side
553+
*/
554+
private void callAndVerify(Server server, InetSocketAddress addr,
555+
int serviceClass, boolean noChanged) throws Exception{
556+
Client client = new Client(LongWritable.class, conf);
557+
558+
client.call(new LongWritable(RANDOM.nextLong()),
559+
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
560+
Connection connection = server.getConnections().get(0);
561+
int serviceClass2 = connection.getServiceClass();
562+
assertFalse(noChanged ^ serviceClass == serviceClass2);
563+
client.stop();
564+
}
565+
523566
/**
524567
* Check that file descriptors aren't leaked by starting
525568
* and stopping IPC servers.
526569
*/
527-
@Test
570+
@Test(timeout=60000)
528571
public void testSocketLeak() throws Exception {
529572
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
530573

0 commit comments

Comments
 (0)