|
24 | 24 | import java.net.InetSocketAddress; |
25 | 25 | import java.net.URI; |
26 | 26 | import java.net.URISyntaxException; |
| 27 | +import java.security.PrivilegedAction; |
27 | 28 | import java.util.ArrayList; |
28 | 29 | import java.util.HashMap; |
29 | 30 | import java.util.List; |
|
38 | 39 | import org.apache.commons.cli.ParseException; |
39 | 40 | import org.apache.commons.logging.Log; |
40 | 41 | import org.apache.commons.logging.LogFactory; |
41 | | - |
42 | 42 | import org.apache.hadoop.classification.InterfaceAudience; |
43 | 43 | import org.apache.hadoop.classification.InterfaceStability; |
44 | 44 | import org.apache.hadoop.conf.Configuration; |
45 | 45 | import org.apache.hadoop.net.NetUtils; |
| 46 | +import org.apache.hadoop.security.UserGroupInformation; |
| 47 | +import org.apache.hadoop.security.token.Token; |
46 | 48 | import org.apache.hadoop.yarn.api.AMRMProtocol; |
47 | 49 | import org.apache.hadoop.yarn.api.ApplicationConstants; |
48 | 50 | import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
49 | 51 | import org.apache.hadoop.yarn.api.ContainerExitStatus; |
50 | 52 | import org.apache.hadoop.yarn.api.ContainerManager; |
51 | | - |
52 | 53 | import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
53 | 54 | import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
54 | 55 | import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
55 | 56 | import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
56 | 57 | import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
57 | | - |
58 | 58 | import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
59 | 59 | import org.apache.hadoop.yarn.api.records.Container; |
60 | 60 | import org.apache.hadoop.yarn.api.records.ContainerId; |
|
74 | 74 | import org.apache.hadoop.yarn.conf.YarnConfiguration; |
75 | 75 | import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
76 | 76 | import org.apache.hadoop.yarn.ipc.YarnRPC; |
| 77 | +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; |
77 | 78 | import org.apache.hadoop.yarn.util.ConverterUtils; |
| 79 | +import org.apache.hadoop.yarn.util.ProtoUtils; |
78 | 80 | import org.apache.hadoop.yarn.util.Records; |
79 | 81 |
|
80 | 82 | /** |
@@ -663,10 +665,22 @@ private void connectToCM() { |
663 | 665 | + container.getId()); |
664 | 666 | String cmIpPortStr = container.getNodeId().getHost() + ":" |
665 | 667 | + container.getNodeId().getPort(); |
666 | | - InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); |
| 668 | + final InetSocketAddress cmAddress = |
| 669 | + NetUtils.createSocketAddr(cmIpPortStr); |
667 | 670 | LOG.info("Connecting to ContainerManager at " + cmIpPortStr); |
668 | | - this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, |
669 | | - cmAddress, conf)); |
| 671 | + UserGroupInformation ugi = |
| 672 | + UserGroupInformation.createRemoteUser(container.getId().toString()); |
| 673 | + Token<ContainerTokenIdentifier> token = |
| 674 | + ProtoUtils.convertFromProtoFormat(container.getContainerToken(), |
| 675 | + cmAddress); |
| 676 | + ugi.addToken(token); |
| 677 | + this.cm = ugi.doAs(new PrivilegedAction<ContainerManager>() { |
| 678 | + @Override |
| 679 | + public ContainerManager run() { |
| 680 | + return ((ContainerManager) rpc.getProxy(ContainerManager.class, |
| 681 | + cmAddress, conf)); |
| 682 | + } |
| 683 | + }); |
670 | 684 | } |
671 | 685 |
|
672 | 686 | @Override |
|
0 commit comments