2626import org .apache .hadoop .hdfs .protocol .Block ;
2727import org .apache .hadoop .hdfs .server .namenode .NameNode ;
2828
29- /** Keep track of under replication blocks.
30- * Blocks have replication priority, with priority 0 indicating the highest
31- * Blocks have only one replicas has the highest
29+ /**
30+ * Keep prioritized queues of under replicated blocks.
31+ * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
32+ * indicating the highest priority.
33+ * <p>
34+ * Having a prioritized queue allows the {@link BlockManager} to select
35+ * which blocks to replicate first; it tries to give priority to data
36+ * that is most at risk or considered most valuable.
37+ * </p>
38+ * <p>
39+ * The policy for choosing which priority to give added blocks
40+ * is implemented in {@link #getPriority(Block, int, int, int)}.
41+ * </p>
42+ * <p>The queue order is as follows:</p>
43+ * <ol>
44+ * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
45+ * first. That is blocks with only one copy, or blocks with zero live
46+ * copies but a copy in a node being decommissioned. These blocks
47+ * are at risk of loss if the disk or server on which they
48+ * remain fails.</li>
49+ * <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
50+ * under-replicated compared to their expected values. Currently
51+ * that means the ratio of actual:expected replicas is
52+ * <i>less than</i> 1:3. These blocks may not be at risk,
53+ * but they are clearly considered "important".</li>
54+ * <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
55+ * replicated, and the ratio of actual:expected is good enough that
56+ * they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
57+ * queue.</li>
58+ * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are at least as
59+ * many copies of a block as required, but the blocks are not adequately
60+ * distributed. Loss of a rack/switch could take all copies off-line.</li>
61+ * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS}: this is for blocks that are corrupt
62+ * and for which there are no non-corrupt copies (currently) available.
63+ * The policy here is to keep those corrupt blocks replicated, but give
64+ * blocks that are not corrupt higher priority.</li>
65+ * </ol>
3266 */
3367class UnderReplicatedBlocks implements Iterable <Block > {
68+ /** The total number of queues : {@value} */
3469 static final int LEVEL = 5 ;
70+ /** The queue with the highest priority: {@value} */
71+ static final int QUEUE_HIGHEST_PRIORITY = 0 ;
72+ /** The queue for blocks that are way below their expected value : {@value} */
73+ static final int QUEUE_VERY_UNDER_REPLICATED = 1 ;
74+ /** The queue for "normally" under-replicated blocks: {@value} */
75+ static final int QUEUE_UNDER_REPLICATED = 2 ;
76+ /** The queue for blocks that have the right number of replicas,
77+ * but which the block manager felt were badly distributed: {@value}
78+ */
79+ static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3 ;
80+ /** The queue for corrupt blocks: {@value} */
3581 static final int QUEUE_WITH_CORRUPT_BLOCKS = 4 ;
82+ /** the queues themselves */
3683 private final List <NavigableSet <Block >> priorityQueues
37- = new ArrayList <NavigableSet <Block >>();
38-
84+ = new ArrayList <NavigableSet <Block >>(LEVEL );
85+
3986 /** Create an object. */
4087 UnderReplicatedBlocks () {
41- for (int i = 0 ; i < LEVEL ; i ++) {
88+ for (int i = 0 ; i < LEVEL ; i ++) {
4289 priorityQueues .add (new TreeSet <Block >());
4390 }
4491 }
@@ -47,15 +94,15 @@ class UnderReplicatedBlocks implements Iterable<Block> {
4794 * Empty the queues.
4895 */
4996 void clear () {
50- for (int i = 0 ; i < LEVEL ; i ++) {
97+ for (int i = 0 ; i < LEVEL ; i ++) {
5198 priorityQueues .get (i ).clear ();
5299 }
53100 }
54101
55102 /** Return the total number of under replication blocks */
56103 synchronized int size () {
57104 int size = 0 ;
58- for (int i = 0 ; i < LEVEL ; i ++) {
105+ for (int i = 0 ; i < LEVEL ; i ++) {
59106 size += priorityQueues .get (i ).size ();
60107 }
61108 return size ;
@@ -64,60 +111,73 @@ synchronized int size() {
64111 /** Return the number of under replication blocks excluding corrupt blocks */
65112 synchronized int getUnderReplicatedBlockCount () {
66113 int size = 0 ;
67- for (int i =0 ; i <QUEUE_WITH_CORRUPT_BLOCKS ; i ++) {
68- size += priorityQueues .get (i ).size ();
114+ for (int i = 0 ; i < LEVEL ; i ++) {
115+ if (i != QUEUE_WITH_CORRUPT_BLOCKS ) {
116+ size += priorityQueues .get (i ).size ();
117+ }
69118 }
70119 return size ;
71120 }
72-
121+
73122 /** Return the number of corrupt blocks */
74123 synchronized int getCorruptBlockSize () {
75124 return priorityQueues .get (QUEUE_WITH_CORRUPT_BLOCKS ).size ();
76125 }
77126
78127 /** Check if a block is in the neededReplication queue */
79128 synchronized boolean contains (Block block ) {
80- for (NavigableSet <Block > set : priorityQueues ) {
81- if (set .contains (block )) { return true ; }
129+ for (NavigableSet <Block > set : priorityQueues ) {
130+ if (set .contains (block )) {
131+ return true ;
132+ }
82133 }
83134 return false ;
84135 }
85-
136+
86137 /** Return the priority of a block
87- * @param block a under replication block
138+ * @param block an under replicated block
88139 * @param curReplicas current number of replicas of the block
89140 * @param expectedReplicas expected number of replicas of the block
141+ * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
90142 */
91- private int getPriority (Block block ,
143+ private int getPriority (Block block ,
92144 int curReplicas ,
93145 int decommissionedReplicas ,
94146 int expectedReplicas ) {
95147 assert curReplicas >= 0 : "Negative replicas!" ;
96148 if (curReplicas >= expectedReplicas ) {
97- return 3 ; // Block doesn't have enough racks
98- } else if (curReplicas ==0 ) {
99- // If there are zero non-decommissioned replica but there are
149+ // Block has enough copies, but not enough racks
150+ return QUEUE_REPLICAS_BADLY_DISTRIBUTED ;
151+ } else if (curReplicas == 0 ) {
152+ // If there are zero non-decommissioned replicas but there are
100153 // some decommissioned replicas, then assign them highest priority
101154 if (decommissionedReplicas > 0 ) {
102- return 0 ;
155+ return QUEUE_HIGHEST_PRIORITY ;
103156 }
104- return QUEUE_WITH_CORRUPT_BLOCKS ; // keep these blocks in needed replication.
105- } else if (curReplicas ==1 ) {
106- return 0 ; // highest priority
107- } else if (curReplicas *3 <expectedReplicas ) {
108- return 1 ;
157+ //all we have are corrupt blocks
158+ return QUEUE_WITH_CORRUPT_BLOCKS ;
159+ } else if (curReplicas == 1 ) {
160+ //only one replica - risk of loss
161+ // highest priority
162+ return QUEUE_HIGHEST_PRIORITY ;
163+ } else if ((curReplicas * 3 ) < expectedReplicas ) {
164+ //there are fewer than a third as many replicas as requested;
165+ //this is considered very under-replicated
166+ return QUEUE_VERY_UNDER_REPLICATED ;
109167 } else {
110- return 2 ;
168+ //add to the normal queue for under replicated blocks
169+ return QUEUE_UNDER_REPLICATED ;
111170 }
112171 }
113-
172+
114173 /** add a block to a under replication queue according to its priority
115174 * @param block a under replication block
116175 * @param curReplicas current number of replicas of the block
176+ * @param decomissionedReplicas the number of decommissioned replicas
117177 * @param expectedReplicas expected number of replicas of the block
178+ * @return true if the block was added to a queue.
118179 */
119- synchronized boolean add (
120- Block block ,
180+ synchronized boolean add (Block block ,
121181 int curReplicas ,
122182 int decomissionedReplicas ,
123183 int expectedReplicas ) {
@@ -129,7 +189,7 @@ synchronized boolean add(
129189 NameNode .stateChangeLog .debug (
130190 "BLOCK* NameSystem.UnderReplicationBlock.add:"
131191 + block
132- + " has only " + curReplicas
192+ + " has only " + curReplicas
133193 + " replicas and need " + expectedReplicas
134194 + " replicas so is added to neededReplications"
135195 + " at priority level " + priLevel );
@@ -149,8 +209,22 @@ synchronized boolean remove(Block block,
149209 oldExpectedReplicas );
150210 return remove (block , priLevel );
151211 }
152-
153- /** remove a block from a under replication queue given a priority*/
212+
213+ /**
214+ * Remove a block from the under replication queues.
215+ *
216+ * The priLevel parameter is a hint of which queue to query
217+ * first: if negative or >= {@link #LEVEL} this shortcutting
219+ * is not attempted.
219+ *
220+ * If the block is not found in the nominated queue, an attempt is made to
221+ * remove it from all queues.
222+ *
223+ * <i>Warning:</i> This is not a synchronized method.
224+ * @param block block to remove
226+ * @param priLevel expected priority level
226+ * @return true if the block was found and removed from one of the priority queues
227+ */
154228 boolean remove (Block block , int priLevel ) {
155229 if (priLevel >= 0 && priLevel < LEVEL
156230 && priorityQueues .get (priLevel ).remove (block )) {
@@ -164,8 +238,8 @@ boolean remove(Block block, int priLevel) {
164238 } else {
165239 // Try to remove the block from all queues if the block was
166240 // not found in the queue for the given priority level.
167- for (int i = 0 ; i < LEVEL ; i ++) {
168- if (priorityQueues .get (i ).remove (block )) {
241+ for (int i = 0 ; i < LEVEL ; i ++) {
242+ if (priorityQueues .get (i ).remove (block )) {
169243 if (NameNode .stateChangeLog .isDebugEnabled ()) {
170244 NameNode .stateChangeLog .debug (
171245 "BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -178,9 +252,24 @@ boolean remove(Block block, int priLevel) {
178252 }
179253 return false ;
180254 }
181-
182- /** update the priority level of a block */
183- synchronized void update (Block block , int curReplicas ,
255+
256+ /**
257+ * Recalculate and potentially update the priority level of a block.
258+ *
259+ * If the block priority has changed from before an attempt is made to
260+ * remove it from the block queue. Regardless of whether or not the block
261+ * is in the block queue of (recalculated) priority, an attempt is made
262+ * to add it to that queue. This ensures that the block will be
263+ * in its expected priority queue (and only that queue) by the end of the
264+ * method call.
265+ * @param block an under replicated block
266+ * @param curReplicas current number of replicas of the block
267+ * @param decommissionedReplicas the number of decommissioned replicas
268+ * @param curExpectedReplicas expected number of replicas of the block
269+ * @param curReplicasDelta the change in the replica count from before
270+ * @param expectedReplicasDelta the change in the expected replica count from before
271+ */
272+ synchronized void update (Block block , int curReplicas ,
184273 int decommissionedReplicas ,
185274 int curExpectedReplicas ,
186275 int curReplicasDelta , int expectedReplicasDelta ) {
@@ -206,7 +295,7 @@ synchronized void update(Block block, int curReplicas,
206295 NameNode .stateChangeLog .debug (
207296 "BLOCK* NameSystem.UnderReplicationBlock.update:"
208297 + block
209- + " has only " +curReplicas
298+ + " has only " + curReplicas
210299 + " replicas and needs " + curExpectedReplicas
211300 + " replicas so is added to neededReplications"
212301 + " at priority level " + curPri );
@@ -218,64 +307,79 @@ synchronized void update(Block block, int curReplicas,
218307 synchronized BlockIterator iterator (int level ) {
219308 return new BlockIterator (level );
220309 }
221-
310+
222311 /** return an iterator of all the under replication blocks */
312+ @ Override
223313 public synchronized BlockIterator iterator () {
224314 return new BlockIterator ();
225315 }
226-
316+
317+ /**
318+ * An iterator over blocks.
319+ */
227320 class BlockIterator implements Iterator <Block > {
228321 private int level ;
229322 private boolean isIteratorForLevel = false ;
230323 private List <Iterator <Block >> iterators = new ArrayList <Iterator <Block >>();
231324
325+ /**
326+ * Construct an iterator over all queues.
327+ */
232328 private BlockIterator () {
233329 level =0 ;
234330 for (int i =0 ; i <LEVEL ; i ++) {
235331 iterators .add (priorityQueues .get (i ).iterator ());
236332 }
237333 }
238334
335+ /**
336+ * Construct an iterator for a single queue level
337+ * @param l the priority level to iterate over
338+ */
239339 private BlockIterator (int l ) {
240340 level = l ;
241341 isIteratorForLevel = true ;
242342 iterators .add (priorityQueues .get (level ).iterator ());
243343 }
244344
245345 private void update () {
246- if (isIteratorForLevel )
346+ if (isIteratorForLevel ) {
247347 return ;
348+ }
248349 while (level < LEVEL -1 && !iterators .get (level ).hasNext ()) {
249350 level ++;
250351 }
251352 }
252353
253354 @ Override
254355 public Block next () {
255- if (isIteratorForLevel )
356+ if (isIteratorForLevel ) {
256357 return iterators .get (0 ).next ();
358+ }
257359 update ();
258360 return iterators .get (level ).next ();
259361 }
260362
261363 @ Override
262364 public boolean hasNext () {
263- if (isIteratorForLevel )
365+ if (isIteratorForLevel ) {
264366 return iterators .get (0 ).hasNext ();
367+ }
265368 update ();
266369 return iterators .get (level ).hasNext ();
267370 }
268371
269372 @ Override
270373 public void remove () {
271- if (isIteratorForLevel )
374+ if (isIteratorForLevel ) {
272375 iterators .get (0 ).remove ();
273- else
376+ } else {
274377 iterators .get (level ).remove ();
378+ }
275379 }
276380
277381 int getPriority () {
278382 return level ;
279383 }
280- }
384+ }
281385}
0 commit comments