|
6 | 6 | import static org.mockito.Mockito.never; |
7 | 7 | import static org.mockito.Mockito.verify; |
8 | 8 | import static org.mockito.Mockito.when; |
| 9 | +import static org.mockito.Mockito.atLeast; |
| 10 | +import org.mockito.ArgumentCaptor; |
9 | 11 |
|
10 | 12 | import java.net.InetSocketAddress; |
| 13 | +import java.util.concurrent.BrokenBarrierException; |
| 14 | +import java.util.concurrent.CyclicBarrier; |
11 | 15 |
|
12 | 16 | import org.apache.commons.logging.Log; |
13 | 17 | import org.apache.commons.logging.LogFactory; |
|
18 | 22 | import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
19 | 23 | import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
20 | 24 | import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| 25 | +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; |
21 | 26 | import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; |
22 | 27 | import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; |
23 | 28 | import org.apache.hadoop.yarn.api.ContainerManager; |
| 29 | +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; |
| 30 | +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; |
24 | 31 | import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
25 | 32 | import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; |
26 | 33 | import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; |
| 34 | +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; |
27 | 35 | import org.apache.hadoop.yarn.api.records.ApplicationId; |
28 | 36 | import org.apache.hadoop.yarn.api.records.ContainerId; |
| 37 | +import org.apache.hadoop.yarn.event.Event; |
29 | 38 | import org.apache.hadoop.yarn.event.EventHandler; |
| 39 | +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; |
30 | 40 | import org.apache.hadoop.yarn.factories.RecordFactory; |
31 | 41 | import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
32 | 42 | import org.apache.hadoop.yarn.ipc.YarnRPC; |
@@ -272,6 +282,150 @@ public void testMyShutdown() throws Exception { |
272 | 282 | } finally { |
273 | 283 | ut.stop(); |
274 | 284 | verify(mockCM).stopContainer(any(StopContainerRequest.class)); |
275 | | -} |
| 285 | + } |
| 286 | + } |
| 287 | + |
| 288 | + @SuppressWarnings({ "rawtypes", "unchecked" }) |
| 289 | + @Test |
| 290 | + public void testContainerCleaned() throws Exception { |
| 291 | + LOG.info("STARTING testContainerCleaned"); |
| 292 | + |
| 293 | + CyclicBarrier startLaunchBarrier = new CyclicBarrier(2); |
| 294 | + CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2); |
| 295 | + |
| 296 | + YarnRPC mockRpc = mock(YarnRPC.class); |
| 297 | + AppContext mockContext = mock(AppContext.class); |
| 298 | + |
| 299 | + EventHandler mockEventHandler = mock(EventHandler.class); |
| 300 | + when(mockContext.getEventHandler()).thenReturn(mockEventHandler); |
| 301 | + |
| 302 | + ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier); |
| 303 | + when(mockRpc.getProxy(eq(ContainerManager.class), |
| 304 | + any(InetSocketAddress.class), any(Configuration.class))) |
| 305 | + .thenReturn(mockCM); |
| 306 | + |
| 307 | + ContainerLauncherImplUnderTest ut = |
| 308 | + new ContainerLauncherImplUnderTest(mockContext, mockRpc); |
| 309 | + |
| 310 | + Configuration conf = new Configuration(); |
| 311 | + ut.init(conf); |
| 312 | + ut.start(); |
| 313 | + try { |
| 314 | + ContainerId contId = makeContainerId(0l, 0, 0, 1); |
| 315 | + TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); |
| 316 | + String cmAddress = "127.0.0.1:8000"; |
| 317 | + StartContainerResponse startResp = |
| 318 | + recordFactory.newRecordInstance(StartContainerResponse.class); |
| 319 | + startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, |
| 320 | + ShuffleHandler.serializeMetaData(80)); |
| 321 | + |
| 322 | + |
| 323 | + LOG.info("inserting launch event"); |
| 324 | + ContainerRemoteLaunchEvent mockLaunchEvent = |
| 325 | + mock(ContainerRemoteLaunchEvent.class); |
| 326 | + when(mockLaunchEvent.getType()) |
| 327 | + .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH); |
| 328 | + when(mockLaunchEvent.getContainerID()) |
| 329 | + .thenReturn(contId); |
| 330 | + when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); |
| 331 | + when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); |
| 332 | + ut.handle(mockLaunchEvent); |
| 333 | + |
| 334 | + startLaunchBarrier.await(); |
| 335 | + |
| 336 | + |
| 337 | + LOG.info("inserting cleanup event"); |
| 338 | + ContainerLauncherEvent mockCleanupEvent = |
| 339 | + mock(ContainerLauncherEvent.class); |
| 340 | + when(mockCleanupEvent.getType()) |
| 341 | + .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); |
| 342 | + when(mockCleanupEvent.getContainerID()) |
| 343 | + .thenReturn(contId); |
| 344 | + when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); |
| 345 | + when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); |
| 346 | + ut.handle(mockCleanupEvent); |
| 347 | + |
| 348 | + completeLaunchBarrier.await(); |
| 349 | + |
| 350 | + ut.waitForPoolToIdle(); |
| 351 | + |
| 352 | + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); |
| 353 | + verify(mockEventHandler, atLeast(2)).handle(arg.capture()); |
| 354 | + boolean containerCleaned = false; |
| 355 | + |
| 356 | + for (int i =0; i < arg.getAllValues().size(); i++) { |
| 357 | + LOG.info(arg.getAllValues().get(i).toString()); |
| 358 | + Event currentEvent = arg.getAllValues().get(i); |
| 359 | + if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) { |
| 360 | + containerCleaned = true; |
| 361 | + } |
| 362 | + } |
| 363 | + assert(containerCleaned); |
| 364 | + |
| 365 | + } finally { |
| 366 | + ut.stop(); |
| 367 | + } |
| 368 | + } |
| 369 | + |
| 370 | + private static class ContainerManagerForTest implements ContainerManager { |
| 371 | + |
| 372 | + private CyclicBarrier startLaunchBarrier; |
| 373 | + private CyclicBarrier completeLaunchBarrier; |
| 374 | + |
| 375 | + ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) { |
| 376 | + this.startLaunchBarrier = startLaunchBarrier; |
| 377 | + this.completeLaunchBarrier = completeLaunchBarrier; |
| 378 | + } |
| 379 | + @Override |
| 380 | + public StartContainerResponse startContainer(StartContainerRequest request) |
| 381 | + throws YarnRemoteException { |
| 382 | + try { |
| 383 | + startLaunchBarrier.await(); |
| 384 | + completeLaunchBarrier.await(); |
| 385 | + //To ensure the kill is started before the launch |
| 386 | + Thread.sleep(100); |
| 387 | + } catch (InterruptedException e) { |
| 388 | + e.printStackTrace(); |
| 389 | + } catch (BrokenBarrierException e) { |
| 390 | + e.printStackTrace(); |
| 391 | + } |
| 392 | + |
| 393 | + throw new ContainerException("Force fail CM"); |
| 394 | + |
| 395 | + } |
| 396 | + |
| 397 | + @Override |
| 398 | + public StopContainerResponse stopContainer(StopContainerRequest request) |
| 399 | + throws YarnRemoteException { |
| 400 | + |
| 401 | + return null; |
| 402 | + } |
| 403 | + |
| 404 | + @Override |
| 405 | + public GetContainerStatusResponse getContainerStatus( |
| 406 | + GetContainerStatusRequest request) throws YarnRemoteException { |
| 407 | + |
| 408 | + return null; |
| 409 | + } |
276 | 410 | } |
| 411 | + |
| 412 | + @SuppressWarnings("serial") |
| 413 | + private static class ContainerException extends YarnRemoteException { |
| 414 | + |
| 415 | + public ContainerException(String message) { |
| 416 | + super(message); |
| 417 | + } |
| 418 | + |
| 419 | + @Override |
| 420 | + public String getRemoteTrace() { |
| 421 | + return null; |
| 422 | + } |
| 423 | + |
| 424 | + @Override |
| 425 | + public YarnRemoteException getCause() { |
| 426 | + return null; |
| 427 | + } |
| 428 | + |
| 429 | + } |
| 430 | + |
277 | 431 | } |
0 commit comments