Commit 47d214e

HDFS-2167. Move dnsToSwitchMapping and hostsReader from FSNamesystem to DatanodeManager.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1149455 13f79535-47bb-0310-9956-ffa450edef68
1 parent d56da33 commit 47d214e

9 files changed: +617 -551 lines changed


hdfs/CHANGES.txt
Lines changed: 3 additions & 0 deletions

@@ -583,6 +583,9 @@ Trunk (unreleased changes)
       to DelegationTokenIdentifier. (szetszwo)
 
     HDFS-1774. Small optimization to FSDataset. (Uma Maheswara Rao G via eli)
+
+    HDFS-2167. Move dnsToSwitchMapping and hostsReader from FSNamesystem to
+    DatanodeManager. (szetszwo)
 
     OPTIMIZATIONS
 
hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Lines changed: 87 additions & 48 deletions

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -33,6 +35,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -106,10 +109,8 @@ public long getExcessBlocksCount() {
 
   private final DatanodeManager datanodeManager;
 
-  //
-  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
-  //
-  private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
+  final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   //
   // Keeps a Collection for every named machine containing
@@ -136,34 +137,34 @@ public long getExcessBlocksCount() {
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
   private final PendingReplicationBlocks pendingReplications;
 
-  // The maximum number of replicas allowed for a block
+  /** The maximum number of replicas allowed for a block */
   public final int maxReplication;
-  // How many outgoing replication streams a given node should have at one time
+  /** The maximum number of outgoing replication streams
+   * a given node should have at one time
+   */
   public int maxReplicationStreams;
-  // Minimum copies needed or else write is disallowed
+  /** Minimum copies needed or else write is disallowed */
   public final int minReplication;
-  // Default number of replicas
+  /** Default number of replicas */
   public final int defaultReplication;
-  // How many entries are returned by getCorruptInodes()
+  /** The maximum number of entries returned by getCorruptInodes() */
   final int maxCorruptFilesReturned;
 
-  // variable to enable check for enough racks
+  /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
-  /**
-   * Last block index used for replication work.
-   */
+  /** Last block index used for replication work. */
   private int replIndex = 0;
 
-  // for block replicas placement
-  public final BlockPlacementPolicy replicator;
+  /** for block replicas placement */
+  private BlockPlacementPolicy blockplacement;
 
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
-    datanodeManager = new DatanodeManager(fsn);
+    datanodeManager = new DatanodeManager(fsn, conf);
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
-    replicator = BlockPlacementPolicy.getInstance(
+    blockplacement = BlockPlacementPolicy.getInstance(
         conf, namesystem, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
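The constructor change above carries the substance of this commit: DatanodeManager is now handed the Configuration because, per the commit message, dnsToSwitchMapping and hostsReader move out of FSNamesystem and into DatanodeManager. As a rough illustration of what that implies, here is a minimal sketch of the conventional Hadoop pattern for resolving a DNSToSwitchMapping from configuration; the config key name and the wrapper class are assumptions for illustration, not code from this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.DNSToSwitchMapping;
    import org.apache.hadoop.net.ScriptBasedMapping;
    import org.apache.hadoop.util.ReflectionUtils;

    class DnsToSwitchMappingSketch {
      // Resolve the rack-mapping implementation named in the configuration,
      // defaulting to the script-based resolver. The key name below is an
      // assumption; this diff does not show DatanodeManager's actual code.
      static DNSToSwitchMapping resolve(Configuration conf) {
        // ReflectionUtils.newInstance also injects conf into the new object.
        return ReflectionUtils.newInstance(
            conf.getClass("net.topology.node.switch.mapping.impl",
                ScriptBasedMapping.class, DNSToSwitchMapping.class),
            conf);
      }
    }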
@@ -220,6 +221,19 @@ public DatanodeManager getDatanodeManager() {
     return datanodeManager;
   }
 
+  /** @return the BlockPlacementPolicy */
+  public BlockPlacementPolicy getBlockPlacementPolicy() {
+    return blockplacement;
+  }
+
+  /** Set BlockPlacementPolicy */
+  public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
+    if (newpolicy == null) {
+      throw new HadoopIllegalArgumentException("newpolicy == null");
+    }
+    this.blockplacement = newpolicy;
+  }
+
   public void metaSave(PrintWriter out) {
     //
     // Dump contents of neededReplication
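The getter/setter pair added above makes the placement policy swappable at runtime; correspondingly, the field loses its final modifier in the earlier hunk. A minimal usage sketch, assuming a BlockManager instance blockManager and an already-constructed BlockPlacementPolicy customPolicy (both hypothetical names), e.g. from a test that wants to exercise allocation under a different policy:

    // Hypothetical usage; blockManager and customPolicy are assumed to exist.
    BlockPlacementPolicy saved = blockManager.getBlockPlacementPolicy();
    try {
      // setBlockPlacementPolicy rejects null with
      // HadoopIllegalArgumentException, per the diff above.
      blockManager.setBlockPlacementPolicy(customPolicy);
      // ... trigger block allocation or replication under customPolicy ...
    } finally {
      blockManager.setBlockPlacementPolicy(saved);  // restore the original
    }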
@@ -551,7 +565,7 @@ public void removeDatanode(final DatanodeDescriptor node) {
     }
   }
 
-  void removeFromInvalidates(String storageID, Block block) {
+  private void removeFromInvalidates(String storageID, Block block) {
     Collection<Block> v = recentInvalidateSets.get(storageID);
     if (v != null && v.remove(block)) {
       pendingDeletionBlocksCount--;
@@ -921,7 +935,7 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] =
-        replicator.chooseTarget(fileINode, additionalReplRequired,
+        blockplacement.chooseTarget(fileINode, additionalReplRequired,
             srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
@@ -1021,7 +1035,7 @@ public DatanodeDescriptor[] chooseTarget(final String src,
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = replicator.chooseTarget(
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
         src, numOfReplicas, client, excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
@@ -1240,7 +1254,7 @@ void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report)
     }
   }
 
-  void reportDiff(DatanodeDescriptor dn,
+  private void reportDiff(DatanodeDescriptor dn,
       BlockListAsLongs newReport,
       Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
      Collection<Block> toRemove, // remove from DatanodeDescriptor
@@ -1670,7 +1684,7 @@ public void processOverReplicatedBlock(Block block, short replication,
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication,
-        addedNode, delNodeHint, replicator);
+        addedNode, delNodeHint, blockplacement);
   }
 
   public void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1694,7 +1708,7 @@ public void addToExcessReplicate(DatanodeInfo dn, Block block) {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
           + block + " from " + node.getName());
@@ -1881,7 +1895,8 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
    * On stopping decommission, check if the node has excess replicas.
    * If there are any excess replicas, call processOverReplicatedBlock()
    */
-  public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
+  private void processOverReplicatedBlocksOnReCommission(
+      final DatanodeDescriptor srcNode) {
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
       final Block block = it.next();
@@ -1900,7 +1915,7 @@ public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
-  public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+  boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
     int underReplicatedBlocks = 0;
     int decommissionOnlyReplicas = 0;
@@ -2022,7 +2037,7 @@ private int getReplication(Block block) {
   }
 
   /** Remove a datanode from the invalidatesSet */
-  public void removeFromInvalidates(String storageID) {
+  private void removeFromInvalidates(String storageID) {
     Collection<Block> blocks = recentInvalidateSets.remove(storageID);
     if (blocks != null) {
       pendingDeletionBlocksCount -= blocks.size();
@@ -2086,28 +2101,6 @@ private int invalidateWorkForOneNode(String nodeId) {
       namesystem.writeUnlock();
     }
   }
-
-  //Returns the number of racks over which a given block is replicated
-  //decommissioning/decommissioned nodes are not counted. corrupt replicas
-  //are also ignored
-  public int getNumberOfRacks(Block b) {
-    HashSet<String> rackSet = new HashSet<String>(0);
-    Collection<DatanodeDescriptor> corruptNodes =
-        corruptReplicas.getNodes(b);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
-        it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          String rackName = cur.getNetworkLocation();
-          if (!rackSet.contains(rackName)) {
-            rackSet.add(rackName);
-          }
-        }
-      }
-    }
-    return rackSet.size();
-  }
 
   boolean blockHasEnoughRacks(Block b) {
     if (!this.shouldCheckForEnoughRacks) {
@@ -2209,4 +2202,50 @@ public BlockIterator getCorruptReplicaBlockIterator() {
     return neededReplications
         .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
+
+  /**
+   * Change, if appropriate, the admin state of a datanode to
+   * decommission completed. Return true if decommission is complete.
+   */
+  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+    // Check to see if all blocks in this decommissioned
+    // node has reached their target replication factor.
+    if (node.isDecommissionInProgress()) {
+      if (!isReplicationInProgress(node)) {
+        node.setDecommissioned();
+        LOG.info("Decommission complete for node " + node.getName());
+      }
+    }
+    return node.isDecommissioned();
+  }
+
+  /** Start decommissioning the specified datanode. */
+  void startDecommission(DatanodeDescriptor node) throws IOException {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      LOG.info("Start Decommissioning node " + node.getName() + " with " +
+          node.numBlocks() + " blocks.");
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.startDecommission();
+        namesystem.updateStats(node, true);
+      }
+      node.decommissioningStatus.setStartTime(now());
+
+      // all the blocks that reside on this node have to be replicated.
+      checkDecommissionStateInternal(node);
+    }
+  }
+
+  /** Stop decommissioning the specified datanodes. */
+  void stopDecommission(DatanodeDescriptor node) throws IOException {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.stopDecommission();
+        namesystem.updateStats(node, true);
+      }
+      processOverReplicatedBlocksOnReCommission(node);
+    }
+  }
 }
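Taken together, the three methods added above give BlockManager ownership of the datanode decommission lifecycle. A sketch of how a caller inside the same package might drive them, with blockManager and node as assumed, hypothetical references:

    // Hypothetical driver; blockManager and node are assumed to exist.
    blockManager.startDecommission(node);  // mark the node; its blocks
                                           // must now be re-replicated

    // A periodic monitor would then re-check until every block on the
    // node has reached its target replication factor:
    if (blockManager.checkDecommissionStateInternal(node)) {
      // node.isDecommissioned() is now true; its replicas live elsewhere.
    }

    // Re-adding the node (re-commissioning) goes through:
    blockManager.stopDecommission(node);   // clears the state and removes
                                           // any now-excess replicas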
