
Commit 15fcad3

Merge remote-tracking branch 'remotes/origin/trunk' into trunk

2 parents: ddeb3bb + 5b6bae0

16 files changed: +825 −40 lines


hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml

1 addition & 1 deletion

@@ -80,7 +80,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <exclude>src/main/native/*</exclude>
             <exclude>src/main/native/config/*</exclude>
             <exclude>src/main/native/m4/*</exclude>
-            <exclude>src/main/native/util/tree.h</exclude>
+            <exclude>src/main/native/fuse-dfs/util/tree.h</exclude>
             <exclude>src/contrib/**</exclude>
           </excludes>
         </configuration>

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

2 additions & 0 deletions

@@ -306,6 +306,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_MAX_XATTR_SIZE_KEY = "dfs.namenode.fs-limits.max-xattr-size";
   public static final int     DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;

+  public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
+  public static final int     DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
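For context, the new key sets how many distinct upgrade domains a block's replicas should span; it defaults to the replication factor. A minimal hdfs-site.xml sketch for overriding it (the value 3 here is illustrative only, not part of this diff):

  <!-- Illustrative: require a block's replicas to span up to 3 distinct
       upgrade domains instead of the default dfs.replication value. -->
  <property>
    <name>dfs.namenode.upgrade.domain.factor</name>
    <value>3</value>
  </property>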

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

4 additions & 4 deletions

@@ -142,7 +142,7 @@ DatanodeStorageInfo[] chooseTarget(String src,
         new HashSet<Node>() : new HashSet<Node>(excludedNodes);

     // Choose favored nodes
-    List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> results = new ArrayList<>();
     boolean avoidStaleNodes = stats != null
         && stats.isAvoidingStaleDataNodesForWrite();
     for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
@@ -194,14 +194,14 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
     }

     if (excludedNodes == null) {
-      excludedNodes = new HashSet<Node>();
+      excludedNodes = new HashSet<>();
     }

     int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
     numOfReplicas = result[0];
     int maxNodesPerRack = result[1];

-    final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+    final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
     for (DatanodeStorageInfo storage : chosenStorage) {
       // add localMachine and related nodes to excludedNodes
       addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@@ -765,7 +765,7 @@ protected Collection<DatanodeDescriptor> pickupReplicaSet(
       Collection<DatanodeDescriptor> second) {
     return first.isEmpty() ? second : first;
   }
-
+
   @VisibleForTesting
   void setPreferLocalNode(boolean prefer) {
     this.preferLocalNode = prefer;
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java

264 additions & 0 deletions (new file)

@@ -0,0 +1,264 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;

/**
 * The class is responsible for choosing the desired number of targets
 * for placing block replicas in a way that honors the upgrade domain policy.
 * The replica placement strategy: if the writer is on a datanode,
 * the 1st replica is placed on the local machine,
 * otherwise on a random datanode. The 2nd replica is placed on a datanode
 * that is on a different rack. The 3rd replica is placed on a different
 * node in the same rack as the second replica.
 * All 3 replicas have unique upgrade domains.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockPlacementPolicyWithUpgradeDomain extends
    BlockPlacementPolicyDefault {

  private int upgradeDomainFactor;

  @Override
  public void initialize(Configuration conf, FSClusterStats stats,
      NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
    super.initialize(conf, stats, clusterMap, host2datanodeMap);
    upgradeDomainFactor = conf.getInt(
        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
  }

  @Override
  protected boolean isGoodDatanode(DatanodeDescriptor node,
      int maxTargetPerRack, boolean considerLoad,
      List<DatanodeStorageInfo> results, boolean avoidStaleNodes) {
    boolean isGoodTarget = super.isGoodDatanode(node,
        maxTargetPerRack, considerLoad, results, avoidStaleNodes);
    if (isGoodTarget) {
      if (results.size() > 0 && results.size() < upgradeDomainFactor) {
        // Each node in "results" has a different upgrade domain. Make sure
        // the candidate node introduces a new upgrade domain.
        Set<String> upgradeDomains = getUpgradeDomains(results);
        if (upgradeDomains.contains(node.getUpgradeDomain())) {
          isGoodTarget = false;
        }
      }
    }
    return isGoodTarget;
  }

  // If the upgrade domain isn't specified, use the node's XferAddr as its
  // upgrade domain. Such fallback is useful to test the scenario where the
  // upgrade domain isn't defined but the block placement is set to the
  // upgrade domain policy.
  public String getUpgradeDomainWithDefaultValue(DatanodeInfo datanodeInfo) {
    String upgradeDomain = datanodeInfo.getUpgradeDomain();
    if (upgradeDomain == null) {
      LOG.warn("Upgrade domain isn't defined for " + datanodeInfo);
      upgradeDomain = datanodeInfo.getXferAddr();
    }
    return upgradeDomain;
  }

  private String getUpgradeDomain(DatanodeStorageInfo storage) {
    return getUpgradeDomainWithDefaultValue(storage.getDatanodeDescriptor());
  }

  private Set<String> getUpgradeDomains(List<DatanodeStorageInfo> results) {
    Set<String> upgradeDomains = new HashSet<>();
    if (results == null) {
      return upgradeDomains;
    }
    for (DatanodeStorageInfo storageInfo : results) {
      upgradeDomains.add(getUpgradeDomain(storageInfo));
    }
    return upgradeDomains;
  }

  private Set<String> getUpgradeDomainsFromNodes(DatanodeInfo[] nodes) {
    Set<String> upgradeDomains = new HashSet<>();
    if (nodes == null) {
      return upgradeDomains;
    }
    for (DatanodeInfo node : nodes) {
      upgradeDomains.add(getUpgradeDomainWithDefaultValue(node));
    }
    return upgradeDomains;
  }

  private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
      DatanodeStorageInfo[] storageInfos) {
    Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
    for (DatanodeStorageInfo storage : storageInfos) {
      String upgradeDomain = getUpgradeDomainWithDefaultValue(
          storage.getDatanodeDescriptor());
      List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
      if (storages == null) {
        storages = new ArrayList<>();
        upgradeDomainMap.put(upgradeDomain, storages);
      }
      storages.add(storage);
    }
    return upgradeDomainMap;
  }

  @Override
  public BlockPlacementStatus verifyBlockPlacement(String srcPath,
      LocatedBlock lBlk, int numberOfReplicas) {
    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
        lBlk, numberOfReplicas);
    BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
        new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
            getUpgradeDomainsFromNodes(lBlk.getLocations()),
            numberOfReplicas, upgradeDomainFactor);
    return upgradeDomainStatus;
  }

  private <T> List<T> getShareUDSet(Map<String, List<T>> upgradeDomains) {
    List<T> shareUDSet = new ArrayList<>();
    for (Map.Entry<String, List<T>> e : upgradeDomains.entrySet()) {
      if (e.getValue().size() > 1) {
        shareUDSet.addAll(e.getValue());
      }
    }
    return shareUDSet;
  }

  /*
   * The policy for picking, out of the over-replicated replicas, the replica
   * set to delete from, while still meeting the rack and upgrade domain
   * requirements.
   * The algorithm:
   * a. Each replica has a boolean attribute "shareRack" that defines
   *    whether it shares its rack with another replica of the same block.
   * b. Each replica has another boolean attribute "shareUD" that defines
   *    whether it shares its upgrade domain with another replica of the same
   *    block.
   * c. Partition the replicas into 4 sets (some might be empty):
   *    shareRackAndUDSet: {shareRack==true, shareUD==true}
   *    shareUDNotRackSet: {shareRack==false, shareUD==true}
   *    shareRackNotUDSet: {shareRack==true, shareUD==false}
   *    noShareRackOrUDSet: {shareRack==false, shareUD==false}
   * d. Pick the first non-empty replica set in the following order:
   *    shareRackAndUDSet, shareUDNotRackSet, shareRackNotUDSet,
   *    noShareRackOrUDSet.
   * e. Proof that this won't degrade the existing rack-based data
   *    availability model, under the different scenarios:
   *    1. shareRackAndUDSet isn't empty. Removing a node from
   *       shareRackAndUDSet changes neither the # of racks nor the # of
   *       upgrade domains.
   *    The following cases cover the empty-shareRackAndUDSet scenarios.
   *    2. shareUDNotRackSet isn't empty and shareRackNotUDSet isn't empty.
   *       Let us prove that the # of racks >= 3 before the deletion, and
   *       thus the # of racks >= 2 after the deletion.
   *       Given shareRackAndUDSet is empty, there is no overlap between
   *       shareUDNotRackSet and shareRackNotUDSet. It means the DNs in
   *       shareRackNotUDSet are on at least one rack different from any
   *       DN's rack in shareUDNotRackSet.
   *       Given shareUDNotRackSet.size() >= 2 and each DN in that set
   *       doesn't share a rack with any other DN, at least 2 racks come
   *       from shareUDNotRackSet.
   *       Thus the # of racks covered by the DNs in {shareUDNotRackSet,
   *       shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
   *       reduces the # of racks by 1 and doesn't change the # of upgrade
   *       domains.
   *       Note that this is different from BlockPlacementPolicyDefault,
   *       which keeps the # of racks unchanged after deletion. With the
   *       upgrade domain policy, given the # of racks is still >= 2 after
   *       deletion, the data availability model remains the same as
   *       BlockPlacementPolicyDefault (it only supports one rack failure).
   *       For example, assume we have 4 replicas: d1(rack1, ud1),
   *       d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
   *       shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
   *       With the upgrade domain policy, the remaining replicas after
   *       deletion are {d1 (or d2), d3, d4}, which cover 2 racks.
   *       With BlockPlacementPolicyDefault, the remaining replicas after
   *       deletion are {d1, d2, d3 (or d4)}, which cover 3 racks.
   *    3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty.
   *       This implies all replicas are on unique racks. Removing a node
   *       from shareUDNotRackSet reduces the # of racks by 1 (no different
   *       from BlockPlacementPolicyDefault) and doesn't change the # of
   *       upgrade domains.
   *    4. shareUDNotRackSet is empty and shareRackNotUDSet isn't empty.
   *       Removing a node from shareRackNotUDSet is no different from
   *       BlockPlacementPolicyDefault.
   *    5. shareUDNotRackSet is empty and shareRackNotUDSet is empty.
   *       Removing a node from noShareRackOrUDSet is no different from
   *       BlockPlacementPolicyDefault.
   * The implementation:
   * 1. Generate the set shareUDSet, which includes every DatanodeStorageInfo
   *    that shares its upgrade domain with another DatanodeStorageInfo,
   *    i.e. {shareRackAndUDSet, shareUDNotRackSet}.
   * 2. If shareUDSet is empty, both shareRackAndUDSet and shareUDNotRackSet
   *    are empty. Use the default rack-based policy.
   * 3. If shareUDSet isn't empty, intersect it with moreThanOne
   *    ({shareRackAndUDSet, shareRackNotUDSet}) to generate
   *    shareRackAndUDSet.
   * 4. If shareRackAndUDSet isn't empty, return shareRackAndUDSet; otherwise
   *    return shareUDSet, which is the same as shareUDNotRackSet.
   */
  @Override
  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
      Collection<DatanodeStorageInfo> moreThanOne,
      Collection<DatanodeStorageInfo> exactlyOne) {
    List<DatanodeStorageInfo> all = new ArrayList<>();
    if (moreThanOne != null) {
      all.addAll(moreThanOne);
    }
    if (exactlyOne != null) {
      all.addAll(exactlyOne);
    }

    Map<String, List<DatanodeStorageInfo>> upgradeDomains =
        getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));

    // shareUDSet includes the DatanodeStorageInfos that share their upgrade
    // domain with another DatanodeStorageInfo.
    List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
    // shareRackAndUDSet contains the DatanodeStorageInfos that share both a
    // rack and an upgrade domain with another DatanodeStorageInfo.
    List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
    if (shareUDSet.size() == 0) {
      // All upgrade domains are unique; use the parent set.
      return super.pickupReplicaSet(moreThanOne, exactlyOne);
    } else if (moreThanOne != null) {
      for (DatanodeStorageInfo storage : shareUDSet) {
        if (moreThanOne.contains(storage)) {
          shareRackAndUDSet.add(storage);
        }
      }
    }
    return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
  }
}
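For reviewers who want to exercise the new policy end to end: a minimal hdfs-site.xml sketch, assuming the standard pluggable-placement key dfs.block.replicator.classname (that key is how BlockPlacementPolicy implementations are selected; it is not part of this diff).

  <!-- Sketch: point the NameNode at the upgrade-domain placement policy. -->
  <property>
    <name>dfs.block.replicator.classname</name>
    <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain</value>
  </property>

Note that if no upgrade domains have been assigned to the datanodes, getUpgradeDomainWithDefaultValue() above falls back to each node's XferAddr, which effectively gives every datanode its own upgrade domain.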
