Skip to content

Commit 8982cee

Browse files
committed
[SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
### What changes were proposed in this pull request? This is a followup of apache#39624 . `QueryStageExec.isMaterialized` should only return true if `resultOption` is assigned. It can be a potential bug to have this inconsistency. ### Why are the changes needed? fix potential bug ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes apache#40522 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0fde146 commit 8982cee

File tree

3 files changed

+43
-40
lines changed

3 files changed

+43
-40
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -537,12 +537,13 @@ case class AdaptiveSparkPlanExec(
537537
}
538538

539539
case i: InMemoryTableScanExec =>
540+
// There is no reuse for `InMemoryTableScanExec`, which is different from `Exchange`. If we
541+
// hit it the first time, we should always create a new query stage.
540542
val newStage = newQueryStage(i)
541-
val isMaterialized = newStage.isMaterialized
542543
CreateStageResult(
543544
newPlan = newStage,
544-
allChildStagesMaterialized = isMaterialized,
545-
newStages = if (isMaterialized) Seq.empty else Seq(newStage))
545+
allChildStagesMaterialized = false,
546+
newStages = Seq(newStage))
546547

547548
case q: QueryStageExec =>
548549
CreateStageResult(newPlan = q,
@@ -561,34 +562,30 @@ case class AdaptiveSparkPlanExec(
561562
}
562563

563564
private def newQueryStage(plan: SparkPlan): QueryStageExec = {
564-
val optimizedPlan = plan match {
565-
case e: Exchange =>
566-
e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
567-
case _ => plan
568-
}
569-
val newPlan = applyPhysicalRules(
570-
optimizedPlan,
571-
postStageCreationRules(outputsColumnar = plan.supportsColumnar),
572-
Some((planChangeLogger, "AQE Post Stage Creation")))
573565
val queryStage = plan match {
574-
case s: ShuffleExchangeLike =>
575-
if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
576-
throw SparkException.internalError(
577-
"Custom columnar rules cannot transform shuffle node to something else.")
578-
}
579-
ShuffleQueryStageExec(currentStageId, newPlan, s.canonicalized)
580-
case b: BroadcastExchangeLike =>
581-
if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
582-
throw SparkException.internalError(
583-
"Custom columnar rules cannot transform broadcast node to something else.")
566+
case e: Exchange =>
567+
val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
568+
val newPlan = applyPhysicalRules(
569+
optimized,
570+
postStageCreationRules(outputsColumnar = plan.supportsColumnar),
571+
Some((planChangeLogger, "AQE Post Stage Creation")))
572+
if (e.isInstanceOf[ShuffleExchangeLike]) {
573+
if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
574+
throw SparkException.internalError(
575+
"Custom columnar rules cannot transform shuffle node to something else.")
576+
}
577+
ShuffleQueryStageExec(currentStageId, newPlan, e.canonicalized)
578+
} else {
579+
assert(e.isInstanceOf[BroadcastExchangeLike])
580+
if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
581+
throw SparkException.internalError(
582+
"Custom columnar rules cannot transform broadcast node to something else.")
583+
}
584+
BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
584585
}
585-
BroadcastQueryStageExec(currentStageId, newPlan, b.canonicalized)
586586
case i: InMemoryTableScanExec =>
587-
if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {
588-
throw SparkException.internalError("Custom columnar rules cannot transform " +
589-
"`InMemoryTableScanExec` node to something else.")
590-
}
591-
TableCacheQueryStageExec(currentStageId, newPlan.asInstanceOf[InMemoryTableScanExec])
587+
// No need to optimize `InMemoryTableScanExec` as it's a leaf node.
588+
TableCacheQueryStageExec(currentStageId, i)
592589
}
593590
currentStageId += 1
594591
setLogicalLinkForNewQueryStage(queryStage, plan)

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ case class InsertAdaptiveSparkPlan(
9696
plan.exists {
9797
case _: Exchange => true
9898
case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
99+
// AQE framework has a different way to update the query plan in the UI: it updates the plan
100+
// at the end of execution, while non-AQE updates the plan before execution. If the cached
101+
// plan is already AQEed, the current plan must be AQEed as well so that the UI can get plan
102+
// update correctly.
99103
case i: InMemoryTableScanExec
100104
if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
101105
case p => p.expressions.exists(_.exists {

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ abstract class QueryStageExec extends LeafExecNode {
8686
protected var _resultOption = new AtomicReference[Option[Any]](None)
8787

8888
private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
89-
def isMaterialized: Boolean = resultOption.get().isDefined
89+
final def isMaterialized: Boolean = resultOption.get().isDefined
9090

9191
override def output: Seq[Attribute] = plan.output
9292
override def outputPartitioning: Partitioning = plan.outputPartitioning
@@ -275,20 +275,22 @@ case class TableCacheQueryStageExec(
275275
}
276276

277277
@transient
278-
private lazy val future: FutureAction[Unit] = {
279-
val rdd = inMemoryTableScan.baseCacheRDD()
280-
sparkContext.submitJob(
281-
rdd,
282-
(_: Iterator[CachedBatch]) => (),
283-
(0 until rdd.getNumPartitions).toSeq,
284-
(_: Int, _: Unit) => (),
285-
()
286-
)
278+
private lazy val future: Future[Unit] = {
279+
if (inMemoryTableScan.isMaterialized) {
280+
Future.successful(())
281+
} else {
282+
val rdd = inMemoryTableScan.baseCacheRDD()
283+
sparkContext.submitJob(
284+
rdd,
285+
(_: Iterator[CachedBatch]) => (),
286+
(0 until rdd.getNumPartitions).toSeq,
287+
(_: Int, _: Unit) => (),
288+
()
289+
)
290+
}
287291
}
288292

289293
override protected def doMaterialize(): Future[Any] = future
290294

291-
override def isMaterialized: Boolean = super.isMaterialized || inMemoryTableScan.isMaterialized
292-
293295
override def getRuntimeStatistics: Statistics = inMemoryTableScan.relation.computeStats()
294296
}

0 commit comments

Comments (0)