2121import java .io .IOException ;
2222import java .util .ArrayList ;
2323import java .util .Collection ;
24+ import java .util .Collections ;
25+ import java .util .LinkedHashSet ;
2426import java .util .HashSet ;
2527import java .util .List ;
2628import java .util .HashMap ;
2729import java .util .Set ;
2830import java .util .Iterator ;
2931import java .util .Map ;
32+ import java .util .Map .Entry ;
3033
34+ import org .apache .commons .logging .Log ;
35+ import org .apache .commons .logging .LogFactory ;
3136import org .apache .hadoop .classification .InterfaceAudience ;
3237import org .apache .hadoop .classification .InterfaceStability ;
3338import org .apache .hadoop .conf .Configuration ;
4954import org .apache .hadoop .net .NetworkTopology ;
5055
5156import com .google .common .annotations .VisibleForTesting ;
57+ import com .google .common .collect .HashMultiset ;
58+ import com .google .common .collect .Multiset ;
5259
5360/**
5461 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
7885public abstract class CombineFileInputFormat <K , V >
7986 extends FileInputFormat <K , V > {
8087
88+ private static final Log LOG = LogFactory .getLog (CombineFileInputFormat .class );
89+
8190 public static final String SPLIT_MINSIZE_PERNODE =
8291 "mapreduce.input.fileinputformat.split.minsize.per.node" ;
8392 public static final String SPLIT_MINSIZE_PERRACK =
@@ -185,6 +194,8 @@ public List<InputSplit> getSplits(JobContext job)
185194 maxSize = maxSplitSize ;
186195 } else {
187196 maxSize = conf .getLong ("mapreduce.input.fileinputformat.split.maxsize" , 0 );
197+ // If maxSize is not configured, a single split will be generated per
198+ // node.
188199 }
189200 if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize ) {
190201 throw new IOException ("Minimum split size pernode " + minSizeNode +
@@ -257,8 +268,8 @@ private void getMoreSplits(JobContext job, List<FileStatus> stats,
257268 new HashMap <OneBlockInfo , String []>();
258269
259270 // mapping from a node to the list of blocks that it contains
260- HashMap <String , List <OneBlockInfo >> nodeToBlocks =
261- new HashMap <String , List <OneBlockInfo >>();
271+ HashMap <String , Set <OneBlockInfo >> nodeToBlocks =
272+ new HashMap <String , Set <OneBlockInfo >>();
262273
263274 files = new OneFileInfo [stats .size ()];
264275 if (stats .size () == 0 ) {
@@ -279,93 +290,128 @@ private void getMoreSplits(JobContext job, List<FileStatus> stats,
279290 }
280291
281292 @ VisibleForTesting
282- void createSplits (HashMap <String , List <OneBlockInfo >> nodeToBlocks ,
283- HashMap <OneBlockInfo , String []> blockToNodes ,
284- HashMap <String , List <OneBlockInfo >> rackToBlocks ,
293+ void createSplits (Map <String , Set <OneBlockInfo >> nodeToBlocks ,
294+ Map <OneBlockInfo , String []> blockToNodes ,
295+ Map <String , List <OneBlockInfo >> rackToBlocks ,
285296 long totLength ,
286297 long maxSize ,
287298 long minSizeNode ,
288299 long minSizeRack ,
289300 List <InputSplit > splits
290301 ) {
291302 ArrayList <OneBlockInfo > validBlocks = new ArrayList <OneBlockInfo >();
292- Set <String > nodes = new HashSet <String >();
293303 long curSplitSize = 0 ;
294304
295- int numNodes = nodeToBlocks .size ();
305+ int totalNodes = nodeToBlocks .size ();
296306 long totalLength = totLength ;
297307
308+ Multiset <String > splitsPerNode = HashMultiset .create ();
309+ Set <String > completedNodes = new HashSet <String >();
310+
298311 while (true ) {
299312 // it is allowed for maxSize to be 0. Disable smoothing load for such cases
300- int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
301- ((int ) (totalLength /maxSize ))/numNodes
302- : Integer .MAX_VALUE ;
303- int maxSplitsByNodeOnly = (avgSplitsPerNode > 0 ) ? avgSplitsPerNode : 1 ;
304- numNodes = 0 ;
305-
306- // process all nodes and create splits that are local to a node.
307- for (Iterator <Map .Entry <String , List <OneBlockInfo >>> iter = nodeToBlocks
313+
314+ // process all nodes and create splits that are local to a node. Generate
315+ // one split per node iteration, and walk over nodes multiple times to
316+ // distribute the splits across nodes.
317+ for (Iterator <Map .Entry <String , Set <OneBlockInfo >>> iter = nodeToBlocks
308318 .entrySet ().iterator (); iter .hasNext ();) {
309- Map .Entry <String , List <OneBlockInfo >> one = iter .next ();
310- nodes .add (one .getKey ());
311- List <OneBlockInfo > blocksInNode = one .getValue ();
319+ Map .Entry <String , Set <OneBlockInfo >> one = iter .next ();
320+
321+ String node = one .getKey ();
322+
323+ // Skip the node if it has previously been marked as completed.
324+ if (completedNodes .contains (node )) {
325+ continue ;
326+ }
327+
328+ Set <OneBlockInfo > blocksInCurrentNode = one .getValue ();
312329
313330 // for each block, copy it into validBlocks. Delete it from
314331 // blockToNodes so that the same block does not appear in
315332 // two different splits.
316- int splitsInNode = 0 ;
317- for (OneBlockInfo oneblock : blocksInNode ) {
318- if (blockToNodes .containsKey (oneblock )) {
319- validBlocks .add (oneblock );
320- blockToNodes .remove (oneblock );
321- curSplitSize += oneblock .length ;
322-
323- // if the accumulated split size exceeds the maximum, then
324- // create this split.
325- if (maxSize != 0 && curSplitSize >= maxSize ) {
326- // create an input split and add it to the splits array
327- addCreatedSplit (splits , nodes , validBlocks );
328- totalLength -= curSplitSize ;
329- curSplitSize = 0 ;
330- validBlocks .clear ();
331- splitsInNode ++;
332- if (splitsInNode == maxSplitsByNodeOnly ) {
333- // stop grouping on a node so as not to create
334- // disproportionately more splits on a node because it happens
335- // to have many blocks
336- // consider only these nodes in next round of grouping because
337- // they have leftover blocks that may need to be grouped
338- numNodes ++;
339- break ;
340- }
341- }
333+ Iterator <OneBlockInfo > oneBlockIter = blocksInCurrentNode .iterator ();
334+ while (oneBlockIter .hasNext ()) {
335+ OneBlockInfo oneblock = oneBlockIter .next ();
336+
337+ // Remove all blocks which may already have been assigned to other
338+ // splits.
339+ if (!blockToNodes .containsKey (oneblock )) {
340+ oneBlockIter .remove ();
341+ continue ;
342+ }
343+
344+ validBlocks .add (oneblock );
345+ blockToNodes .remove (oneblock );
346+ curSplitSize += oneblock .length ;
347+
348+ // if the accumulated split size exceeds the maximum, then
349+ // create this split.
350+ if (maxSize != 0 && curSplitSize >= maxSize ) {
351+ // create an input split and add it to the splits array
352+ addCreatedSplit (splits , Collections .singleton (node ), validBlocks );
353+ totalLength -= curSplitSize ;
354+ curSplitSize = 0 ;
355+
356+ splitsPerNode .add (node );
357+
358+ // Remove entries from blocksInNode so that we don't walk these
359+ // again.
360+ blocksInCurrentNode .removeAll (validBlocks );
361+ validBlocks .clear ();
362+
363+ // Done creating a single split for this node. Move on to the next
364+ // node so that splits are distributed across nodes.
365+ break ;
342366 }
367+
343368 }
344- // if there were any blocks left over and their combined size is
345- // larger than minSplitNode, then combine them into one split.
346- // Otherwise add them back to the unprocessed pool. It is likely
347- // that they will be combined with other blocks from the
348- // same rack later on.
349- if (minSizeNode != 0 && curSplitSize >= minSizeNode
350- && splitsInNode == 0 ) {
351- // haven't created any split on this machine. so its ok to add a
352- // smaller
353- // one for parallelism. Otherwise group it in the rack for balanced
354- // size
355- // create an input split and add it to the splits array
356- addCreatedSplit (splits , nodes , validBlocks );
357- totalLength -= curSplitSize ;
358- } else {
359- for (OneBlockInfo oneblock : validBlocks ) {
360- blockToNodes .put (oneblock , oneblock .hosts );
369+ if (validBlocks .size () != 0 ) {
370+ // This implies that the last few blocks (or all in case maxSize=0)
371+ // were not part of a split. The node is complete.
372+
373+ // if there were any blocks left over and their combined size is
374+ // larger than minSplitNode, then combine them into one split.
375+ // Otherwise add them back to the unprocessed pool. It is likely
376+ // that they will be combined with other blocks from the
377+ // same rack later on.
378+ // This condition also kicks in when max split size is not set. All
379+ // blocks on a node will be grouped together into a single split.
380+ if (minSizeNode != 0 && curSplitSize >= minSizeNode
381+ && splitsPerNode .count (node ) == 0 ) {
382+ // haven't created any split on this machine. So it's ok to add a
383+ // smaller one for parallelism. Otherwise group it in the rack for
384+ // balanced size. Create an input split and add it to the splits
385+ // array.
386+ addCreatedSplit (splits , Collections .singleton (node ), validBlocks );
387+ totalLength -= curSplitSize ;
388+ splitsPerNode .add (node );
389+ // Remove entries from blocksInNode so that we don't walk this again.
390+ blocksInCurrentNode .removeAll (validBlocks );
391+ // The node is done. This was the last set of blocks for this node.
392+ } else {
393+ // Put the unplaced blocks back into the pool for later rack-allocation.
394+ for (OneBlockInfo oneblock : validBlocks ) {
395+ blockToNodes .put (oneblock , oneblock .hosts );
396+ }
361397 }
398+ validBlocks .clear ();
399+ curSplitSize = 0 ;
400+ completedNodes .add (node );
401+ } else { // No in-flight blocks.
402+ if (blocksInCurrentNode .size () == 0 ) {
403+ // Node is done. All blocks were fit into node-local splits.
404+ completedNodes .add (node );
405+ } // else Run through the node again.
362406 }
363- validBlocks .clear ();
364- nodes .clear ();
365- curSplitSize = 0 ;
366407 }
367-
368- if (!(numNodes >0 && totalLength >0 )) {
408+
409+ // Check if node-local assignments are complete.
410+ if (completedNodes .size () == totalNodes || totalLength == 0 ) {
411+ // All nodes have been walked over and marked as completed or all blocks
412+ // have been assigned. The rest should be handled via rack-local assignment.
413+ LOG .info ("DEBUG: Terminated node allocation with : CompletedNodes: "
414+ + completedNodes .size () + ", size left: " + totalLength );
369415 break ;
370416 }
371417 }
@@ -514,7 +560,7 @@ static class OneFileInfo {
514560 boolean isSplitable ,
515561 HashMap <String , List <OneBlockInfo >> rackToBlocks ,
516562 HashMap <OneBlockInfo , String []> blockToNodes ,
517- HashMap <String , List <OneBlockInfo >> nodeToBlocks ,
563+ HashMap <String , Set <OneBlockInfo >> nodeToBlocks ,
518564 HashMap <String , Set <String >> rackToNodes ,
519565 long maxSize )
520566 throws IOException {
@@ -588,10 +634,10 @@ static class OneFileInfo {
588634
589635 @ VisibleForTesting
590636 static void populateBlockInfo (OneBlockInfo [] blocks ,
591- HashMap <String , List <OneBlockInfo >> rackToBlocks ,
592- HashMap <OneBlockInfo , String []> blockToNodes ,
593- HashMap <String , List <OneBlockInfo >> nodeToBlocks ,
594- HashMap <String , Set <String >> rackToNodes ) {
637+ Map <String , List <OneBlockInfo >> rackToBlocks ,
638+ Map <OneBlockInfo , String []> blockToNodes ,
639+ Map <String , Set <OneBlockInfo >> nodeToBlocks ,
640+ Map <String , Set <String >> rackToNodes ) {
595641 for (OneBlockInfo oneblock : blocks ) {
596642 // add this block to the block --> node locations map
597643 blockToNodes .put (oneblock , oneblock .hosts );
@@ -623,9 +669,9 @@ static void populateBlockInfo(OneBlockInfo[] blocks,
623669 // add this block to the node --> block map
624670 for (int j = 0 ; j < oneblock .hosts .length ; j ++) {
625671 String node = oneblock .hosts [j ];
626- List <OneBlockInfo > blklist = nodeToBlocks .get (node );
672+ Set <OneBlockInfo > blklist = nodeToBlocks .get (node );
627673 if (blklist == null ) {
628- blklist = new ArrayList <OneBlockInfo >();
674+ blklist = new LinkedHashSet <OneBlockInfo >();
629675 nodeToBlocks .put (node , blklist );
630676 }
631677 blklist .add (oneblock );
@@ -689,7 +735,7 @@ protected BlockLocation[] getFileBlockLocations(
689735 return fs .getFileBlockLocations (stat , 0 , stat .getLen ());
690736 }
691737
692- private static void addHostToRack (HashMap <String , Set <String >> rackToNodes ,
738+ private static void addHostToRack (Map <String , Set <String >> rackToNodes ,
693739 String rack , String host ) {
694740 Set <String > hosts = rackToNodes .get (rack );
695741 if (hosts == null ) {
0 commit comments