|
17 | 17 | */ |
18 | 18 | package org.apache.hadoop.hdfs.nfs.nfs3; |
19 | 19 |
|
| 20 | +import static org.junit.Assert.assertTrue; |
| 21 | +import static org.junit.Assert.fail; |
| 22 | + |
20 | 23 | import java.io.IOException; |
| 24 | +import java.net.InetAddress; |
21 | 25 | import java.nio.ByteBuffer; |
| 26 | +import java.util.ArrayList; |
| 27 | +import java.util.Arrays; |
| 28 | +import java.util.List; |
| 29 | +import java.util.concurrent.ConcurrentMap; |
22 | 30 | import java.util.concurrent.ConcurrentNavigableMap; |
23 | 31 |
|
24 | 32 | import junit.framework.Assert; |
25 | 33 |
|
26 | 34 | import org.apache.hadoop.hdfs.DFSClient; |
| 35 | +import org.apache.hadoop.hdfs.HdfsConfiguration; |
| 36 | +import org.apache.hadoop.hdfs.MiniDFSCluster; |
27 | 37 | import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; |
28 | 38 | import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS; |
29 | 39 | import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx; |
| 40 | +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| 41 | +import org.apache.hadoop.hdfs.server.namenode.NameNode; |
30 | 42 | import org.apache.hadoop.nfs.nfs3.FileHandle; |
31 | 43 | import org.apache.hadoop.nfs.nfs3.IdUserGroup; |
| 44 | +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; |
32 | 45 | import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; |
33 | 46 | import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; |
| 47 | +import org.apache.hadoop.nfs.nfs3.request.CREATE3Request; |
| 48 | +import org.apache.hadoop.nfs.nfs3.request.READ3Request; |
| 49 | +import org.apache.hadoop.nfs.nfs3.request.SetAttr3; |
34 | 50 | import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; |
| 51 | +import org.apache.hadoop.nfs.nfs3.response.CREATE3Response; |
| 52 | +import org.apache.hadoop.nfs.nfs3.response.READ3Response; |
| 53 | +import org.apache.hadoop.oncrpc.XDR; |
| 54 | +import org.apache.hadoop.oncrpc.security.SecurityHandler; |
35 | 55 | import org.junit.Test; |
36 | 56 | import org.mockito.Mockito; |
37 | 57 |
|
@@ -105,7 +125,7 @@ public void testAlterWriteRequest() throws IOException { |
105 | 125 | Assert.assertTrue(limit - position == 1); |
106 | 126 | Assert.assertTrue(appendedData.get(position) == (byte) 19); |
107 | 127 | } |
108 | | - |
| 128 | + |
109 | 129 | @Test |
110 | 130 | // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which |
111 | 131 | // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, |
@@ -162,4 +182,117 @@ public void testCheckCommit() throws IOException { |
162 | 182 | ret = ctx.checkCommit(dfsClient, 0, null, 1, attr); |
163 | 183 | Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); |
164 | 184 | } |
| 185 | + |
| 186 | + private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) |
| 187 | + throws InterruptedException { |
| 188 | + int waitedTime = 0; |
| 189 | + ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager() |
| 190 | + .getOpenFileMap(); |
| 191 | + OpenFileCtx ctx = openFileMap.get(handle); |
| 192 | + assertTrue(ctx != null); |
| 193 | + do { |
| 194 | + Thread.sleep(3000); |
| 195 | + waitedTime += 3000; |
| 196 | + if (ctx.getPendingWritesForTest().size() == 0) { |
| 197 | + return; |
| 198 | + } |
| 199 | + } while (waitedTime < maxWaitTime); |
| 200 | + |
| 201 | + fail("Write can't finish."); |
| 202 | + } |
| 203 | + |
| 204 | + @Test |
| 205 | + public void testWriteStableHow() throws IOException, InterruptedException { |
| 206 | + HdfsConfiguration config = new HdfsConfiguration(); |
| 207 | + DFSClient client = null; |
| 208 | + MiniDFSCluster cluster = null; |
| 209 | + RpcProgramNfs3 nfsd; |
| 210 | + SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class); |
| 211 | + Mockito.when(securityHandler.getUser()).thenReturn( |
| 212 | + System.getProperty("user.name")); |
| 213 | + |
| 214 | + try { |
| 215 | + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); |
| 216 | + cluster.waitActive(); |
| 217 | + client = new DFSClient(NameNode.getAddress(config), config); |
| 218 | + |
| 219 | + // Start nfs |
| 220 | + List<String> exports = new ArrayList<String>(); |
| 221 | + exports.add("/"); |
| 222 | + Nfs3 nfs3 = new Nfs3(exports, config); |
| 223 | + nfs3.start(false); |
| 224 | + nfsd = (RpcProgramNfs3) nfs3.getRpcProgram(); |
| 225 | + |
| 226 | + HdfsFileStatus status = client.getFileInfo("/"); |
| 227 | + FileHandle rootHandle = new FileHandle(status.getFileId()); |
| 228 | + // Create file1 |
| 229 | + CREATE3Request createReq = new CREATE3Request(rootHandle, "file1", |
| 230 | + Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0); |
| 231 | + XDR createXdr = new XDR(); |
| 232 | + createReq.serialize(createXdr); |
| 233 | + CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(), |
| 234 | + securityHandler, InetAddress.getLocalHost()); |
| 235 | + FileHandle handle = createRsp.getObjHandle(); |
| 236 | + |
| 237 | + // Test DATA_SYNC |
| 238 | + byte[] buffer = new byte[10]; |
| 239 | + for (int i = 0; i < 10; i++) { |
| 240 | + buffer[i] = (byte) i; |
| 241 | + } |
| 242 | + WRITE3Request writeReq = new WRITE3Request(handle, 0, 10, |
| 243 | + WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer)); |
| 244 | + XDR writeXdr = new XDR(); |
| 245 | + writeReq.serialize(writeXdr); |
| 246 | + nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler, |
| 247 | + InetAddress.getLocalHost()); |
| 248 | + |
| 249 | + waitWrite(nfsd, handle, 60000); |
| 250 | + |
| 251 | + // Readback |
| 252 | + READ3Request readReq = new READ3Request(handle, 0, 10); |
| 253 | + XDR readXdr = new XDR(); |
| 254 | + readReq.serialize(readXdr); |
| 255 | + READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(), |
| 256 | + securityHandler, InetAddress.getLocalHost()); |
| 257 | + |
| 258 | + assertTrue(Arrays.equals(buffer, readRsp.getData().array())); |
| 259 | + |
| 260 | + // Test FILE_SYNC |
| 261 | + |
| 262 | + // Create file2 |
| 263 | + CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2", |
| 264 | + Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0); |
| 265 | + XDR createXdr2 = new XDR(); |
| 266 | + createReq2.serialize(createXdr2); |
| 267 | + CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(), |
| 268 | + securityHandler, InetAddress.getLocalHost()); |
| 269 | + FileHandle handle2 = createRsp2.getObjHandle(); |
| 270 | + |
| 271 | + WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10, |
| 272 | + WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer)); |
| 273 | + XDR writeXdr2 = new XDR(); |
| 274 | + writeReq2.serialize(writeXdr2); |
| 275 | + nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler, |
| 276 | + InetAddress.getLocalHost()); |
| 277 | + |
| 278 | + waitWrite(nfsd, handle2, 60000); |
| 279 | + |
| 280 | + // Readback |
| 281 | + READ3Request readReq2 = new READ3Request(handle2, 0, 10); |
| 282 | + XDR readXdr2 = new XDR(); |
| 283 | + readReq2.serialize(readXdr2); |
| 284 | + READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(), |
| 285 | + securityHandler, InetAddress.getLocalHost()); |
| 286 | + |
| 287 | + assertTrue(Arrays.equals(buffer, readRsp2.getData().array())); |
| 288 | + // FILE_SYNC should sync the file size |
| 289 | + status = client.getFileInfo("/file2"); |
| 290 | + assertTrue(status.getLen() == 10); |
| 291 | + |
| 292 | + } finally { |
| 293 | + if (cluster != null) { |
| 294 | + cluster.shutdown(); |
| 295 | + } |
| 296 | + } |
| 297 | + } |
165 | 298 | } |
0 commit comments