1717
1818import java .io .IOException ;
1919import java .net .InetSocketAddress ;
20+ import java .util .ArrayList ;
2021
2122import org .apache .hadoop .conf .Configuration ;
22- import org .apache .hadoop .io .Text ;
2323import org .apache .hadoop .net .NetUtils ;
24- import org .apache .hadoop .security .SecurityUtil ;
2524import org .apache .hadoop .security .UserGroupInformation ;
26- import org .apache .hadoop .security .token .Token ;
27- import org .apache .hadoop .security .token .TokenIdentifier ;
28- import org .apache .hadoop .yarn .api .ContainerManager ;
29- import org .apache .hadoop .yarn .api .protocolrecords .GetContainerStatusRequest ;
30- import org .apache .hadoop .yarn .api .protocolrecords .StartContainerRequest ;
31- import org .apache .hadoop .yarn .api .protocolrecords .StartContainerResponse ;
32- import org .apache .hadoop .yarn .api .protocolrecords .StopContainerRequest ;
33- import org .apache .hadoop .yarn .api .protocolrecords .StopContainerResponse ;
25+ import org .apache .hadoop .yarn .api .ContainerManagementProtocol ;
26+ import org .apache .hadoop .yarn .api .protocolrecords .GetContainerStatusesRequest ;
27+ import org .apache .hadoop .yarn .api .protocolrecords .StartContainersRequest ;
28+ import org .apache .hadoop .yarn .api .protocolrecords .StartContainersResponse ;
29+ import org .apache .hadoop .yarn .api .protocolrecords .StopContainersRequest ;
30+ import org .apache .hadoop .yarn .api .protocolrecords .StopContainersResponse ;
3431import org .apache .hadoop .yarn .api .records .Container ;
32+ import org .apache .hadoop .yarn .api .records .ContainerId ;
3533import org .apache .hadoop .yarn .api .records .ContainerStatus ;
36- import org .apache .hadoop .yarn .api .records .ContainerToken ;
37- import org .apache .hadoop .yarn .api .records .DelegationToken ;
38- import org .apache .hadoop .yarn .exceptions .YarnRemoteException ;
39- import org .apache .hadoop .yarn .security .ContainerTokenIdentifier ;
34+ import org .apache .hadoop .yarn .api .records .Token ;
35+ import org .apache .hadoop .yarn .client .api .NMTokenCache ;
36+ import org .apache .hadoop .yarn .exceptions .YarnException ;
37+ import org .apache .hadoop .yarn .security .NMTokenIdentifier ;
38+ import org .apache .hadoop .yarn .util .ConverterUtils ;
4039import org .apache .hadoop .yarn .util .Records ;
4140import org .springframework .yarn .rpc .YarnRpcAccessor ;
4241import org .springframework .yarn .rpc .YarnRpcCallback ;
4342
4443/**
4544 * Template implementation for {@link AppmasterCmOperations} wrapping
46- * communication using {@link ContainerManager }. Methods for this
45+ * communication using {@link ContainerManagementProtocol }. Methods for this
4746 * template wraps possible exceptions into Spring Dao exception hierarchy.
4847 *
4948 * @author Janne Valkealahti
5049 *
5150 */
52- public class AppmasterCmTemplate extends YarnRpcAccessor <ContainerManager > implements AppmasterCmOperations {
51+ public class AppmasterCmTemplate extends YarnRpcAccessor <ContainerManagementProtocol > implements AppmasterCmOperations {
5352
5453 /** Container we're working for */
5554 private final Container container ;
5655
56+ /**
57+ * Instantiates a new AppmasterCmTemplate.
58+ *
59+ * @param config the hadoop configation
60+ * @param container the {@link Container}
61+ */
5762 public AppmasterCmTemplate (Configuration config , Container container ) {
58- super (ContainerManager .class , config );
63+ super (ContainerManagementProtocol .class , config );
5964 this .container = container ;
6065 }
6166
6267 @ Override
63- public StartContainerResponse startContainer (final StartContainerRequest request ) {
64- return execute (new YarnRpcCallback <StartContainerResponse , ContainerManager >() {
68+ public StartContainersResponse startContainers (final StartContainersRequest request ) {
69+ return execute (new YarnRpcCallback <StartContainersResponse , ContainerManagementProtocol >() {
6570 @ Override
66- public StartContainerResponse doInYarn (ContainerManager proxy ) throws YarnRemoteException {
67- return proxy .startContainer (request );
71+ public StartContainersResponse doInYarn (ContainerManagementProtocol proxy ) throws YarnException , IOException {
72+ return proxy .startContainers (request );
6873 }
6974 });
7075 }
7176
7277 @ Override
73- public StopContainerResponse stopContainer () {
74- return execute (new YarnRpcCallback <StopContainerResponse , ContainerManager >() {
78+ public StopContainersResponse stopContainers () {
79+ return execute (new YarnRpcCallback <StopContainersResponse , ContainerManagementProtocol >() {
7580 @ Override
76- public StopContainerResponse doInYarn (ContainerManager proxy ) throws YarnRemoteException {
77- StopContainerRequest request = Records .newRecord (StopContainerRequest .class );
78- request .setContainerId (container .getId ());
79- return proxy .stopContainer (request );
81+ public StopContainersResponse doInYarn (ContainerManagementProtocol proxy ) throws YarnException , IOException {
82+ StopContainersRequest request = Records .newRecord (StopContainersRequest .class );
83+ ArrayList <ContainerId > ids = new ArrayList <ContainerId >();
84+ ids .add (container .getId ());
85+ request .setContainerIds (ids );
86+ return proxy .stopContainers (request );
8087 }
8188 });
8289 }
8390
8491 @ Override
8592 public ContainerStatus getContainerStatus () {
86- return execute (new YarnRpcCallback <ContainerStatus , ContainerManager >() {
93+ return execute (new YarnRpcCallback <ContainerStatus , ContainerManagementProtocol >() {
8794 @ Override
88- public ContainerStatus doInYarn (ContainerManager proxy ) throws YarnRemoteException {
89- GetContainerStatusRequest request = Records .newRecord (GetContainerStatusRequest .class );
90- request .setContainerId (container .getId ());
91- return proxy .getContainerStatus (request ).getStatus ();
95+ public ContainerStatus doInYarn (ContainerManagementProtocol proxy ) throws YarnException , IOException {
96+ GetContainerStatusesRequest request = Records .newRecord (GetContainerStatusesRequest .class );
97+ ArrayList <ContainerId > ids = new ArrayList <ContainerId >();
98+ ids .add (container .getId ());
99+ request .setContainerIds (ids );
100+ return proxy .getContainerStatuses (request ).getContainerStatuses ().get (0 );
92101 }
93102 });
94103 }
@@ -101,45 +110,19 @@ protected InetSocketAddress getRpcAddress(Configuration config) {
101110
102111 @ Override
103112 protected UserGroupInformation getUser () {
104- UserGroupInformation user = null ;
105- try {
106- user = UserGroupInformation .getCurrentUser ();
107- if (UserGroupInformation .isSecurityEnabled ()) {
108- ContainerToken containerToken = container .getContainerToken ();
109- Token <ContainerTokenIdentifier > token = null ;
110- if (containerToken instanceof DelegationToken ) {
111- token = convertFromProtoFormat ((DelegationToken ) container .getContainerToken (),
112- getRpcAddress (getConfiguration ()));
113- }
114- // remote user needs to be a container id
115- user = UserGroupInformation .createRemoteUser (container .getId ().toString ());
116- user .addToken (token );
117- }
118- } catch (IOException e ) {
119- }
120- return user ;
121- }
113+ InetSocketAddress rpcAddress = getRpcAddress (getConfiguration ());
122114
123- /**
124- * Convert token identifier from a proto format.
125- * <p>
126- * This function is a copy for way it was pre hadoop-2.0.3. Helps
127- * to work with api changes.
128- *
129- * @param <T> the generic type
130- * @param protoToken the proto token
131- * @param serviceAddr the service addr
132- * @return the token identifier
133- */
134- private static <T extends TokenIdentifier > Token <T > convertFromProtoFormat (DelegationToken protoToken ,
135- InetSocketAddress serviceAddr ) {
136- // TODO: remove this method when api's are compatible
137- Token <T > token = new Token <T >(protoToken .getIdentifier ().array (), protoToken .getPassword ().array (),
138- new Text (protoToken .getKind ()), new Text (protoToken .getService ()));
139- if (serviceAddr != null ) {
140- SecurityUtil .setTokenService (token , serviceAddr );
141- }
142- return token ;
115+ // TODO: at some point remove static cache
116+ Token token = NMTokenCache .getNMToken (container .getNodeId ().toString ());
117+
118+ // this is what node manager requires for auth
119+ UserGroupInformation user =
120+ UserGroupInformation .createRemoteUser (container .getId ().getApplicationAttemptId ().toString ());
121+ org .apache .hadoop .security .token .Token <NMTokenIdentifier > nmToken =
122+ ConverterUtils .convertFromYarn (token , rpcAddress );
123+ user .addToken (nmToken );
124+
125+ return user ;
143126 }
144127
145128}
0 commit comments