Commit a79a9f9

Kostas Sakellis authored and pwendell committed
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
When calculating the input metrics there was an assumption that one task only reads from one block. This is not true for some operations, including coalesce. This patch simply increments the task's input metrics if previous ones existed with the same read method. A limitation of this patch is that if a task reads from two different blocks with different read methods, one will override the other.

Author: Kostas Sakellis <[email protected]>

Closes apache#3120 from ksakellis/kostas-spark-4092 and squashes the following commits:

54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist
f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics
a2a36d4 [Kostas Sakellis] CR feedback
5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
1 parent 96c2c71 commit a79a9f9
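
To make the accumulation semantics described in the commit message concrete, here is a minimal, self-contained Scala sketch. The classes below are simplified stand-ins assumed for illustration only; they mirror the shape of the TaskMetrics/InputMetrics changes in this commit but are not Spark's actual (package-private) API.

import java.util.concurrent.atomic.AtomicLong

// Simplified, assumed stand-ins -- not Spark's actual classes.
object DataReadMethod extends Enumeration { val Hadoop, Memory = Value }

class InputMetrics(val readMethod: DataReadMethod.Value) {
  private val _bytesRead = new AtomicLong()
  def bytesRead: Long = _bytesRead.get()
  def addBytesRead(bytes: Long): Unit = _bytesRead.addAndGet(bytes)
}

class TaskMetrics {
  private var _inputMetrics: Option[InputMetrics] = None
  def inputMetrics: Option[InputMetrics] = _inputMetrics

  // Reuse the existing metrics when the read method matches; otherwise hand back
  // a fresh object that is never recorded (the limitation noted in the message).
  def getInputMetricsForReadMethod(m: DataReadMethod.Value): InputMetrics = synchronized {
    _inputMetrics match {
      case Some(metrics) if metrics.readMethod == m => metrics
      case Some(_) => new InputMetrics(m)
      case None =>
        val metrics = new InputMetrics(m)
        _inputMetrics = Some(metrics)
        metrics
    }
  }
}

object CoalesceMetricsSketch extends App {
  val tm = new TaskMetrics
  // A coalesced task reading two blocks with the same read method accumulates.
  tm.getInputMetricsForReadMethod(DataReadMethod.Hadoop).addBytesRead(100L)
  tm.getInputMetricsForReadMethod(DataReadMethod.Hadoop).addBytesRead(250L)
  println(tm.inputMetrics.map(_.bytesRead))   // Some(350)
  // A read with a conflicting method gets a throwaway object; the total is unchanged.
  tm.getInputMetricsForReadMethod(DataReadMethod.Memory).addBytesRead(999L)
  println(tm.inputMetrics.map(_.bytesRead))   // still Some(350)
}

With the patch, same-method reads sum into a single bytesRead total, while a read with a conflicting method is handed an unrecorded object, so its bytes are dropped (the limitation noted above and tracked under SPARK-5225).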

File tree: 10 files changed (+270, -102 lines)

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 5 additions & 1 deletion
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        val inputMetrics = blockResult.inputMetrics
+        val existingMetrics = context.taskMetrics
+          .getInputMetricsForReadMethod(inputMetrics.readMethod)
+        existingMetrics.addBytesRead(inputMetrics.bytesRead)
+
         new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

       case None =>

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 0 deletions
@@ -379,6 +379,7 @@ private[spark] class Executor(
         if (!taskRunner.attemptedTask.isEmpty) {
           Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
             metrics.updateShuffleReadMetrics
+            metrics.updateInputMetrics()
             metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
             if (isLocal) {
               // JobProgressListener will hold an reference of it during

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 73 additions & 2 deletions
@@ -17,6 +17,11 @@

 package org.apache.spark.executor

+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
+
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.annotation.DeveloperApi

@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
    * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
    * are stored here.
    */
-  var inputMetrics: Option[InputMetrics] = None
+  private var _inputMetrics: Option[InputMetrics] = None
+
+  def inputMetrics = _inputMetrics
+
+  /**
+   * This should only be used when recreating TaskMetrics, not when updating input metrics in
+   * executors
+   */
+  private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
+    _inputMetrics = inputMetrics
+  }

   /**
    * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much

@@ -133,6 +148,30 @@ class TaskMetrics extends Serializable {
     readMetrics
   }

+  /**
+   * Returns the input metrics object that the task should use. Currently, if
+   * there exists an input metric with the same readMethod, we return that one
+   * so the caller can accumulate bytes read. If the readMethod is different
+   * than previously seen by this task, we return a new InputMetric but don't
+   * record it.
+   *
+   * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
+   * we can store all the different inputMetrics (one per readMethod).
+   */
+  private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
+    InputMetrics = synchronized {
+    _inputMetrics match {
+      case None =>
+        val metrics = new InputMetrics(readMethod)
+        _inputMetrics = Some(metrics)
+        metrics
+      case Some(metrics @ InputMetrics(method)) if method == readMethod =>
+        metrics
+      case Some(InputMetrics(method)) =>
+        new InputMetrics(readMethod)
+    }
+  }
+
   /**
    * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
    */

@@ -146,6 +185,10 @@ class TaskMetrics extends Serializable {
     }
     _shuffleReadMetrics = Some(merged)
   }
+
+  private[spark] def updateInputMetrics() = synchronized {
+    inputMetrics.foreach(_.updateBytesRead())
+  }
 }

 private[spark] object TaskMetrics {

@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
  */
 @DeveloperApi
 case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+  private val _bytesRead: AtomicLong = new AtomicLong()
+
   /**
    * Total bytes read.
    */
-  var bytesRead: Long = 0L
+  def bytesRead: Long = _bytesRead.get()
+  @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+  /**
+   * Adds additional bytes read for this read method.
+   */
+  def addBytesRead(bytes: Long) = {
+    _bytesRead.addAndGet(bytes)
+  }
+
+  /**
+   * Invoke the bytesReadCallback and mutate bytesRead.
+   */
+  def updateBytesRead() {
+    bytesReadCallback.foreach { c =>
+      _bytesRead.set(c())
+    }
+  }
+
+  /**
+   * Register a function that can be called to get up-to-date information on how many bytes
+   * the task has read from an input source.
+   */
+  def setBytesReadCallback(f: Option[() => Long]) {
+    bytesReadCallback = f
+  }
 }

 /**
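
As a hedged illustration of the bytesReadCallback hook introduced above: the callback is expected to return a running total of bytes read on the current thread, and updateBytesRead() overwrites the internal counter with that total. The sketch below uses an assumed stand-in class, not the real InputMetrics.

import java.util.concurrent.atomic.AtomicLong

// Assumed stand-in mirroring the callback mechanics added in this commit.
class InputMetricsSketch {
  private val _bytesRead = new AtomicLong()
  @volatile var bytesReadCallback: Option[() => Long] = None

  def bytesRead: Long = _bytesRead.get()
  def addBytesRead(bytes: Long): Unit = _bytesRead.addAndGet(bytes)

  // When a callback is registered (e.g. a FileSystem thread-local byte counter),
  // updateBytesRead() replaces the counter with the callback's current total.
  def updateBytesRead(): Unit = bytesReadCallback.foreach(c => _bytesRead.set(c()))
}

object CallbackSketch extends App {
  val fsBytesOnThread = new AtomicLong()               // pretend FS statistics counter
  val metrics = new InputMetricsSketch
  metrics.bytesReadCallback = Some(() => fsBytesOnThread.get())

  fsBytesOnThread.addAndGet(4096L)                     // a RecordReader pulls some bytes
  metrics.updateBytesRead()                            // refresh on demand
  println(metrics.bytesRead)                           // 4096
}

This is why the per-record update loop in HadoopRDD/NewHadoopRDD (next two files) can be deleted: the counter is refreshed on demand instead, for example when the reader is closed or when the executor calls updateInputMetrics().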

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 13 additions & 26 deletions
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
       logInfo("Input split: " + split.inputSplit)
       val jobConf = getJobConf()

-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
-          split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
-      } else {
-        None
-      }
-      if (bytesReadCallback.isDefined) {
-        context.taskMetrics.inputMetrics = Some(inputMetrics)
-      }
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+        split.inputSplit.value match {
+          case split: FileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
+          case _ => None
+        }
+      )
+      inputMetrics.setBytesReadCallback(bytesReadCallback)

       var reader: RecordReader[K, V] = null
       val inputFormat = getInputFormat(jobConf)

@@ -237,40 +238,26 @@ class HadoopRDD[K, V](
       val key: K = reader.createKey()
       val value: V = reader.createValue()

-      var recordsSinceMetricsUpdate = 0
-
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
         } catch {
           case eof: EOFException =>
             finished = true
         }
-
-        // Update bytes read metric every few records
-        if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
-            && bytesReadCallback.isDefined) {
-          recordsSinceMetricsUpdate = 0
-          val bytesReadFn = bytesReadCallback.get
-          inputMetrics.bytesRead = bytesReadFn()
-        } else {
-          recordsSinceMetricsUpdate += 1
-        }
         (key, value)
       }

       override def close() {
         try {
           reader.close()
           if (bytesReadCallback.isDefined) {
-            val bytesReadFn = bytesReadCallback.get
-            inputMetrics.bytesRead = bytesReadFn()
+            inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.bytesRead = split.inputSplit.value.getLength
-              context.taskMetrics.inputMetrics = Some(inputMetrics)
+              inputMetrics.addBytesRead(split.inputSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)
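
One detail worth calling out: the callback is resolved with inputMetrics.bytesReadCallback.orElse(...), so when a coalesced task iterates several Hadoop partitions, the first partition registers the callback and later partitions reuse it against the same shared InputMetrics. A rough, hypothetical sketch of that reuse (illustrative names only, not Spark code):

import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch of two coalesced Hadoop partitions sharing one callback.
object CoalescedHadoopSketch extends App {
  val fsBytes = new AtomicLong()                       // pretend thread-local FS counter

  var registeredCallback: Option[() => Long] = None    // lives on the shared InputMetrics

  def openPartition(): Unit = {
    // First partition registers the callback; later partitions reuse it (orElse).
    registeredCallback = registeredCallback.orElse(Some(() => fsBytes.get()))
  }

  openPartition(); fsBytes.addAndGet(100L)             // partition 1 reads 100 bytes
  openPartition(); fsBytes.addAndGet(250L)             // partition 2 reads 250 bytes

  println(registeredCallback.map(cb => cb()))          // Some(350): one running total
}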

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 13 additions & 27 deletions
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value

-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
-          split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
-      } else {
-        None
-      }
-      if (bytesReadCallback.isDefined) {
-        context.taskMetrics.inputMetrics = Some(inputMetrics)
-      }
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
+        split.serializableHadoopSplit.value match {
+          case split: FileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
+          case _ => None
+        }
+      )
+      inputMetrics.setBytesReadCallback(bytesReadCallback)

       val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
       val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)

@@ -153,34 +154,19 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-
-        // Update bytes read metric every few records
-        if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
-            && bytesReadCallback.isDefined) {
-          recordsSinceMetricsUpdate = 0
-          val bytesReadFn = bytesReadCallback.get
-          inputMetrics.bytesRead = bytesReadFn()
-        } else {
-          recordsSinceMetricsUpdate += 1
-        }
-
         (reader.getCurrentKey, reader.getCurrentValue)
       }

       private def close() {
         try {
           reader.close()
-
-          // Update metrics with final amount
           if (bytesReadCallback.isDefined) {
-            val bytesReadFn = bytesReadCallback.get
-            inputMetrics.bytesRead = bytesReadFn()
+            inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
-              context.taskMetrics.inputMetrics = Some(inputMetrics)
+              inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
     readMethod: DataReadMethod.Value,
     bytes: Long) {
   val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.bytesRead = bytes
+  inputMetrics.addBytesRead(bytes)
 }

 /**

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 3 additions & 3 deletions
@@ -637,8 +637,8 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
     metrics.shuffleWriteMetrics =
       Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
-    metrics.inputMetrics =
-      Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+    metrics.setInputMetrics(
+      Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
     metrics.outputMetrics =
       Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
     metrics.updatedBlocks =

@@ -671,7 +671,7 @@ private[spark] object JsonProtocol {
   def inputMetricsFromJson(json: JValue): InputMetrics = {
     val metrics = new InputMetrics(
       DataReadMethod.withName((json \ "Data Read Method").extract[String]))
-    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+    metrics.addBytesRead((json \ "Bytes Read").extract[Long])
     metrics
   }
