Skip to content

Commit c52a7cb

Browse files
committed
HDFS-4698. Provide client-side metrics for remote reads, local reads, and short-circuit reads. Contributed by Colin Patrick McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481121 13f79535-47bb-0310-9956-ffa450edef68
1 parent 5d40c8c commit c52a7cb

File tree

14 files changed

+411
-7
lines changed

14 files changed

+411
-7
lines changed

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,9 @@ Release 2.0.5-beta - UNRELEASED
844844
HDFS-4804. WARN when users set the block balanced preference percent below
845845
0.5 or above 1.0. (Stephen Chu via atm)
846846

847+
HDFS-4698. Provide client-side metrics for remote reads, local reads, and
848+
short-circuit reads. (Colin Patrick McCabe via atm)
849+
847850
OPTIMIZATIONS
848851

849852
BUG FIXES

hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,24 @@ int hdfsFileIsOpenForRead(hdfsFile file)
11971197
return (file->type == INPUT);
11981198
}
11991199

1200+
int hdfsFileGetReadStatistics(hdfsFile file,
1201+
struct hdfsReadStatistics **stats)
1202+
{
1203+
errno = ENOTSUP;
1204+
return -1;
1205+
}
1206+
1207+
int64_t hdfsReadStatisticsGetRemoteBytesRead(
1208+
const struct hdfsReadStatistics *stats)
1209+
{
1210+
return stats->totalBytesRead - stats->totalLocalBytesRead;
1211+
}
1212+
1213+
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
1214+
{
1215+
free(stats);
1216+
}
1217+
12001218
int hdfsFileIsOpenForWrite(hdfsFile file)
12011219
{
12021220
return (file->type == OUTPUT);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,15 @@ public interface BlockReader extends ByteBufferReadable {
7070
* filled or the next call will return EOF.
7171
*/
7272
int readAll(byte[] buf, int offset, int len) throws IOException;
73+
74+
/**
75+
* @return true only if this is a local read.
76+
*/
77+
boolean isLocal();
78+
79+
/**
80+
* @return true only if this is a short-circuit read.
81+
* All short-circuit reads are also local.
82+
*/
83+
boolean isShortCircuit();
7384
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,4 +531,14 @@ public int available() throws IOException {
531531
// We never do network I/O in BlockReaderLocal.
532532
return Integer.MAX_VALUE;
533533
}
534+
535+
@Override
536+
public boolean isLocal() {
537+
return true;
538+
}
539+
540+
@Override
541+
public boolean isShortCircuit() {
542+
return true;
543+
}
534544
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,4 +700,14 @@ public int available() throws IOException {
700700
// We never do network I/O in BlockReaderLocalLegacy.
701701
return Integer.MAX_VALUE;
702702
}
703+
704+
@Override
705+
public boolean isLocal() {
706+
return true;
707+
}
708+
709+
@Override
710+
public boolean isShortCircuit() {
711+
return true;
712+
}
703713
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,74 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
8181
private LocatedBlock currentLocatedBlock = null;
8282
private long pos = 0;
8383
private long blockEnd = -1;
84+
private final ReadStatistics readStatistics = new ReadStatistics();
8485

86+
public static class ReadStatistics {
87+
public ReadStatistics() {
88+
this.totalBytesRead = 0;
89+
this.totalLocalBytesRead = 0;
90+
this.totalShortCircuitBytesRead = 0;
91+
}
92+
93+
public ReadStatistics(ReadStatistics rhs) {
94+
this.totalBytesRead = rhs.getTotalBytesRead();
95+
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
96+
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
97+
}
98+
99+
/**
100+
* @return The total bytes read. This will always be at least as
101+
* high as the other numbers, since it includes all of them.
102+
*/
103+
public long getTotalBytesRead() {
104+
return totalBytesRead;
105+
}
106+
107+
/**
108+
* @return The total local bytes read. This will always be at least
109+
* as high as totalShortCircuitBytesRead, since all short-circuit
110+
* reads are also local.
111+
*/
112+
public long getTotalLocalBytesRead() {
113+
return totalLocalBytesRead;
114+
}
115+
116+
/**
117+
* @return The total short-circuit local bytes read.
118+
*/
119+
public long getTotalShortCircuitBytesRead() {
120+
return totalShortCircuitBytesRead;
121+
}
122+
123+
/**
124+
* @return The total number of bytes read which were not local.
125+
*/
126+
public long getRemoteBytesRead() {
127+
return totalBytesRead - totalLocalBytesRead;
128+
}
129+
130+
void addRemoteBytes(long amt) {
131+
this.totalBytesRead += amt;
132+
}
133+
134+
void addLocalBytes(long amt) {
135+
this.totalBytesRead += amt;
136+
this.totalLocalBytesRead += amt;
137+
}
138+
139+
void addShortCircuitBytes(long amt) {
140+
this.totalBytesRead += amt;
141+
this.totalLocalBytesRead += amt;
142+
this.totalShortCircuitBytesRead += amt;
143+
}
144+
145+
private long totalBytesRead;
146+
147+
private long totalLocalBytesRead;
148+
149+
private long totalShortCircuitBytesRead;
150+
}
151+
85152
private final FileInputStreamCache fileInputStreamCache;
86153

87154
/**
@@ -546,9 +613,25 @@ public synchronized int read() throws IOException {
546613
* strategy-agnostic.
547614
*/
548615
private interface ReaderStrategy {
549-
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
616+
public int doRead(BlockReader blockReader, int off, int len,
617+
ReadStatistics readStatistics) throws ChecksumException, IOException;
550618
}
551619

620+
private static void updateReadStatistics(ReadStatistics readStatistics,
621+
int nRead, BlockReader blockReader) {
622+
if (nRead <= 0) return;
623+
if (blockReader.isShortCircuit()) {
624+
readStatistics.totalBytesRead += nRead;
625+
readStatistics.totalLocalBytesRead += nRead;
626+
readStatistics.totalShortCircuitBytesRead += nRead;
627+
} else if (blockReader.isLocal()) {
628+
readStatistics.totalBytesRead += nRead;
629+
readStatistics.totalLocalBytesRead += nRead;
630+
} else {
631+
readStatistics.totalBytesRead += nRead;
632+
}
633+
}
634+
552635
/**
553636
* Used to read bytes into a byte[]
554637
*/
@@ -560,8 +643,11 @@ public ByteArrayStrategy(byte[] buf) {
560643
}
561644

562645
@Override
563-
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
564-
return blockReader.read(buf, off, len);
646+
public int doRead(BlockReader blockReader, int off, int len,
647+
ReadStatistics readStatistics) throws ChecksumException, IOException {
648+
int nRead = blockReader.read(buf, off, len);
649+
updateReadStatistics(readStatistics, nRead, blockReader);
650+
return nRead;
565651
}
566652
}
567653

@@ -575,13 +661,15 @@ private static class ByteBufferStrategy implements ReaderStrategy {
575661
}
576662

577663
@Override
578-
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
664+
public int doRead(BlockReader blockReader, int off, int len,
665+
ReadStatistics readStatistics) throws ChecksumException, IOException {
579666
int oldpos = buf.position();
580667
int oldlimit = buf.limit();
581668
boolean success = false;
582669
try {
583670
int ret = blockReader.read(buf);
584671
success = true;
672+
updateReadStatistics(readStatistics, ret, blockReader);
585673
return ret;
586674
} finally {
587675
if (!success) {
@@ -613,7 +701,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
613701
while (true) {
614702
// retry as many times as seekToNewSource allows.
615703
try {
616-
return reader.doRead(blockReader, off, len);
704+
return reader.doRead(blockReader, off, len, readStatistics);
617705
} catch ( ChecksumException ce ) {
618706
DFSClient.LOG.warn("Found Checksum error for "
619707
+ getCurrentBlock() + " from " + currentNode
@@ -1275,4 +1363,11 @@ static class DNAddrPair {
12751363
this.addr = addr;
12761364
}
12771365
}
1366+
1367+
/**
1368+
* Get statistics about the reads which this DFSInputStream has done.
1369+
*/
1370+
public synchronized ReadStatistics getReadStatistics() {
1371+
return new ReadStatistics(readStatistics);
1372+
}
12781373
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
4141
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
4242
import org.apache.hadoop.io.IOUtils;
43+
import org.apache.hadoop.net.NetUtils;
4344
import org.apache.hadoop.security.token.Token;
4445
import org.apache.hadoop.util.DataChecksum;
4546

@@ -78,6 +79,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
7879
* at the beginning so that the read can begin on a chunk boundary.
7980
*/
8081
private final long bytesNeededToFinish;
82+
83+
/**
84+
* True if we are reading from a local DataNode.
85+
*/
86+
private final boolean isLocal;
8187

8288
private boolean eos = false;
8389
private boolean sentStatusCode = false;
@@ -329,6 +335,9 @@ private RemoteBlockReader(String file, String bpid, long blockId,
329335
checksum.getChecksumSize() > 0? checksum : null,
330336
checksum.getBytesPerChecksum(),
331337
checksum.getChecksumSize());
338+
339+
this.isLocal = DFSClient.isLocalAddress(NetUtils.
340+
createSocketAddr(datanodeID.getXferAddr()));
332341

333342
this.peer = peer;
334343
this.datanodeID = datanodeID;
@@ -477,4 +486,14 @@ public int available() throws IOException {
477486
// to us without doing network I/O.
478487
return DFSClient.TCP_WINDOW_SIZE;
479488
}
489+
490+
@Override
491+
public boolean isLocal() {
492+
return isLocal;
493+
}
494+
495+
@Override
496+
public boolean isShortCircuit() {
497+
return false;
498+
}
480499
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
4545
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
4646
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
47+
import org.apache.hadoop.net.NetUtils;
4748
import org.apache.hadoop.security.token.Token;
4849
import org.apache.hadoop.util.DataChecksum;
4950

@@ -106,6 +107,11 @@ public class RemoteBlockReader2 implements BlockReader {
106107
*/
107108
private long bytesNeededToFinish;
108109

110+
/**
111+
* True if we are reading from a local DataNode.
112+
*/
113+
private final boolean isLocal;
114+
109115
private final boolean verifyChecksum;
110116

111117
private boolean sentStatusCode = false;
@@ -255,6 +261,8 @@ protected RemoteBlockReader2(String file, String bpid, long blockId,
255261
DataChecksum checksum, boolean verifyChecksum,
256262
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
257263
DatanodeID datanodeID, PeerCache peerCache) {
264+
this.isLocal = DFSClient.isLocalAddress(NetUtils.
265+
createSocketAddr(datanodeID.getXferAddr()));
258266
// Path is used only for printing block and file information in debug
259267
this.peer = peer;
260268
this.datanodeID = datanodeID;
@@ -431,4 +439,14 @@ public int available() throws IOException {
431439
// to us without doing network I/O.
432440
return DFSClient.TCP_WINDOW_SIZE;
433441
}
442+
443+
@Override
444+
public boolean isLocal() {
445+
return isLocal;
446+
}
447+
448+
@Override
449+
public boolean isShortCircuit() {
450+
return false;
451+
}
434452
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,14 @@ public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
6868
public long getVisibleLength() throws IOException {
6969
return ((DFSInputStream) in).getFileLength();
7070
}
71+
72+
/**
73+
* Get statistics about the reads which this DFSInputStream has done.
74+
* Note that because HdfsDataInputStream is buffered, these stats may
75+
* be higher than you would expect just by adding up the number of
76+
* bytes read through HdfsDataInputStream.
77+
*/
78+
public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
79+
return ((DFSInputStream) in).getReadStatistics();
80+
}
7181
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
214214
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
215215
offsetIntoBlock, amtToRead, true,
216216
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
217-
new DatanodeID(addr.getAddress().toString(),
217+
new DatanodeID(addr.getAddress().getHostAddress(),
218218
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
219219
null, null, false);
220220

0 commit comments

Comments
 (0)