2121import java .io .IOException ;
2222import java .io .InputStream ;
2323import java .util .ArrayList ;
24+ import java .util .Collection ;
2425import java .util .Comparator ;
2526import java .util .HashMap ;
2627import java .util .List ;
2728import java .util .Map ;
29+ import java .util .Random ;
2830import java .util .concurrent .ConcurrentHashMap ;
31+ import java .util .concurrent .atomic .AtomicBoolean ;
2932
3033import org .apache .commons .logging .Log ;
3134import org .apache .commons .logging .LogFactory ;
@@ -194,6 +197,18 @@ public Configuration getConf() {
194197 private ResourceCalculator calculator ;
195198 private boolean usePortForNodeName ;
196199
200+ private boolean scheduleAsynchronously ;
201+ private AsyncScheduleThread asyncSchedulerThread ;
202+
203+ /**
204+ * EXPERT
205+ */
206+ private long asyncScheduleInterval ;
207+ private static final String ASYNC_SCHEDULER_INTERVAL =
208+ CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_PREFIX
209+ + ".scheduling-interval-ms" ;
210+ private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5 ;
211+
197212 public CapacityScheduler () {}
198213
199214 @ Override
@@ -272,11 +287,23 @@ public Resource getClusterResources() {
272287
273288 initializeQueues (this .conf );
274289
290+ scheduleAsynchronously = this .conf .getScheduleAynschronously ();
291+ asyncScheduleInterval =
292+ this .conf .getLong (ASYNC_SCHEDULER_INTERVAL ,
293+ DEFAULT_ASYNC_SCHEDULER_INTERVAL );
294+ if (scheduleAsynchronously ) {
295+ asyncSchedulerThread = new AsyncScheduleThread (this );
296+ asyncSchedulerThread .start ();
297+ }
298+
275299 initialized = true ;
276300 LOG .info ("Initialized CapacityScheduler with " +
277301 "calculator=" + getResourceCalculator ().getClass () + ", " +
278302 "minimumAllocation=<" + getMinimumResourceCapability () + ">, " +
279- "maximumAllocation=<" + getMaximumResourceCapability () + ">" );
303+ "maximumAllocation=<" + getMaximumResourceCapability () + ">, " +
304+ "asynchronousScheduling=" + scheduleAsynchronously + ", " +
305+ "asyncScheduleInterval=" + asyncScheduleInterval + "ms" );
306+
280307 } else {
281308 CapacitySchedulerConfiguration oldConf = this .conf ;
282309 this .conf = loadCapacitySchedulerConfiguration (configuration );
@@ -290,7 +317,69 @@ public Resource getClusterResources() {
290317 }
291318 }
292319 }
320+
321+ long getAsyncScheduleInterval () {
322+ return asyncScheduleInterval ;
323+ }
324+
325+ private final static Random random = new Random (System .currentTimeMillis ());
326+
327+ /**
328+ * Schedule on all nodes by starting at a random point.
329+ * @param cs
330+ */
331+ static void schedule (CapacityScheduler cs ) {
332+ // First randomize the start point
333+ int current = 0 ;
334+ Collection <FiCaSchedulerNode > nodes = cs .getAllNodes ().values ();
335+ int start = random .nextInt (nodes .size ());
336+ for (FiCaSchedulerNode node : nodes ) {
337+ if (current ++ >= start ) {
338+ cs .allocateContainersToNode (node );
339+ }
340+ }
341+ // Now, just get everyone to be safe
342+ for (FiCaSchedulerNode node : nodes ) {
343+ cs .allocateContainersToNode (node );
344+ }
345+ try {
346+ Thread .sleep (cs .getAsyncScheduleInterval ());
347+ } catch (InterruptedException e ) {}
348+ }
349+
350+ static class AsyncScheduleThread extends Thread {
351+
352+ private final CapacityScheduler cs ;
353+ private AtomicBoolean runSchedules = new AtomicBoolean (false );
354+
355+ public AsyncScheduleThread (CapacityScheduler cs ) {
356+ this .cs = cs ;
357+ setDaemon (true );
358+ }
293359
360+ @ Override
361+ public void run () {
362+ while (true ) {
363+ if (!runSchedules .get ()) {
364+ try {
365+ Thread .sleep (100 );
366+ } catch (InterruptedException ie ) {}
367+ } else {
368+ schedule (cs );
369+ }
370+ }
371+ }
372+
373+ public void beginSchedule () {
374+ runSchedules .set (true );
375+ }
376+
377+ public void suspendSchedule () {
378+ runSchedules .set (false );
379+ }
380+
381+ }
382+
294383 @ Private
295384 public static final String ROOT_QUEUE =
296385 CapacitySchedulerConfiguration .PREFIX + CapacitySchedulerConfiguration .ROOT ;
@@ -696,6 +785,9 @@ private synchronized void nodeUpdate(RMNode nm) {
696785 LOG .debug ("Node being looked for scheduling " + nm
697786 + " availableResource: " + node .getAvailableResource ());
698787 }
788+ }
789+
790+ private synchronized void allocateContainersToNode (FiCaSchedulerNode node ) {
699791
700792 // Assign new containers...
701793 // 1. Check for reserved applications
@@ -708,7 +800,8 @@ private synchronized void nodeUpdate(RMNode nm) {
708800
709801 // Try to fulfill the reservation
710802 LOG .info ("Trying to fulfill reservation for application " +
711- reservedApplication .getApplicationId () + " on node: " + nm );
803+ reservedApplication .getApplicationId () + " on node: " +
804+ node .getNodeID ());
712805
713806 LeafQueue queue = ((LeafQueue )reservedApplication .getQueue ());
714807 CSAssignment assignment = queue .assignContainers (clusterResource , node );
@@ -729,9 +822,16 @@ private synchronized void nodeUpdate(RMNode nm) {
729822
730823 // Try to schedule more if there are no reservations to fulfill
731824 if (node .getReservedContainer () == null ) {
732- root .assignContainers (clusterResource , node );
825+ if (Resources .greaterThanOrEqual (calculator , getClusterResources (),
826+ node .getAvailableResource (), minimumAllocation )) {
827+ if (LOG .isDebugEnabled ()) {
828+ LOG .debug ("Trying to schedule on node: " + node .getNodeName () +
829+ ", available: " + node .getAvailableResource ());
830+ }
831+ root .assignContainers (clusterResource , node );
832+ }
733833 } else {
734- LOG .info ("Skipping scheduling since node " + nm +
834+ LOG .info ("Skipping scheduling since node " + node . getNodeID () +
735835 " is reserved by application " +
736836 node .getReservedContainer ().getContainerId ().getApplicationAttemptId ()
737837 );
@@ -772,7 +872,11 @@ public void handle(SchedulerEvent event) {
772872 case NODE_UPDATE :
773873 {
774874 NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent )event ;
775- nodeUpdate (nodeUpdatedEvent .getRMNode ());
875+ RMNode node = nodeUpdatedEvent .getRMNode ();
876+ nodeUpdate (node );
877+ if (!scheduleAsynchronously ) {
878+ allocateContainersToNode (getNode (node .getNodeID ()));
879+ }
776880 }
777881 break ;
778882 case APP_ADDED :
@@ -831,6 +935,10 @@ private synchronized void addNode(RMNode nodeManager) {
831935 ++numNodeManagers ;
832936 LOG .info ("Added node " + nodeManager .getNodeAddress () +
833937 " clusterResource: " + clusterResource );
938+
939+ if (scheduleAsynchronously && numNodeManagers == 1 ) {
940+ asyncSchedulerThread .beginSchedule ();
941+ }
834942 }
835943
836944 private synchronized void removeNode (RMNode nodeInfo ) {
@@ -842,6 +950,10 @@ private synchronized void removeNode(RMNode nodeInfo) {
842950 root .updateClusterResource (clusterResource );
843951 --numNodeManagers ;
844952
953+ if (scheduleAsynchronously && numNodeManagers == 0 ) {
954+ asyncSchedulerThread .suspendSchedule ();
955+ }
956+
845957 // Remove running containers
846958 List <RMContainer > runningContainers = node .getRunningContainers ();
847959 for (RMContainer container : runningContainers ) {
@@ -931,7 +1043,12 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
9311043 FiCaSchedulerNode getNode (NodeId nodeId ) {
9321044 return nodes .get (nodeId );
9331045 }
934-
1046+
1047+ @ Lock (Lock .NoLock .class )
1048+ Map <NodeId , FiCaSchedulerNode > getAllNodes () {
1049+ return nodes ;
1050+ }
1051+
9351052 @ Override
9361053 public RMContainer getRMContainer (ContainerId containerId ) {
9371054 FiCaSchedulerApp attempt = getCurrentAttemptForContainer (containerId );
0 commit comments