|
22 | 22 |
|
23 | 23 | import java.util.Queue; |
24 | 24 |
|
| 25 | +import org.apache.hadoop.conf.Configuration; |
| 26 | +import org.apache.hadoop.fs.Path; |
| 27 | +import org.apache.hadoop.hdfs.DFSConfigKeys; |
25 | 28 | import org.apache.hadoop.hdfs.DFSTestUtil; |
| 29 | +import org.apache.hadoop.hdfs.DistributedFileSystem; |
| 30 | +import org.apache.hadoop.hdfs.MiniDFSCluster; |
| 31 | +import org.apache.hadoop.hdfs.MiniDFSNNTopology; |
26 | 32 | import org.apache.hadoop.hdfs.protocol.Block; |
| 33 | +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| 34 | +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; |
27 | 35 | import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; |
28 | 36 | import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; |
| 37 | +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; |
29 | 38 | import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; |
30 | 39 | import org.junit.Test; |
31 | 40 |
|
@@ -67,4 +76,41 @@ public void testQueues() { |
67 | 76 | assertNull(msgs.takeBlockQueue(block1Gs1)); |
68 | 77 | assertEquals(0, msgs.count()); |
69 | 78 | } |
| 79 | + |
| 80 | + @Test |
| 81 | + public void testPendingDataNodeMessagesWithEC() throws Exception { |
| 82 | + ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getPolicies() |
| 83 | + .get(3); |
| 84 | + Path dirPath = new Path("/testPendingDataNodeMessagesWithEC"); |
| 85 | + Configuration conf = new Configuration(); |
| 86 | + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60000); |
| 87 | + |
| 88 | + int numDn = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(); |
| 89 | + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| 90 | + .numDataNodes(numDn).nnTopology(MiniDFSNNTopology.simpleHATopology()) |
| 91 | + .build(); |
| 92 | + try { |
| 93 | + cluster.transitionToActive(0); |
| 94 | + |
| 95 | + DistributedFileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); |
| 96 | + fs.enableErasureCodingPolicy(ecPolicy.getName()); |
| 97 | + fs.mkdirs(dirPath); |
| 98 | + fs.setErasureCodingPolicy(dirPath, ecPolicy.getName()); |
| 99 | + |
| 100 | + DFSTestUtil.createFile(fs, new Path(dirPath, "file"), |
| 101 | + ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(), (short) 1, 0); |
| 102 | + |
| 103 | + cluster.getNameNode(0).getRpcServer().rollEditLog(); |
| 104 | + cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits(); |
| 105 | + |
| 106 | + // PendingDataNodeMessages datanode message queue should be empty after |
| 107 | + // processing IBR |
| 108 | + int pendingIBRMsg = cluster.getNameNode(1).getNamesystem() |
| 109 | + .getBlockManager().getPendingDataNodeMessageCount(); |
| 110 | + assertEquals("All DN message should processed after tail edits", 0, |
| 111 | + pendingIBRMsg); |
| 112 | + } finally { |
| 113 | + cluster.shutdown(); |
| 114 | + } |
| 115 | + } |
70 | 116 | } |
0 commit comments