|
28 | 28 | import java.io.File; |
29 | 29 | import java.io.IOException; |
30 | 30 | import java.net.InetSocketAddress; |
| 31 | +import java.util.ArrayList; |
31 | 32 | import java.util.Collections; |
32 | 33 | import java.util.List; |
33 | 34 | import java.util.Map; |
@@ -99,10 +100,10 @@ public class TestBPOfferService { |
99 | 100 | private DatanodeProtocolClientSideTranslatorPB mockNN1; |
100 | 101 | private DatanodeProtocolClientSideTranslatorPB mockNN2; |
101 | 102 | private final NNHAStatusHeartbeat[] mockHaStatuses = |
102 | | - new NNHAStatusHeartbeat[2]; |
| 103 | + new NNHAStatusHeartbeat[3]; |
103 | 104 | private final DatanodeCommand[][] datanodeCommands = |
104 | | - new DatanodeCommand[2][0]; |
105 | | - private final int[] heartbeatCounts = new int[2]; |
| 105 | + new DatanodeCommand[3][0]; |
| 106 | + private final int[] heartbeatCounts = new int[3]; |
106 | 107 | private DataNode mockDn; |
107 | 108 | private FsDatasetSpi<?> mockFSDataset; |
108 | 109 |
|
@@ -864,4 +865,74 @@ public void testNNHAStateUpdateFromVersionRequest() throws Exception { |
864 | 865 | assertNotNull(bpos.getActiveNN()); |
865 | 866 |
|
866 | 867 | } |
| 868 | + |
| 869 | + @Test |
| 870 | + public void testRefreshNameNodes() throws Exception { |
| 871 | + |
| 872 | + BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2); |
| 873 | + |
| 874 | + bpos.start(); |
| 875 | + try { |
| 876 | + waitForBothActors(bpos); |
| 877 | + |
| 878 | + // The DN should have register to both NNs. |
| 879 | + Mockito.verify(mockNN1) |
| 880 | + .registerDatanode(Mockito.any(DatanodeRegistration.class)); |
| 881 | + Mockito.verify(mockNN2) |
| 882 | + .registerDatanode(Mockito.any(DatanodeRegistration.class)); |
| 883 | + |
| 884 | + // Should get block reports from both NNs |
| 885 | + waitForBlockReport(mockNN1); |
| 886 | + waitForBlockReport(mockNN2); |
| 887 | + |
| 888 | + // When we receive a block, it should report it to both NNs |
| 889 | + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false); |
| 890 | + |
| 891 | + ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, |
| 892 | + mockNN1); |
| 893 | + assertEquals(1, ret.length); |
| 894 | + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); |
| 895 | + |
| 896 | + ret = waitForBlockReceived(FAKE_BLOCK, mockNN2); |
| 897 | + assertEquals(1, ret.length); |
| 898 | + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); |
| 899 | + |
| 900 | + // add new standby |
| 901 | + DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2); |
| 902 | + Mockito.doReturn(mockNN3).when(mockDn) |
| 903 | + .connectToNN(Mockito.eq(new InetSocketAddress(2))); |
| 904 | + |
| 905 | + ArrayList<InetSocketAddress> addrs = new ArrayList<>(); |
| 906 | + ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>( |
| 907 | + addrs.size()); |
| 908 | + // mockNN1 |
| 909 | + addrs.add(new InetSocketAddress(0)); |
| 910 | + lifelineAddrs.add(null); |
| 911 | + // mockNN3 |
| 912 | + addrs.add(new InetSocketAddress(2)); |
| 913 | + lifelineAddrs.add(null); |
| 914 | + |
| 915 | + bpos.refreshNNList(addrs, lifelineAddrs); |
| 916 | + |
| 917 | + assertEquals(2, bpos.getBPServiceActors().size()); |
| 918 | + // wait for handshake to run |
| 919 | + Thread.sleep(1000); |
| 920 | + |
| 921 | + // verify new NN registered |
| 922 | + Mockito.verify(mockNN3) |
| 923 | + .registerDatanode(Mockito.any(DatanodeRegistration.class)); |
| 924 | + |
| 925 | + // When we receive a block, it should report it to both NNs |
| 926 | + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false); |
| 927 | + |
| 928 | + // veridfy new NN recieved block report |
| 929 | + ret = waitForBlockReceived(FAKE_BLOCK, mockNN3); |
| 930 | + assertEquals(1, ret.length); |
| 931 | + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); |
| 932 | + |
| 933 | + } finally { |
| 934 | + bpos.stop(); |
| 935 | + bpos.join(); |
| 936 | + } |
| 937 | + } |
867 | 938 | } |
0 commit comments