2828import org .apache .hadoop .conf .Configuration ;
2929import org .apache .hadoop .fs .CommonConfigurationKeysPublic ;
3030import org .apache .hadoop .ipc .Server ;
31+ import org .apache .hadoop .mapreduce .JobACL ;
3132import org .apache .hadoop .mapreduce .MRJobConfig ;
3233import org .apache .hadoop .mapreduce .TypeConverter ;
3334import org .apache .hadoop .mapreduce .v2 .api .MRClientProtocol ;
7879import org .apache .hadoop .mapreduce .v2 .app .security .authorize .MRAMPolicyProvider ;
7980import org .apache .hadoop .mapreduce .v2 .app .webapp .AMWebApp ;
8081import org .apache .hadoop .net .NetUtils ;
82+ import org .apache .hadoop .security .AccessControlException ;
83+ import org .apache .hadoop .security .UserGroupInformation ;
8184import org .apache .hadoop .security .authorize .PolicyProvider ;
8285import org .apache .hadoop .service .AbstractService ;
8386import org .apache .hadoop .yarn .factories .RecordFactory ;
@@ -175,26 +178,32 @@ public InetSocketAddress getConnectAddress() {
175178 return getBindAddress ();
176179 }
177180
178- private Job verifyAndGetJob (JobId jobID ,
179- boolean modifyAccess ) throws IOException {
181+ private Job verifyAndGetJob (JobId jobID ,
182+ JobACL accessType ) throws IOException {
180183 Job job = appContext .getJob (jobID );
184+ UserGroupInformation ugi = UserGroupInformation .getCurrentUser ();
185+ if (!job .checkAccess (ugi , accessType )) {
186+ throw new AccessControlException ("User " + ugi .getShortUserName ()
187+ + " cannot perform operation " + accessType .name () + " on "
188+ + jobID );
189+ }
181190 return job ;
182191 }
183192
184193 private Task verifyAndGetTask (TaskId taskID ,
185- boolean modifyAccess ) throws IOException {
194+ JobACL accessType ) throws IOException {
186195 Task task = verifyAndGetJob (taskID .getJobId (),
187- modifyAccess ).getTask (taskID );
196+ accessType ).getTask (taskID );
188197 if (task == null ) {
189198 throw new IOException ("Unknown Task " + taskID );
190199 }
191200 return task ;
192201 }
193202
194203 private TaskAttempt verifyAndGetAttempt (TaskAttemptId attemptID ,
195- boolean modifyAccess ) throws IOException {
204+ JobACL accessType ) throws IOException {
196205 TaskAttempt attempt = verifyAndGetTask (attemptID .getTaskId (),
197- modifyAccess ).getAttempt (attemptID );
206+ accessType ).getAttempt (attemptID );
198207 if (attempt == null ) {
199208 throw new IOException ("Unknown TaskAttempt " + attemptID );
200209 }
@@ -205,7 +214,7 @@ private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
205214 public GetCountersResponse getCounters (GetCountersRequest request )
206215 throws IOException {
207216 JobId jobId = request .getJobId ();
208- Job job = verifyAndGetJob (jobId , false );
217+ Job job = verifyAndGetJob (jobId , JobACL . VIEW_JOB );
209218 GetCountersResponse response =
210219 recordFactory .newRecordInstance (GetCountersResponse .class );
211220 response .setCounters (TypeConverter .toYarn (job .getAllCounters ()));
@@ -216,7 +225,7 @@ public GetCountersResponse getCounters(GetCountersRequest request)
216225 public GetJobReportResponse getJobReport (GetJobReportRequest request )
217226 throws IOException {
218227 JobId jobId = request .getJobId ();
219- Job job = verifyAndGetJob (jobId , false );
228+ Job job = verifyAndGetJob (jobId , JobACL . VIEW_JOB );
220229 GetJobReportResponse response =
221230 recordFactory .newRecordInstance (GetJobReportResponse .class );
222231 if (job != null ) {
@@ -235,7 +244,7 @@ public GetTaskAttemptReportResponse getTaskAttemptReport(
235244 GetTaskAttemptReportResponse response =
236245 recordFactory .newRecordInstance (GetTaskAttemptReportResponse .class );
237246 response .setTaskAttemptReport (
238- verifyAndGetAttempt (taskAttemptId , false ).getReport ());
247+ verifyAndGetAttempt (taskAttemptId , JobACL . VIEW_JOB ).getReport ());
239248 return response ;
240249 }
241250
@@ -245,7 +254,8 @@ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
245254 TaskId taskId = request .getTaskId ();
246255 GetTaskReportResponse response =
247256 recordFactory .newRecordInstance (GetTaskReportResponse .class );
248- response .setTaskReport (verifyAndGetTask (taskId , false ).getReport ());
257+ response .setTaskReport (
258+ verifyAndGetTask (taskId , JobACL .VIEW_JOB ).getReport ());
249259 return response ;
250260 }
251261
@@ -256,7 +266,7 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
256266 JobId jobId = request .getJobId ();
257267 int fromEventId = request .getFromEventId ();
258268 int maxEvents = request .getMaxEvents ();
259- Job job = verifyAndGetJob (jobId , false );
269+ Job job = verifyAndGetJob (jobId , JobACL . VIEW_JOB );
260270
261271 GetTaskAttemptCompletionEventsResponse response =
262272 recordFactory .newRecordInstance (GetTaskAttemptCompletionEventsResponse .class );
@@ -270,9 +280,11 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
270280 public KillJobResponse killJob (KillJobRequest request )
271281 throws IOException {
272282 JobId jobId = request .getJobId ();
273- String message = "Kill Job received from client " + jobId ;
283+ UserGroupInformation callerUGI = UserGroupInformation .getCurrentUser ();
284+ String message = "Kill job " + jobId + " received from " + callerUGI
285+ + " at " + Server .getRemoteAddress ();
274286 LOG .info (message );
275- verifyAndGetJob (jobId , true );
287+ verifyAndGetJob (jobId , JobACL . MODIFY_JOB );
276288 appContext .getEventHandler ().handle (
277289 new JobDiagnosticsUpdateEvent (jobId , message ));
278290 appContext .getEventHandler ().handle (
@@ -287,9 +299,11 @@ public KillJobResponse killJob(KillJobRequest request)
287299 public KillTaskResponse killTask (KillTaskRequest request )
288300 throws IOException {
289301 TaskId taskId = request .getTaskId ();
290- String message = "Kill task received from client " + taskId ;
302+ UserGroupInformation callerUGI = UserGroupInformation .getCurrentUser ();
303+ String message = "Kill task " + taskId + " received from " + callerUGI
304+ + " at " + Server .getRemoteAddress ();
291305 LOG .info (message );
292- verifyAndGetTask (taskId , true );
306+ verifyAndGetTask (taskId , JobACL . MODIFY_JOB );
293307 appContext .getEventHandler ().handle (
294308 new TaskEvent (taskId , TaskEventType .T_KILL ));
295309 KillTaskResponse response =
@@ -302,9 +316,12 @@ public KillTaskResponse killTask(KillTaskRequest request)
302316 public KillTaskAttemptResponse killTaskAttempt (
303317 KillTaskAttemptRequest request ) throws IOException {
304318 TaskAttemptId taskAttemptId = request .getTaskAttemptId ();
305- String message = "Kill task attempt received from client " + taskAttemptId ;
319+ UserGroupInformation callerUGI = UserGroupInformation .getCurrentUser ();
320+ String message = "Kill task attempt " + taskAttemptId
321+ + " received from " + callerUGI + " at "
322+ + Server .getRemoteAddress ();
306323 LOG .info (message );
307- verifyAndGetAttempt (taskAttemptId , true );
324+ verifyAndGetAttempt (taskAttemptId , JobACL . MODIFY_JOB );
308325 appContext .getEventHandler ().handle (
309326 new TaskAttemptDiagnosticsUpdateEvent (taskAttemptId , message ));
310327 appContext .getEventHandler ().handle (
@@ -322,8 +339,8 @@ public GetDiagnosticsResponse getDiagnostics(
322339
323340 GetDiagnosticsResponse response =
324341 recordFactory .newRecordInstance (GetDiagnosticsResponse .class );
325- response .addAllDiagnostics (
326- verifyAndGetAttempt ( taskAttemptId , false ).getDiagnostics ());
342+ response .addAllDiagnostics (verifyAndGetAttempt ( taskAttemptId ,
343+ JobACL . VIEW_JOB ).getDiagnostics ());
327344 return response ;
328345 }
329346
@@ -332,9 +349,12 @@ public GetDiagnosticsResponse getDiagnostics(
332349 public FailTaskAttemptResponse failTaskAttempt (
333350 FailTaskAttemptRequest request ) throws IOException {
334351 TaskAttemptId taskAttemptId = request .getTaskAttemptId ();
335- String message = "Fail task attempt received from client " + taskAttemptId ;
352+ UserGroupInformation callerUGI = UserGroupInformation .getCurrentUser ();
353+ String message = "Fail task attempt " + taskAttemptId
354+ + " received from " + callerUGI + " at "
355+ + Server .getRemoteAddress ();
336356 LOG .info (message );
337- verifyAndGetAttempt (taskAttemptId , true );
357+ verifyAndGetAttempt (taskAttemptId , JobACL . MODIFY_JOB );
338358 appContext .getEventHandler ().handle (
339359 new TaskAttemptDiagnosticsUpdateEvent (taskAttemptId , message ));
340360 appContext .getEventHandler ().handle (
@@ -356,7 +376,7 @@ public GetTaskReportsResponse getTaskReports(
356376 GetTaskReportsResponse response =
357377 recordFactory .newRecordInstance (GetTaskReportsResponse .class );
358378
359- Job job = verifyAndGetJob (jobId , false );
379+ Job job = verifyAndGetJob (jobId , JobACL . VIEW_JOB );
360380 Collection <Task > tasks = job .getTasks (taskType ).values ();
361381 LOG .info ("Getting task report for " + taskType + " " + jobId
362382 + ". Report-size will be " + tasks .size ());
0 commit comments