Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3956,7 +3956,7 @@ public static class StatisticsData {
/**
* Add another StatisticsData object to this one.
*/
void add(StatisticsData other) {
public void add(StatisticsData other) {
this.bytesRead += other.bytesRead;
this.bytesWritten += other.bytesWritten;
this.readOps += other.readOps;
Expand Down Expand Up @@ -4032,6 +4032,15 @@ public long getBytesReadDistanceOfFiveOrLarger() {
return bytesReadDistanceOfFiveOrLarger;
}

public boolean hasNetworkDistanceData() {
return (
bytesReadLocalHost > 0 ||
bytesReadDistanceOfOneOrTwo > 0 ||
bytesReadDistanceOfThreeOrFour > 0 ||
bytesReadDistanceOfFiveOrLarger > 0
);
}

public long getBytesReadErasureCoded() {
return bytesReadErasureCoded;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,10 @@ class FileSystemStatisticUpdater {
private Counters.Counter readBytesCounter, writeBytesCounter,
readOpsCounter, largeReadOpsCounter, writeOpsCounter,
readBytesEcCounter;
private Counters.Counter localHostReadBytesCounter,
localRackReadBytesCounter, firstDegreeRemoteRackReadBytesCounter,
secondOrMoreDegreeRemoteRackReadBytesCounter;

private String scheme;
FileSystemStatisticUpdater(List<FileSystem.Statistics> stats, String scheme) {
this.stats = stats;
Expand Down Expand Up @@ -1160,27 +1164,42 @@ void updateCounters() {
readBytesEcCounter =
counters.findCounter(scheme, FileSystemCounter.BYTES_READ_EC);
}
long readBytes = 0;
long writeBytes = 0;
long readOps = 0;
long largeReadOps = 0;
long writeOps = 0;
long readBytesEC = 0;

Statistics.StatisticsData data = new Statistics.StatisticsData();
for (FileSystem.Statistics stat: stats) {
readBytes = readBytes + stat.getBytesRead();
writeBytes = writeBytes + stat.getBytesWritten();
readOps = readOps + stat.getReadOps();
largeReadOps = largeReadOps + stat.getLargeReadOps();
writeOps = writeOps + stat.getWriteOps();
readBytesEC = readBytesEC + stat.getBytesReadErasureCoded();
data.add(stat.getData());
}
readBytesCounter.setValue(readBytes);
writeBytesCounter.setValue(writeBytes);
readOpsCounter.setValue(readOps);
largeReadOpsCounter.setValue(largeReadOps);
writeOpsCounter.setValue(writeOps);

readBytesCounter.setValue(data.getBytesRead());
writeBytesCounter.setValue(data.getBytesWritten());
readOpsCounter.setValue(data.getReadOps());
largeReadOpsCounter.setValue(data.getLargeReadOps());
writeOpsCounter.setValue(data.getWriteOps());
if (readBytesEcCounter != null) {
readBytesEcCounter.setValue(readBytesEC);
readBytesEcCounter.setValue(data.getBytesReadErasureCoded());
}

if (data.hasNetworkDistanceData()) {
if (localHostReadBytesCounter == null) {
localHostReadBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_READ_LOCAL_HOST);
}
if (localRackReadBytesCounter == null) {
localRackReadBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_READ_LOCAL_RACK);
}
if (firstDegreeRemoteRackReadBytesCounter == null) {
firstDegreeRemoteRackReadBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_READ_FIRST_DEGREE_REMOTE_RACK);
}
if (secondOrMoreDegreeRemoteRackReadBytesCounter == null) {
secondOrMoreDegreeRemoteRackReadBytesCounter = counters.findCounter(scheme,
FileSystemCounter.BYTES_READ_SECOND_OR_MORE_DEGREE_REMOTE_RACK);
}
localHostReadBytesCounter.setValue(data.getBytesReadLocalHost());
localRackReadBytesCounter.setValue(data.getBytesReadDistanceOfOneOrTwo());
firstDegreeRemoteRackReadBytesCounter.setValue(data.getBytesReadDistanceOfThreeOrFour());
secondOrMoreDegreeRemoteRackReadBytesCounter.setValue(data.getBytesReadDistanceOfFiveOrLarger());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
@InterfaceAudience.Private
public enum FileSystemCounter {
BYTES_READ,
BYTES_READ_LOCAL_HOST,
BYTES_READ_LOCAL_RACK,
BYTES_READ_FIRST_DEGREE_REMOTE_RACK,
BYTES_READ_SECOND_OR_MORE_DEGREE_REMOTE_RACK,
BYTES_WRITTEN,
READ_OPS,
LARGE_READ_OPS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ READ_OPS.name= Number of read operations
LARGE_READ_OPS.name= Number of large read operations
WRITE_OPS.name= Number of write operations
BYTES_READ_EC.name= Number of bytes read erasure-coded
BYTES_READ_LOCAL_HOST.name= Number of bytes read from the local host
BYTES_READ_LOCAL_RACK.name= Number of bytes read from the local rack
BYTES_READ_FIRST_DEGREE_REMOTE_RACK.name= Number of bytes read from a remote rack of first degree
BYTES_READ_SECOND_OR_MORE_DEGREE_REMOTE_RACK.name= Number of bytes read from a remote rack of second degree or more
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void testCountersIncrement() {
static final String FS_SCHEME = "HDFS";
static final FileSystemCounter FS_COUNTER = FileSystemCounter.BYTES_READ;
static final long FS_COUNTER_VALUE = 10;
static final FileSystemCounter FS_LOCAL_RACK_COUNTER = FileSystemCounter.BYTES_READ_LOCAL_RACK;
static final long FS_LOCAL_RACK_COUNTER_VALUE = 5;

private void testMaxCounters(final Counters counters) {
LOG.info("counters max="+ Limits.getCountersMax());
Expand Down Expand Up @@ -131,13 +133,16 @@ public void run() {
private void setExpected(Counters counters) {
counters.findCounter(FRAMEWORK_COUNTER).setValue(FRAMEWORK_COUNTER_VALUE);
counters.findCounter(FS_SCHEME, FS_COUNTER).setValue(FS_COUNTER_VALUE);
counters.findCounter(FS_SCHEME, FS_LOCAL_RACK_COUNTER).setValue(FS_LOCAL_RACK_COUNTER_VALUE);
}

private void checkExpected(Counters counters) {
assertEquals(FRAMEWORK_COUNTER_VALUE,
counters.findCounter(FRAMEWORK_COUNTER).getValue());
assertEquals(FS_COUNTER_VALUE,
counters.findCounter(FS_SCHEME, FS_COUNTER).getValue());
assertEquals(FS_LOCAL_RACK_COUNTER_VALUE,
counters.findCounter(FS_SCHEME, FS_LOCAL_RACK_COUNTER).getValue());
}

private void shouldThrow(Class<? extends Exception> ecls, Runnable runnable) {
Expand Down