
Commit 1c752b8

Davies Liu authored and rxin committed
[SPARK-10341] [SQL] fix memory starving in unsafe SMJ
In SMJ, the first ExternalSorter could consume all the memory before spilling, so the second one cannot even acquire its first page. Until we have a better memory allocator, SMJ should call prepare() before calling compute() on any of its children.

cc rxin JoshRosen

Author: Davies Liu <[email protected]>

Closes apache#8511 from davies/smj_memory.

(cherry picked from commit 540bdee)
Signed-off-by: Reynold Xin <[email protected]>
1 parent 33ce274 commit 1c752b8
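
To make the fix concrete, here is a minimal, self-contained Scala sketch of the starvation scenario described above. MemoryPool, reservePage, and the sizes are illustrative stand-ins, not Spark APIs; only the prepare-before-compute ordering mirrors the commit.

// Toy model: two sort-merge-join children sharing one fixed-size memory pool.
// All names here are hypothetical; only the ordering mirrors the commit.
class MemoryPool(var free: Long) {
  def reservePage(bytes: Long): Boolean =
    if (bytes <= free) { free -= bytes; true } else false
}

object PrepareBeforeCompute {
  def main(args: Array[String]): Unit = {
    val pool = new MemoryPool(free = 64L)
    val pageSize = 32L

    // The commit's ordering: reserve a first page for BOTH children up
    // front, before either child starts sorting and growing its footprint.
    val leftHasPage = pool.reservePage(pageSize)
    val rightHasPage = pool.reservePage(pageSize)
    assert(leftHasPage && rightHasPage)

    // Without the up-front reservation, the first ExternalSorter could
    // grow until free == 0, and the second sorter's very first
    // reservePage() would return false: the starvation being fixed here.
  }
}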

File tree

3 files changed (+42, -6 lines)


core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala

Lines changed: 19 additions & 2 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, Partitioner, TaskContext}
@@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  // In certain join operations, prepare can be called on the same partition multiple times.
+  // In this case, we need to ensure that each call to compute gets a separate prepare argument.
+  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+
+  /**
+   * Prepare a partition for a single call to compute.
+   */
+  def prepare(): Unit = {
+    preparedArguments += preparePartition()
+  }
+
   /**
    * Prepare a partition before computing it from its parent.
    */
   override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
-    val preparedArgument = preparePartition()
+    val prepared =
+      if (preparedArguments.isEmpty) {
+        preparePartition()
+      } else {
+        preparedArguments.remove(0)
+      }
     val parentIterator = firstParent[T].iterator(partition, context)
-    executePartition(context, partition.index, preparedArgument, parentIterator)
+    executePartition(context, partition.index, prepared, parentIterator)
   }
 }
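
A usage sketch of the new queueing behavior, assuming the constructor shape visible in the test diff at the end of this commit; the closures and the PreparationExample object are hypothetical, and the file must live under org.apache.spark.rdd because the class is private[spark]:

package org.apache.spark.rdd

import org.apache.spark.TaskContext

object PreparationExample {
  // M = Int plays the role of a "handle" returned by the prepare phase.
  def wrap(parent: RDD[Int]): MapPartitionsWithPreparationRDD[Int, Int, Int] = {
    val preparePartition = () => 42  // e.g. reserve a memory page, return a handle
    val executePartition = (
        context: TaskContext,
        index: Int,
        prepared: Int,
        iter: Iterator[Int]) => iter.map(_ + prepared)
    new MapPartitionsWithPreparationRDD[Int, Int, Int](
      parent, preparePartition, executePartition)
  }
}

Each prepare() call on the wrapped RDD enqueues one prepared argument into preparedArguments; each compute() call then dequeues one in FIFO order, and only falls back to calling preparePartition() itself when the queue is empty.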

core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala

Lines changed: 13 additions & 0 deletions
@@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     super.clearDependencies()
     rdds = null
   }
+
+  /**
+   * Call the prepare method of every parent that has one.
+   * This is needed for reserving execution memory in advance.
+   */
+  protected def tryPrepareParents(): Unit = {
+    rdds.collect {
+      case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
+    }
+  }
 }
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
@@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
   }
@@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
@@ -134,6 +146,7 @@ private[spark] class ZippedPartitionsRDD4
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
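
The net effect: when partitions are zipped, every parent that is a MapPartitionsWithPreparationRDD reserves its execution memory before any parent iterator is created. A toy model of that ordering (Prep and computeZipped are hypothetical stand-ins, not Spark types):

object PrepareOrdering {
  trait Prep { def prepare(): Unit }

  // Same shape as tryPrepareParents() followed by the body of compute():
  // prepare every parent that supports it, then start consuming iterators.
  def computeZipped[V](parents: Seq[AnyRef], body: () => Iterator[V]): Iterator[V] = {
    parents.collect { case p: Prep => p.prepare() }
    body()  // only now do the parents' iterators begin consuming memory
  }
}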

core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala

Lines changed: 10 additions & 4 deletions
@@ -46,11 +46,17 @@ class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSpark
     }
 
     // Verify that the numbers are pushed in the order expected
-    val result = {
-      new MapPartitionsWithPreparationRDD[Int, Int, Unit](
-        parent, preparePartition, executePartition).collect()
-    }
+    val rdd = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
+      parent, preparePartition, executePartition)
+    val result = rdd.collect()
     assert(result === Array(10, 20, 30))
+
+    TestObject.things.clear()
+    // Zip two of these RDDs, both should be prepared before the parent is executed
+    val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
+      parent, preparePartition, executePartition)
+    val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect()
+    assert(result2 === Array(10, 10, 20, 30, 20, 30))
   }
 
 }
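
Reading the new assertion: per the in-diff comment, both RDDs must be prepared before the parent is executed, so the values pushed during the two prepare phases (the leading 10, 10) appear before either side's execution output (20, 30 each), giving Array(10, 10, 20, 30, 20, 30) versus the single-RDD Array(10, 20, 30). This confirms that tryPrepareParents() invokes every prepare() before any parent's compute() runs.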
