Skip to content

Commit d4d1db5

Browse files
author
Suresh Srinivas
committed
HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding retry cache in case of HA failover. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508332 13f79535-47bb-0310-9956-ffa450edef68
1 parent c193668 commit d4d1db5

File tree

31 files changed

+1207
-419
lines changed

31 files changed

+1207
-419
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
30+
import org.apache.hadoop.classification.InterfaceAudience;
3031
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
3132
import org.apache.hadoop.ipc.Client;
3233
import org.apache.hadoop.ipc.Client.ConnectionId;
@@ -38,7 +39,12 @@
3839

3940
import com.google.common.annotations.VisibleForTesting;
4041

41-
class RetryInvocationHandler implements RpcInvocationHandler {
42+
/**
43+
* This class implements RpcInvocationHandler and supports retry on the client
44+
* side.
45+
*/
46+
@InterfaceAudience.Private
47+
public class RetryInvocationHandler implements RpcInvocationHandler {
4248
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
4349
private final FailoverProxyProvider proxyProvider;
4450

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,7 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
11611161
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
11621162
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
11631163
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
1164-
this.clientId = StringUtils.getUuidBytes();
1164+
this.clientId = ClientId.getClientId();
11651165
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
11661166
}
11671167

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.ipc;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.UUID;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
25+
import com.google.common.base.Preconditions;
26+
27+
/**
28+
* A class defining a set of static helper methods to provide conversion between
29+
* bytes and string for UUID-based client Id.
30+
*/
31+
@InterfaceAudience.Private
32+
public class ClientId {
33+
34+
/** The byte array of a UUID should be 16 bytes long */
35+
public static final int BYTE_LENGTH = 16;
36+
37+
/**
38+
* Return a newly generated random UUID-based clientId as byte[]
39+
*/
40+
public static byte[] getClientId() {
41+
UUID uuid = UUID.randomUUID();
42+
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
43+
buf.putLong(uuid.getMostSignificantBits());
44+
buf.putLong(uuid.getLeastSignificantBits());
45+
return buf.array();
46+
}
47+
48+
/** Convert a clientId byte[] to string */
49+
public static String toString(byte[] clientId) {
50+
// clientId can be null or an empty array
51+
if (clientId == null || clientId.length == 0) {
52+
return "";
53+
}
54+
// otherwise should be 16 bytes
55+
Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
56+
long msb = 0;
57+
long lsb = 0;
58+
for (int i = 0; i < 8; i++) {
59+
msb = (msb << 8) | (clientId[i] & 0xff);
60+
}
61+
for (int i = 8; i < 16; i++) {
62+
lsb = (lsb << 8) | (clientId[i] & 0xff);
63+
}
64+
return (new UUID(msb, lsb)).toString();
65+
}
66+
67+
/** Convert a clientId string to its byte[] representation */
68+
public static byte[] toBytes(String id) {
69+
if (id == null || "".equals(id)) {
70+
return new byte[0];
71+
}
72+
UUID uuid = UUID.fromString(id);
73+
ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
74+
buf.putLong(uuid.getMostSignificantBits());
75+
buf.putLong(uuid.getLeastSignificantBits());
76+
return buf.array();
77+
}
78+
79+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
import java.util.Arrays;
22+
import java.util.UUID;
2223

2324
import org.apache.commons.logging.Log;
2425
import org.apache.commons.logging.LogFactory;
@@ -27,6 +28,7 @@
2728
import org.apache.hadoop.util.LightWeightGSet;
2829
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
2930

31+
import com.google.common.annotations.VisibleForTesting;
3032
import com.google.common.base.Preconditions;
3133

3234
/**
@@ -64,8 +66,9 @@ public static class CacheEntry implements LightWeightCache.Entry {
6466

6567
CacheEntry(byte[] clientId, int callId, long expirationTime) {
6668
// ClientId must be a UUID - that is 16 octets.
67-
Preconditions.checkArgument(clientId.length == 16,
68-
"Invalid clientId - must be UUID of size 16 octets");
69+
Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
70+
"Invalid clientId - length is " + clientId.length
71+
+ " expected length " + ClientId.BYTE_LENGTH);
6972
// Convert UUID bytes to two longs
7073
long tmp = 0;
7174
for (int i=0; i<8; i++) {
@@ -131,6 +134,12 @@ public void setExpirationTime(long timeNano) {
131134
public long getExpirationTime() {
132135
return expirationTime;
133136
}
137+
138+
@Override
139+
public String toString() {
140+
return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
141+
+ this.callId + ":" + this.state;
142+
}
134143
}
135144

136145
/**
@@ -186,6 +195,11 @@ private static boolean skipRetryCache() {
186195
return !Server.isRpcInvocation() || Server.getCallId() < 0
187196
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
188197
}
198+
199+
@VisibleForTesting
200+
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
201+
return set;
202+
}
189203

190204
/**
191205
* This method handles the following conditions:
@@ -240,6 +254,26 @@ private CacheEntry waitForCompletion(CacheEntry newEntry) {
240254
}
241255
return mapEntry;
242256
}
257+
258+
/**
259+
* Add a new cache entry into the retry cache. The cache entry consists of
260+
* clientId and callId extracted from editlog.
261+
*/
262+
public void addCacheEntry(byte[] clientId, int callId) {
263+
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
264+
+ expirationTime);
265+
newEntry.completed(true);
266+
set.put(newEntry);
267+
}
268+
269+
public void addCacheEntryWithPayload(byte[] clientId, int callId,
270+
Object payload) {
271+
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
272+
System.nanoTime() + expirationTime);
273+
// since the entry is loaded from editlog, we can assume it succeeded.
274+
newEntry.completed(true);
275+
set.put(newEntry);
276+
}
243277

244278
private static CacheEntry newEntry(long expirationTime) {
245279
return new CacheEntry(Server.getClientId(), Server.getCallId(),

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.StringWriter;
2323
import java.net.URI;
2424
import java.net.URISyntaxException;
25-
import java.nio.ByteBuffer;
2625
import java.text.DateFormat;
2726
import java.util.ArrayList;
2827
import java.util.Arrays;
@@ -33,7 +32,6 @@
3332
import java.util.Locale;
3433
import java.util.Map;
3534
import java.util.StringTokenizer;
36-
import java.util.UUID;
3735
import java.util.regex.Matcher;
3836
import java.util.regex.Pattern;
3937

@@ -42,7 +40,6 @@
4240
import org.apache.hadoop.classification.InterfaceStability;
4341
import org.apache.hadoop.fs.Path;
4442
import org.apache.hadoop.net.NetUtils;
45-
import org.apache.hadoop.util.Shell;
4643

4744
import com.google.common.net.InetAddresses;
4845

@@ -897,17 +894,6 @@ public static String replaceTokens(String template, Pattern pattern,
897894
return sb.toString();
898895
}
899896

900-
/**
901-
* Return a new UUID as byte[]
902-
*/
903-
public static byte[] getUuidBytes() {
904-
UUID uuid = UUID.randomUUID();
905-
ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
906-
buf.putLong(uuid.getMostSignificantBits());
907-
buf.putLong(uuid.getLeastSignificantBits());
908-
return buf.array();
909-
}
910-
911897
/**
912898
* Get stack trace for a given thread.
913899
*/

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929

3030
import org.apache.hadoop.ipc.RPC.RpcKind;
3131
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
32-
import org.apache.hadoop.ipc.Server;
33-
import org.apache.hadoop.util.StringUtils;
3432
import org.junit.Assert;
3533
import org.junit.Before;
3634
import org.junit.Test;
@@ -39,7 +37,7 @@
3937
* Tests for {@link RetryCache}
4038
*/
4139
public class TestRetryCache {
42-
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
40+
private static final byte[] CLIENT_ID = ClientId.getClientId();
4341
private static int callId = 100;
4442
private static final Random r = new Random();
4543
private static final TestServer testServer = new TestServer();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717
*/
1818
package org.apache.hadoop.util;
1919

20-
import static org.junit.Assert.assertTrue;
2120
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
2222

2323
import java.io.ByteArrayInputStream;
2424
import java.io.ByteArrayOutputStream;
2525
import java.io.DataInputStream;
2626
import java.io.IOException;
2727
import java.util.Arrays;
2828

29+
import org.apache.hadoop.ipc.ClientId;
2930
import org.apache.hadoop.ipc.RPC.RpcKind;
3031
import org.apache.hadoop.ipc.RpcConstants;
3132
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -78,7 +79,7 @@ private void doVarIntTest(int value) throws IOException {
7879

7980
@Test
8081
public void testRpcClientId() {
81-
byte[] uuid = StringUtils.getUuidBytes();
82+
byte[] uuid = ClientId.getClientId();
8283
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
8384
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
8485
RpcConstants.INVALID_RETRY_COUNT, uuid);

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,9 @@ Release 2.1.0-beta - 2013-07-02
343343
protocol methods. (suresh)
344344

345345
HDFS-4979. Implement retry cache on Namenode. (suresh)
346+
347+
HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding
348+
retry cache in case of HA failover. (Jing Zhao via suresh)
346349

347350
IMPROVEMENTS
348351

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,8 @@ public DFSClient(URI nameNodeUri, Configuration conf,
453453
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
454454
* Exactly one of nameNodeUri or rpcNamenode must be null.
455455
*/
456-
DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
456+
@VisibleForTesting
457+
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
457458
Configuration conf, FileSystem.Statistics stats)
458459
throws IOException {
459460
// Copy only the required DFSClient configuration

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
6969
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
7070

71+
import com.google.common.annotations.VisibleForTesting;
7172
import com.google.common.base.Preconditions;
7273

7374
/**
@@ -307,7 +308,8 @@ private static Object createNameNodeProxy(InetSocketAddress address,
307308
}
308309

309310
/** Gets the configured Failover proxy provider's class */
310-
private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
311+
@VisibleForTesting
312+
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
311313
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
312314
if (nameNodeUri == null) {
313315
return null;
@@ -344,7 +346,8 @@ private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass
344346
}
345347

346348
/** Creates the Failover proxy provider instance */
347-
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
349+
@VisibleForTesting
350+
public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
348351
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
349352
Class<T> xface, URI nameNodeUri) throws IOException {
350353
Preconditions.checkArgument(

0 commit comments

Comments
 (0)