Skip to content

Commit 6a4bfcd

Browse files
viiryamarmbrus
authored andcommitted
[SPARK-13658][SQL] BooleanSimplification rule is slow with large boolean expressions
JIRA: https://issues.apache.org/jira/browse/SPARK-13658 ## What changes were proposed in this pull request? Quoted from JIRA description: When run TPCDS Q3 [1] with lots predicates to filter out the partitions, the optimizer rule BooleanSimplification take about 2 seconds (it use lots of sematicsEqual, which require copy the whole tree). It will great if we could speedup it. [1] https://github.com/cloudera/impala-tpcds-kit/blob/master/queries/q3.sql How to speed up it: When we ask the canonicalized expression in `Expression`, it calls `Canonicalize.execute` on itself. `Canonicalize.execute` basically transforms up all expressions included in this expression. However, we don't keep the canonicalized versions for these children expressions. So in next time we ask the canonicalized expressions for the children expressions (e.g., `BooleanSimplification`), we will rerun `Canonicalize.execute` on each of them. It wastes much time. By forcing the children expressions to get and keep their canonicalized versions first, we can avoid re-canonicalize these expressions. I simply benchmark it with an expression which is part of the where clause in TPCDS Q3: val testRelation = LocalRelation('ss_sold_date_sk.int, 'd_moy.int, 'i_manufact_id.int, 'ss_item_sk.string, 'i_item_sk.string, 'd_date_sk.int) val input = ('d_date_sk === 'ss_sold_date_sk) && ('ss_item_sk === 'i_item_sk) && ('i_manufact_id === 436) && ('d_moy === 12) && (('ss_sold_date_sk > 2415355 && 'ss_sold_date_sk < 2415385) || ('ss_sold_date_sk > 2415720 && 'ss_sold_date_sk < 2415750) || ('ss_sold_date_sk > 2416085 && 'ss_sold_date_sk < 2416115) || ('ss_sold_date_sk > 2416450 && 'ss_sold_date_sk < 2416480) || ('ss_sold_date_sk > 2416816 && 'ss_sold_date_sk < 2416846) || ('ss_sold_date_sk > 2417181 && 'ss_sold_date_sk < 2417211) || ('ss_sold_date_sk > 2417546 && 'ss_sold_date_sk < 2417576) || ('ss_sold_date_sk > 2417911 && 'ss_sold_date_sk < 2417941) || ('ss_sold_date_sk > 2418277 && 'ss_sold_date_sk < 2418307) || ('ss_sold_date_sk > 2418642 && 'ss_sold_date_sk < 2418672) || ('ss_sold_date_sk > 2419007 && 'ss_sold_date_sk < 2419037) || ('ss_sold_date_sk > 2419372 && 'ss_sold_date_sk < 2419402) || ('ss_sold_date_sk > 2419738 && 'ss_sold_date_sk < 2419768) || ('ss_sold_date_sk > 2420103 && 'ss_sold_date_sk < 2420133) || ('ss_sold_date_sk > 2420468 && 'ss_sold_date_sk < 2420498) || ('ss_sold_date_sk > 2420833 && 'ss_sold_date_sk < 2420863) || ('ss_sold_date_sk > 2421199 && 'ss_sold_date_sk < 2421229) || ('ss_sold_date_sk > 2421564 && 'ss_sold_date_sk < 2421594) || ('ss_sold_date_sk > 2421929 && 'ss_sold_date_sk < 2421959) || ('ss_sold_date_sk > 2422294 && 'ss_sold_date_sk < 2422324) || ('ss_sold_date_sk > 2422660 && 'ss_sold_date_sk < 2422690) || ('ss_sold_date_sk > 2423025 && 'ss_sold_date_sk < 2423055) || ('ss_sold_date_sk > 2423390 && 'ss_sold_date_sk < 2423420) || ('ss_sold_date_sk > 2423755 && 'ss_sold_date_sk < 2423785) || ('ss_sold_date_sk > 2424121 && 'ss_sold_date_sk < 2424151) || ('ss_sold_date_sk > 2424486 && 'ss_sold_date_sk < 2424516) || ('ss_sold_date_sk > 2424851 && 'ss_sold_date_sk < 2424881) || ('ss_sold_date_sk > 2425216 && 'ss_sold_date_sk < 2425246) || ('ss_sold_date_sk > 2425582 && 'ss_sold_date_sk < 2425612) || ('ss_sold_date_sk > 2425947 && 'ss_sold_date_sk < 2425977) || ('ss_sold_date_sk > 2426312 && 'ss_sold_date_sk < 2426342) || ('ss_sold_date_sk > 2426677 && 'ss_sold_date_sk < 2426707) || ('ss_sold_date_sk > 2427043 && 'ss_sold_date_sk < 2427073) || ('ss_sold_date_sk > 2427408 && 'ss_sold_date_sk < 2427438) || ('ss_sold_date_sk > 2427773 && 'ss_sold_date_sk < 2427803) || ('ss_sold_date_sk > 2428138 && 'ss_sold_date_sk < 2428168) || ('ss_sold_date_sk > 2428504 && 'ss_sold_date_sk < 2428534) || ('ss_sold_date_sk > 2428869 && 'ss_sold_date_sk < 2428899) || ('ss_sold_date_sk > 2429234 && 'ss_sold_date_sk < 2429264) || ('ss_sold_date_sk > 2429599 && 'ss_sold_date_sk < 2429629) || ('ss_sold_date_sk > 2429965 && 'ss_sold_date_sk < 2429995) || ('ss_sold_date_sk > 2430330 && 'ss_sold_date_sk < 2430360) || ('ss_sold_date_sk > 2430695 && 'ss_sold_date_sk < 2430725) || ('ss_sold_date_sk > 2431060 && 'ss_sold_date_sk < 2431090) || ('ss_sold_date_sk > 2431426 && 'ss_sold_date_sk < 2431456) || ('ss_sold_date_sk > 2431791 && 'ss_sold_date_sk < 2431821) || ('ss_sold_date_sk > 2432156 && 'ss_sold_date_sk < 2432186) || ('ss_sold_date_sk > 2432521 && 'ss_sold_date_sk < 2432551) || ('ss_sold_date_sk > 2432887 && 'ss_sold_date_sk < 2432917) || ('ss_sold_date_sk > 2433252 && 'ss_sold_date_sk < 2433282) || ('ss_sold_date_sk > 2433617 && 'ss_sold_date_sk < 2433647) || ('ss_sold_date_sk > 2433982 && 'ss_sold_date_sk < 2434012) || ('ss_sold_date_sk > 2434348 && 'ss_sold_date_sk < 2434378) || ('ss_sold_date_sk > 2434713 && 'ss_sold_date_sk < 2434743))) val plan = testRelation.where(input).analyze val actual = Optimize.execute(plan) With this patch: 352 milliseconds 346 milliseconds 340 milliseconds Without this patch: 585 milliseconds 880 milliseconds 677 milliseconds ## How was this patch tested? Existing tests should pass. Author: Liang-Chi Hsieh <[email protected]> Author: Liang-Chi Hsieh <[email protected]> Closes apache#11647 from viirya/improve-expr-canonicalize.
1 parent 63f642a commit 6a4bfcd

File tree

2 files changed

+27
-29
lines changed

2 files changed

+27
-29
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20-
import org.apache.spark.sql.catalyst.rules._
21-
2220
/**
2321
* Rewrites an expression using rules that are guaranteed preserve the result while attempting
2422
* to remove cosmetic variations. Deterministic expressions that are `equal` after canonicalization
@@ -30,52 +28,49 @@ import org.apache.spark.sql.catalyst.rules._
3028
* - Names and nullability hints for [[org.apache.spark.sql.types.DataType]]s are stripped.
3129
* - Commutative and associative operations ([[Add]] and [[Multiply]]) have their children ordered
3230
* by `hashCode`.
33-
* - [[EqualTo]] and [[EqualNullSafe]] are reordered by `hashCode`.
31+
* - [[EqualTo]] and [[EqualNullSafe]] are reordered by `hashCode`.
3432
* - Other comparisons ([[GreaterThan]], [[LessThan]]) are reversed by `hashCode`.
3533
*/
36-
object Canonicalize extends RuleExecutor[Expression] {
37-
override protected def batches: Seq[Batch] =
38-
Batch(
39-
"Expression Canonicalization", FixedPoint(100),
40-
IgnoreNamesTypes,
41-
Reorder) :: Nil
34+
object Canonicalize extends {
35+
def execute(e: Expression): Expression = {
36+
expressionReorder(ignoreNamesTypes(e))
37+
}
4238

4339
/** Remove names and nullability from types. */
44-
protected object IgnoreNamesTypes extends Rule[Expression] {
45-
override def apply(e: Expression): Expression = e transformUp {
46-
case a: AttributeReference =>
47-
AttributeReference("none", a.dataType.asNullable)(exprId = a.exprId)
48-
}
40+
private def ignoreNamesTypes(e: Expression): Expression = e match {
41+
case a: AttributeReference =>
42+
AttributeReference("none", a.dataType.asNullable)(exprId = a.exprId)
43+
case _ => e
4944
}
5045

5146
/** Collects adjacent commutative operations. */
52-
protected def gatherCommutative(
47+
private def gatherCommutative(
5348
e: Expression,
5449
f: PartialFunction[Expression, Seq[Expression]]): Seq[Expression] = e match {
5550
case c if f.isDefinedAt(c) => f(c).flatMap(gatherCommutative(_, f))
5651
case other => other :: Nil
5752
}
5853

5954
/** Orders a set of commutative operations by their hash code. */
60-
protected def orderCommutative(
55+
private def orderCommutative(
6156
e: Expression,
6257
f: PartialFunction[Expression, Seq[Expression]]): Seq[Expression] =
6358
gatherCommutative(e, f).sortBy(_.hashCode())
6459

6560
/** Rearrange expressions that are commutative or associative. */
66-
protected object Reorder extends Rule[Expression] {
67-
override def apply(e: Expression): Expression = e transformUp {
68-
case a: Add => orderCommutative(a, { case Add(l, r) => Seq(l, r) }).reduce(Add)
69-
case m: Multiply => orderCommutative(m, { case Multiply(l, r) => Seq(l, r) }).reduce(Multiply)
61+
private def expressionReorder(e: Expression): Expression = e match {
62+
case a: Add => orderCommutative(a, { case Add(l, r) => Seq(l, r) }).reduce(Add)
63+
case m: Multiply => orderCommutative(m, { case Multiply(l, r) => Seq(l, r) }).reduce(Multiply)
64+
65+
case EqualTo(l, r) if l.hashCode() > r.hashCode() => EqualTo(r, l)
66+
case EqualNullSafe(l, r) if l.hashCode() > r.hashCode() => EqualNullSafe(r, l)
7067

71-
case EqualTo(l, r) if l.hashCode() > r.hashCode() => EqualTo(r, l)
72-
case EqualNullSafe(l, r) if l.hashCode() > r.hashCode() => EqualNullSafe(r, l)
68+
case GreaterThan(l, r) if l.hashCode() > r.hashCode() => LessThan(r, l)
69+
case LessThan(l, r) if l.hashCode() > r.hashCode() => GreaterThan(r, l)
7370

74-
case GreaterThan(l, r) if l.hashCode() > r.hashCode() => LessThan(r, l)
75-
case LessThan(l, r) if l.hashCode() > r.hashCode() => GreaterThan(r, l)
71+
case GreaterThanOrEqual(l, r) if l.hashCode() > r.hashCode() => LessThanOrEqual(r, l)
72+
case LessThanOrEqual(l, r) if l.hashCode() > r.hashCode() => GreaterThanOrEqual(r, l)
7673

77-
case GreaterThanOrEqual(l, r) if l.hashCode() > r.hashCode() => LessThanOrEqual(r, l)
78-
case LessThanOrEqual(l, r) if l.hashCode() > r.hashCode() => GreaterThanOrEqual(r, l)
79-
}
74+
case _ => e
8075
}
8176
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ abstract class Expression extends TreeNode[Expression] {
152152
* `deterministic` expressions where `this.canonicalized == other.canonicalized` will always
153153
* evaluate to the same result.
154154
*/
155-
lazy val canonicalized: Expression = Canonicalize.execute(this)
155+
lazy val canonicalized: Expression = {
156+
val canonicalizedChildred = children.map(_.canonicalized)
157+
Canonicalize.execute(withNewChildren(canonicalizedChildred))
158+
}
156159

157160
/**
158161
* Returns true when two expressions will always compute the same result, even if they differ
@@ -161,7 +164,7 @@ abstract class Expression extends TreeNode[Expression] {
161164
* See [[Canonicalize]] for more details.
162165
*/
163166
def semanticEquals(other: Expression): Boolean =
164-
deterministic && other.deterministic && canonicalized == other.canonicalized
167+
deterministic && other.deterministic && canonicalized == other.canonicalized
165168

166169
/**
167170
* Returns a `hashCode` for the calculation performed by this expression. Unlike the standard

0 commit comments

Comments
 (0)