Commit dbb4a8b

HDFS-3343. Improve metrics for DN read latency. Contributed by Andrew Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356928 13f79535-47bb-0310-9956-ffa450edef68
1 parent 85b23b5

File tree: 5 files changed (+97, -13 lines)

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java

Lines changed: 38 additions & 5 deletions
@@ -31,6 +31,8 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This implements an output stream that can have a timeout while writing.
@@ -166,14 +168,20 @@ public void waitForWritable() throws IOException {
 
   /**
    * Transfers data from FileChannel using
-   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   * Updates <code>waitForWritableTime</code> and <code>transferToTime</code>
+   * with the time spent blocked on the network and the time spent transferring
+   * data from disk to network respectively.
    *
    * Similar to readFully(), this waits till requested amount of
    * data is transfered.
    *
    * @param fileCh FileChannel to transfer data from.
    * @param position position within the channel where the transfer begins
    * @param count number of bytes to transfer.
+   * @param waitForWritableTime updated by the nanoseconds spent waiting for
+   *        the socket to become writable
+   * @param transferToTime updated by the nanoseconds spent transferring data
    *
    * @throws EOFException
    *         If end of input file is reached before requested number of
@@ -186,9 +194,11 @@ public void waitForWritable() throws IOException {
    * @throws IOException Includes any exception thrown by
    *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
    */
-  public void transferToFully(FileChannel fileCh, long position, int count)
-      throws IOException {
-
+  public void transferToFully(FileChannel fileCh, long position, int count,
+      MutableRate waitForWritableTime,
+      MutableRate transferToTime) throws IOException {
+    long waitTime = 0;
+    long transferTime = 0;
     while (count > 0) {
       /*
        * Ideally we should wait after transferTo returns 0. But because of
@@ -200,7 +210,10 @@ public void transferToFully(FileChannel fileCh, long position, int count)
        *
        * Once we move to JAVA SE 7, wait should be moved to correct place.
        */
+      long start = System.nanoTime();
       waitForWritable();
+      long wait = System.nanoTime();
+
       int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
 
       if (nTransfered == 0) {
@@ -219,6 +232,26 @@ public void transferToFully(FileChannel fileCh, long position, int count)
         position += nTransfered;
         count -= nTransfered;
       }
+      long transfer = System.nanoTime();
+      waitTime += wait - start;
+      transferTime += transfer - wait;
+    }
+
+    if (waitForWritableTime != null) {
+      waitForWritableTime.add(waitTime);
+    }
+    if (transferToTime != null) {
+      transferToTime.add(transferTime);
     }
-  }
+  }
+
+  /**
+   * Call
+   * {@link #transferToFully(FileChannel, long, int, MutableRate, MutableRate)}
+   * with null <code>waitForWritableTime</code> and <code>transferToTime</code>
+   */
+  public void transferToFully(FileChannel fileCh, long position, int count)
+      throws IOException {
+    transferToFully(fileCh, position, count, null, null);
+  }
 }
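
For context, a minimal caller-side sketch of the new overload (not part of the commit; the class, registry name, and rate names below are illustrative). The two MutableRate parameters act as sinks: transferToFully() accumulates the nanoseconds blocked in waitForWritable() into the first and the nanoseconds spent in FileChannel#transferTo() into the second, recording one sample in each per call. Either may be null, which is exactly what the retained three-argument overload passes.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.net.SocketOutputStream;

class TransferToFullyExample {
  // Hypothetical helper: times one block send, splitting time blocked on the
  // network from time moving data from disk to the socket.
  static void sendBlock(SocketOutputStream sockOut, File blockFile, int length)
      throws IOException {
    MetricsRegistry registry = new MetricsRegistry("example");
    MutableRate netWaitNanos = registry.newRate("netWaitNanos");
    MutableRate diskXferNanos = registry.newRate("diskXferNanos");
    FileChannel fileCh = new FileInputStream(blockFile).getChannel();
    sockOut.transferToFully(fileCh, 0, length, netWaitNanos, diskXferNanos);
  }
}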

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 2 additions & 0 deletions
@@ -266,6 +266,8 @@ Branch-2 ( Unreleased changes )
     HDFS-3475. Make the replication monitor multipliers configurable.
     (harsh via eli)
 
+    HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 9 additions & 3 deletions
@@ -20,6 +20,7 @@
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
@@ -142,6 +144,7 @@ class BlockSender implements java.io.Closeable {
   /** Format used to print client trace log messages */
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
+  private DataNode datanode;
 
   /** The file descriptor of the block being sent */
   private FileDescriptor blockInFd;
@@ -184,6 +187,7 @@ class BlockSender implements java.io.Closeable {
     this.clientTraceFmt = clientTraceFmt;
     this.readaheadLength = datanode.getDnConf().readaheadLength;
     this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+    this.datanode = datanode;
 
     final Replica replica;
     final long replicaVisibleLength;
@@ -478,9 +482,11 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
       SocketOutputStream sockOut = (SocketOutputStream)out;
       sockOut.write(buf, 0, dataOff); // First write checksum
 
-      // no need to flush. since we know out is not a buffered stream.
-      sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
-          blockInPosition, dataLen);
+      // no need to flush since we know out is not a buffered stream
+      FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+      sockOut.transferToFully(fileCh, blockInPosition, dataLen,
+          datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
+          datanode.metrics.getSendDataPacketTransferNanos());
       blockInPosition += dataLen;
     } else {
       // normal transfer
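
A note on the shape of this change: SocketOutputStream lives in hadoop-common, so it cannot depend on the HDFS-side DataNodeMetrics class. BlockSender therefore resolves the two DataNode-specific rates and passes them down through the generic stream API, keeping the dependency pointing from hadoop-hdfs to hadoop-common only. Note also that just the transferTo() path is instrumented; the "normal transfer" branch is unchanged. A condensed view of the two layers (abbreviated from the diffs above):

// hadoop-common: sees only the generic MutableRate type.
public void transferToFully(FileChannel fileCh, long position, int count,
    MutableRate waitForWritableTime, MutableRate transferToTime)
    throws IOException { /* ... */ }

// hadoop-hdfs: owns the DataNode-specific metric names and wiring.
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
    datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
    datanode.metrics.getSendDataPacketTransferNanos());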

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

Lines changed: 11 additions & 0 deletions
@@ -75,6 +75,9 @@ public class DataNodeMetrics {
   @Metric MutableRate blockReports;
 
   @Metric MutableRate fsync;
+
+  @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  @Metric MutableRate sendDataPacketTransferNanos;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -183,4 +186,12 @@ public void incrVolumeFailures() {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
+
+  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
+    return sendDataPacketBlockedOnNetworkNanos;
+  }
+
+  public MutableRate getSendDataPacketTransferNanos() {
+    return sendDataPacketTransferNanos;
+  }
 }
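
For readers new to metrics2: a field annotated @Metric MutableRate is picked up automatically when the source is registered, and each add(value) call bumps a <Name>NumOps counter and folds the value into a <Name>AvgTime average. Because the values added here are nanoseconds, the averages read as mean nanoseconds per packet. A small illustrative sketch (the surrounding workload is hypothetical):

// Illustrative only: how one sample lands in these rates.
long start = System.nanoTime();
// ... send one packet ...
long elapsed = System.nanoTime() - start;
metrics.getSendDataPacketTransferNanos().add(elapsed);
// Emits: SendDataPacketTransferNanosNumOps  (sample count)
//        SendDataPacketTransferNanosAvgTime (mean of added values, in nanos)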

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

Lines changed: 37 additions & 5 deletions
@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.*;
+
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
+import org.junit.Test;
 
-import junit.framework.TestCase;
-
-public class TestDataNodeMetrics extends TestCase {
+public class TestDataNodeMetrics {
+
+  MiniDFSCluster cluster = null;
+  FileSystem fs = null;
 
+  @Test
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
     SimulatedFSDataset.setFactory(conf);
@@ -50,4 +57,29 @@ public void testDataNodeMetrics() throws Exception {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testSendDataPacket() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      // Create and read a 1 byte file
+      Path tmpfile = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, tmpfile,
+          (long)1, (short)1, 1L);
+      DFSTestUtil.readFile(fs, tmpfile);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 1);
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+
+      // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
+      // signaling the end of the block
+      assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
+      assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
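
One loose end: the test adds a static import for assertGaugeGt, but the hunk above only asserts the NumOps counters. A natural follow-on check, sketched here on the assumption that even a one-byte read records nonzero elapsed time, would cover the averages as well:

// Possible extra assertions (a sketch, not in the commit):
assertGaugeGt("SendDataPacketTransferNanosAvgTime", 0.0, rb);
assertGaugeGt("SendDataPacketBlockedOnNetworkNanosAvgTime", 0.0, rb);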
