
Commit ce756da

Sital Kedia authored and Andrew Or committed
[SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…
## What changes were proposed in this pull request?

While profiling a Spark job that spills a large amount of intermediate data, we found that a significant portion of time was being spent in the DiskObjectWriter.updateBytesWritten function. Looking at the code, we saw that the function was being called too frequently to update the number of bytes written to disk. This change reduces that frequency (from every 32 records to every 16384) to avoid the overhead.

## How was this patch tested?

Tested by running the job on a cluster; this change yielded a 20% CPU gain.

Author: Sital Kedia <[email protected]>

Closes apache#13332 from sitalkedia/DiskObjectWriter.
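The idea behind the patch is a generic one: when a per-record bookkeeping call is expensive, sample it every N records and do a final sync on close. Below is a minimal, self-contained sketch of that pattern; `BatchedMetricsWriter` and its members are hypothetical names for illustration, not the real `DiskBlockObjectWriter`, which wraps buffered output streams and Spark's write-metrics objects.

```scala
// Hypothetical sketch of the batched metrics-update pattern used in this patch.
class BatchedMetricsWriter(updateInterval: Int = 16384) {
  private var numRecordsWritten = 0L
  private var actualBytes = 0L   // true number of bytes written so far
  private var reportedBytes = 0L // what the metrics system has been told
  var metricUpdates = 0L         // how many times the metric was synced

  def write(recordSize: Int): Unit = {
    actualBytes += recordSize
    numRecordsWritten += 1
    // Sync the (expensive) metric only every `updateInterval` records,
    // instead of on every single write -- the core idea of SPARK-15569.
    if (numRecordsWritten % updateInterval == 0) {
      updateBytesWritten()
    }
  }

  // Final sync on close so no bytes are left unreported.
  def close(): Unit = updateBytesWritten()

  private def updateBytesWritten(): Unit = {
    reportedBytes = actualBytes
    metricUpdates += 1
  }

  def bytesWritten: Long = reportedBytes
}
```

The trade-off is staleness: between interval boundaries, `bytesWritten` lags the true count, which is why the tests below only assert on the metric after crossing the 16384-record boundary.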
1 parent 5bdbedf commit ce756da

File tree

2 files changed (+7, -8)

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
(1 addition, 2 deletions)

@@ -203,8 +203,7 @@ private[spark] class DiskBlockObjectWriter(
     numRecordsWritten += 1
     writeMetrics.incRecordsWritten(1)

-    // TODO: call updateBytesWritten() less frequently.
-    if (numRecordsWritten % 32 == 0) {
+    if (numRecordsWritten % 16384 == 0) {
       updateBytesWritten()
     }
   }

core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
(6 additions, 6 deletions)

@@ -53,13 +53,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.commitAndClose()
     assert(file.length() == writeMetrics.bytesWritten)
   }
@@ -75,13 +75,13 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(writeMetrics.recordsWritten === 1)
     // Metrics don't update on every write
     assert(writeMetrics.bytesWritten == 0)
-    // After 32 writes, metrics should update
-    for (i <- 0 until 32) {
+    // After 16384 writes, metrics should update
+    for (i <- 0 until 16384) {
       writer.flush()
       writer.write(Long.box(i), Long.box(i))
     }
     assert(writeMetrics.bytesWritten > 0)
-    assert(writeMetrics.recordsWritten === 33)
+    assert(writeMetrics.recordsWritten === 16385)
     writer.revertPartialWritesAndClose()
     assert(writeMetrics.bytesWritten == 0)
     assert(writeMetrics.recordsWritten == 0)