
Commit ba8c86d

Davies Liu authored and committed
[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources
## What changes were proposed in this pull request?

This PR splits PhysicalRDD into two classes: PhysicalRDD and DataSourceScan. PhysicalRDD is used for DataFrames created from an existing RDD; DataSourceScan is used for DataFrames created from data sources. This enables us to apply different optimizations to each of them.

It also fixes sameResult() for two DataSourceScan nodes, and fixes the equality check and toString for `In`. It would be better to use Seq there, but we can't break this public API (sad).

## How was this patch tested?

Existing tests. Manually tested with TPCDS queries Q59 and Q64: all the duplicated exchanges can now be re-used, and there was a 40+% performance improvement (saving half of the scans).

Author: Davies Liu <[email protected]>

Closes apache#11514 from davies/existing_rdd.
1 parent 2ef4c59 commit ba8c86d
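
The practical effect of the sameResult() fix is easiest to see on a plan with a duplicated subtree. A minimal sketch, not part of the patch, assuming a Spark shell of that era with sqlContext in scope; the Parquet path and the "key" column are hypothetical:

// Hypothetical table path and column; any data source relation would do.
val t = sqlContext.read.parquet("/tmp/t.parquet")
val counts = t.groupBy("key").count()

// The same scan + aggregate + exchange appears on both sides of the join.
counts.join(counts, "key").explain()
// With DataSourceScan.sameResult comparing the relation and metadata (instead of the
// per-instance RDD), the planner should now mark one side as a ReusedExchange,
// which is also what the SparkPlanGraph change below renders in the SQL UI.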

File tree

12 files changed (+110 −64 lines)


python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 2 deletions
@@ -173,8 +173,7 @@ def explain(self, extended=False):
 
         >>> df.explain()
         == Physical Plan ==
-        WholeStageCodegen
-        : +- Scan ExistingRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 6 additions & 6 deletions
@@ -280,12 +280,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * can do better should override this function.
    */
   def sameResult(plan: PlanType): Boolean = {
-    val canonicalizedLeft = this.canonicalized
-    val canonicalizedRight = plan.canonicalized
-    canonicalizedLeft.getClass == canonicalizedRight.getClass &&
-      canonicalizedLeft.children.size == canonicalizedRight.children.size &&
-      canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
-      (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
+    val left = this.canonicalized
+    val right = plan.canonicalized
+    left.getClass == right.getClass &&
+      left.children.size == right.children.size &&
+      left.cleanArgs == right.cleanArgs &&
+      (left.children, right.children).zipped.forall(_ sameResult _)
   }
 
   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 66 additions & 41 deletions
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
-    override val metadata: Map[String, String] = Map.empty,
-    isUnsafeRow: Boolean = false,
-    override val outputPartitioning: Partitioning = UnknownPartitioning(0))
+    override val nodeName: String) extends LeafNode {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+  }
+}
+
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class DataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val metadata: Map[String, String] = Map.empty)
   extends LeafNode with CodegenSupport {
 
+  override val nodeName: String = relation.toString
+
+  // Ignore rdd when checking results
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
+    case other: DataSourceScan => relation == other.relation && metadata == other.metadata
+    case _ => false
+  }
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
+  val outputUnsafeRows = relation match {
+    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
+      !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    case _: HadoopFsRelation => true
+    case _ => false
+  }
+
+  override val outputPartitioning = {
+    val bucketSpec = relation match {
+      // TODO: this should be closer to bucket planning.
+      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
+      case _ => None
+    }
+
+    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+        s"(${output.map(_.name).mkString(", ")})")
+    }
+
+    bucketSpec.map { spec =>
+      val numBuckets = spec.numBuckets
+      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+      HashPartitioning(bucketColumns, numBuckets)
+    }.getOrElse {
+      UnknownPartitioning(0)
+    }
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (isUnsafeRow) {
+    val unsafeRow = if (outputUnsafeRows) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>

@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
       ctx.INPUT_ROW = row
       ctx.currentVars = null
       val columns2 = exprs.map(_.gen(ctx))
-      val inputRow = if (isUnsafeRow) row else null
+      val inputRow = if (outputUnsafeRows) row else null
       val scanRows = ctx.freshName("processRows")
       ctx.addNewFunction(scanRows,
         s"""

@@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-private[sql] object PhysicalRDD {
+private[sql] object DataSourceScan {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
-
-  def createFromDataSource(
-      output: Seq[Attribute],
-      rdd: RDD[InternalRow],
-      relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-
-    val outputUnsafeRows = relation match {
-      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-        !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-      case _: HadoopFsRelation => true
-      case _ => false
-    }
-
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      val partitioning = HashPartitioning(bucketColumns, numBuckets)
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
-    }.getOrElse {
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
-    }
-  }
 }
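
The split is also visible directly in explain() output. A small sketch, not taken from the patch, assuming a shell with sc and sqlContext in scope; the Parquet path is hypothetical:

case class Person(name: String, age: Int)

// Built from an existing RDD: planned as the slimmed-down PhysicalRDD,
// which now prints as "Scan ExistingRDD[name#0,age#1]" (see the docstring change above).
val fromRdd = sqlContext.createDataFrame(sc.parallelize(Seq(Person("a", 1))))
fromRdd.explain()

// Read from a data source: planned as DataSourceScan, which keeps the relation,
// the pushed-filter metadata, and (for bucketed relations) a HashPartitioning,
// and still participates in whole-stage codegen via CodegenSupport.
val fromSource = sqlContext.read.parquet("/tmp/people.parquet")  // hypothetical path
fromSource.explain()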

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
     case _: BroadcastHashJoin => "bhj"
     case _: SortMergeJoin => "smj"
     case _: PhysicalRDD => "rdd"
+    case _: DataSourceScan => "scan"
     case _ => nodeName.toLowerCase
   }
 

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 4 deletions
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils}

@@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       }
 
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
-      execution.PhysicalRDD.createFromDataSource(
+      execution.DataSourceScan(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),

@@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         // Don't request columns that are only referenced by pushed filters.
         .filterNot(handledSet.contains)
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)

@@ -649,7 +649,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala

Lines changed: 5 additions & 1 deletion
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
       case "Subquery" if subgraph != null =>
         // Subquery should not be included in WholeStageCodegen
         buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
+      case "ReusedExchange" =>
+        // Point to the re-used exchange
+        val node = exchanges(planInfo.children.head)
+        edges += SparkPlanGraphEdge(node.id, parent.id)
       case name =>
         val metrics = planInfo.metrics.map { metric =>
           SQLPlanMetric(metric.name, metric.accumulatorId,

@@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph {
         } else {
           subgraph.nodes += node
         }
-        if (name == "ShuffleExchange" || name == "BroadcastExchange") {
+        if (name.contains("Exchange")) {
           exchanges += planInfo -> node
         }
 

sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala

Lines changed: 18 additions & 1 deletion
@@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter
  *
  * @since 1.3.0
  */
-case class In(attribute: String, values: Array[Any]) extends Filter
+case class In(attribute: String, values: Array[Any]) extends Filter {
+  override def hashCode(): Int = {
+    var h = attribute.hashCode
+    values.foreach { v =>
+      h *= 41
+      h += v.hashCode()
+    }
+    h
+  }
+  override def equals(o: Any): Boolean = o match {
+    case In(a, vs) =>
+      a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2)
+    case _ => false
+  }
+  override def toString: String = {
+    s"In($attribute, [${values.mkString(",")}]"
+  }
+}
 
 /**
  * A filter that evaluates to `true` iff the attribute evaluates to null.
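
Background for the In change above: a Scala case class compares an Array field by reference and renders it with the JVM's default toString, so two logically identical In filters were never equal and printed as opaque object references. A self-contained sketch of the default behaviour versus the element-wise comparison used in the override (InLike is a hypothetical stand-in, not Spark code); the hashCode override exists for the same reason, so that equal filters also hash equally:

case class InLike(attribute: String, values: Array[Any])

object ArrayEqualityDemo extends App {
  val a = InLike("id", Array[Any](1, 2, 3))
  val b = InLike("id", Array[Any](1, 2, 3))

  // The synthesized equals compares the Array field with ==, i.e. by reference.
  println(a == b)                            // false
  // Element-wise comparison, which is what the overridden equals above does.
  println(a.values.sameElements(b.values))   // true
  // Default rendering of an Array is an opaque reference, hence the toString override.
  println(a)                                 // InLike(id,[Ljava.lang.Object;@<hash>)
}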

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD

@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
       // the plan only has PhysicalRDD to scan JDBCRelation.
       assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
       val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
-      assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
+      assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
       df
     }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)

sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
     try {
       val queryExecution = sql(sqlString).queryExecution
       val rawPlan = queryExecution.executedPlan.collect {
-        case p: execution.PhysicalRDD => p
+        case p: execution.DataSourceScan => p
       } match {
         case Seq(p) => p
         case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")

sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
     try {
       val queryExecution = sql(sqlString).queryExecution
       val rawPlan = queryExecution.executedPlan.collect {
-        case p: execution.PhysicalRDD => p
+        case p: execution.DataSourceScan => p
       } match {
         case Seq(p) => p
         case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScan

@@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       }.isEmpty)
     assert(
       sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
-        case _: PhysicalRDD => true
+        case _: DataSourceScan => true
       }.nonEmpty)
   }
 

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoin

@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
 
     // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
     val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-    val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+    val rdd = plan.find(_.isInstanceOf[DataSourceScan])
     assert(rdd.isDefined, plan)
 
     val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
