3434import org .apache .commons .logging .LogFactory ;
3535import org .apache .hadoop .classification .InterfaceAudience .Private ;
3636import org .apache .hadoop .net .NetworkTopology ;
37+ import org .apache .hadoop .yarn .api .protocolrecords .GetNewApplicationRequest ;
38+ import org .apache .hadoop .yarn .api .protocolrecords .GetNewApplicationResponse ;
3739import org .apache .hadoop .yarn .api .protocolrecords .StartContainerRequest ;
3840import org .apache .hadoop .yarn .api .protocolrecords .StartContainersRequest ;
3941import org .apache .hadoop .yarn .api .protocolrecords .StopContainersRequest ;
4749import org .apache .hadoop .yarn .api .records .Priority ;
4850import org .apache .hadoop .yarn .api .records .Resource ;
4951import org .apache .hadoop .yarn .api .records .ResourceRequest ;
52+ import org .apache .hadoop .yarn .conf .YarnConfiguration ;
5053import org .apache .hadoop .yarn .exceptions .YarnException ;
5154import org .apache .hadoop .yarn .factories .RecordFactory ;
5255import org .apache .hadoop .yarn .factory .providers .RecordFactoryProvider ;
5356import org .apache .hadoop .yarn .server .resourcemanager .Task .State ;
57+ import org .apache .hadoop .yarn .server .resourcemanager .scheduler .Allocation ;
5458import org .apache .hadoop .yarn .server .resourcemanager .scheduler .NodeType ;
59+ import org .apache .hadoop .yarn .server .resourcemanager .scheduler .ResourceScheduler ;
60+ import org .apache .hadoop .yarn .server .resourcemanager .scheduler .event .AppAddedSchedulerEvent ;
61+ import org .apache .hadoop .yarn .util .Records ;
5562import org .apache .hadoop .yarn .util .resource .Resources ;
5663
5764@ Private
@@ -89,16 +96,23 @@ public class Application {
8996
9097 Resource used = recordFactory .newRecordInstance (Resource .class );
9198
92- public Application (String user , ResourceManager resourceManager ) {
99+ public Application (String user , ResourceManager resourceManager )
100+ throws YarnException {
93101 this (user , "default" , resourceManager );
94102 }
95103
96- public Application (String user , String queue , ResourceManager resourceManager ) {
104+ public Application (String user , String queue , ResourceManager resourceManager )
105+ throws YarnException {
97106 this .user = user ;
98107 this .queue = queue ;
99108 this .resourceManager = resourceManager ;
100- this .applicationId =
101- this .resourceManager .getClientRMService ().getNewApplicationId ();
109+ // register an application
110+ GetNewApplicationRequest request =
111+ Records .newRecord (GetNewApplicationRequest .class );
112+ GetNewApplicationResponse newApp =
113+ this .resourceManager .getClientRMService ().getNewApplication (request );
114+ this .applicationId = newApp .getApplicationId ();
115+
102116 this .applicationAttemptId =
103117 ApplicationAttemptId .newInstance (this .applicationId ,
104118 this .numAttempts .getAndIncrement ());
@@ -115,6 +129,10 @@ public String getQueue() {
115129 public ApplicationId getApplicationId () {
116130 return applicationId ;
117131 }
132+
133+ public ApplicationAttemptId getApplicationAttemptId () {
134+ return applicationAttemptId ;
135+ }
118136
119137 public static String resolve (String hostName ) {
120138 return NetworkTopology .DEFAULT_RACK ;
@@ -132,10 +150,25 @@ public synchronized void submit() throws IOException, YarnException {
132150 ApplicationSubmissionContext context = recordFactory .newRecordInstance (ApplicationSubmissionContext .class );
133151 context .setApplicationId (this .applicationId );
134152 context .setQueue (this .queue );
153+
154+ // Set up the container launch context for the application master
155+ ContainerLaunchContext amContainer
156+ = Records .newRecord (ContainerLaunchContext .class );
157+ context .setAMContainerSpec (amContainer );
158+ context .setResource (Resources .createResource (
159+ YarnConfiguration .DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB ));
160+
135161 SubmitApplicationRequest request = recordFactory
136162 .newRecordInstance (SubmitApplicationRequest .class );
137163 request .setApplicationSubmissionContext (context );
164+ final ResourceScheduler scheduler = resourceManager .getResourceScheduler ();
165+
138166 resourceManager .getClientRMService ().submitApplication (request );
167+
168+ // Notify scheduler
169+ AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent (
170+ this .applicationAttemptId , this .queue , this .user );
171+ scheduler .handle (appAddedEvent1 );
139172 }
140173
141174 public synchronized void addResourceRequestSpec (
@@ -267,17 +300,13 @@ public synchronized List<Container> getResources() throws IOException {
267300 }
268301
269302 // Get resources from the ResourceManager
270- resourceManager .getResourceScheduler ().allocate (applicationAttemptId ,
271- new ArrayList <ResourceRequest >(ask ), new ArrayList <ContainerId >(), null , null );
303+ Allocation allocation = resourceManager .getResourceScheduler ().allocate (
304+ applicationAttemptId , new ArrayList <ResourceRequest >(ask ),
305+ new ArrayList <ContainerId >(), null , null );
272306 System .out .println ("-=======" + applicationAttemptId );
273307 System .out .println ("----------" + resourceManager .getRMContext ().getRMApps ()
274308 .get (applicationId ).getRMAppAttempt (applicationAttemptId ));
275-
276- List <Container > containers = null ;
277- // TODO: Fix
278- // resourceManager.getRMContext().getRMApps()
279- // .get(applicationId).getRMAppAttempt(applicationAttemptId)
280- // .pullNewlyAllocatedContainers();
309+ List <Container > containers = allocation .getContainers ();
281310
282311 // Clear state for next interaction with ResourceManager
283312 ask .clear ();
0 commit comments