-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-42746][SQL][FIXUP] Fix optimizer failure for SortOrder in the LISTAGG function #51117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -317,21 +317,43 @@ private[aggregate] object CollectTopK { | |
case class ListAgg( | ||
child: Expression, | ||
delimiter: Expression = Literal(null), | ||
orderExpressions: Seq[SortOrder] = Nil, | ||
orderChildExpressions: Seq[Expression] = Nil, | ||
orderDirections: Seq[SortDirection] = Nil, | ||
orderNullOrderings: Seq[NullOrdering] = Nil, | ||
orderSameOrderExpressions: Seq[Seq[Expression]] = Nil, | ||
mutableAggBufferOffset: Int = 0, | ||
inputAggBufferOffset: Int = 0) | ||
extends Collect[mutable.ArrayBuffer[Any]] | ||
with SupportsOrderingWithinGroup | ||
with ImplicitCastInputTypes { | ||
|
||
val orderExpressions: Seq[SortOrder] = orderChildExpressions.zipWithIndex.map { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be a lazy val? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can be, but not sure if it would bring any special value to this particular expression. No strong opinion, I can update if needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be lazy val, to avoid repeatedly triggerring it during plan transformation when plan nodes are copied frequently. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, I'll update to lazy. |
||
case (orderChild, i) => | ||
SortOrder( | ||
child = orderChild, | ||
direction = if (i < orderDirections.length) orderDirections(i) else Ascending, | ||
nullOrdering = if (i < orderNullOrderings.length) orderNullOrderings(i) else NullsLast, | ||
sameOrderExpressions = if (i < orderSameOrderExpressions.length) { | ||
orderSameOrderExpressions(i) | ||
} else { | ||
Seq.empty | ||
} | ||
) | ||
} | ||
|
||
override def orderingFilled: Boolean = orderExpressions.nonEmpty | ||
|
||
override def isOrderingMandatory: Boolean = false | ||
|
||
override def isDistinctSupported: Boolean = true | ||
|
||
override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = | ||
copy(orderExpressions = orderingWithinGroup) | ||
copy( | ||
orderChildExpressions = orderingWithinGroup.map(_.child), | ||
orderDirections = orderingWithinGroup.map(_.direction), | ||
orderNullOrderings = orderingWithinGroup.map(_.nullOrdering), | ||
orderSameOrderExpressions = orderingWithinGroup.map(_.sameOrderExpressions) | ||
) | ||
|
||
override protected lazy val bufferElementType: DataType = { | ||
if (!needSaveOrderValue) { | ||
|
@@ -347,10 +369,10 @@ case class ListAgg( | |
lazy val needSaveOrderValue: Boolean = !isOrderCompatible(orderExpressions) | ||
|
||
def this(child: Expression) = | ||
this(child, Literal(null), Nil, 0, 0) | ||
this(child, Literal(null), Nil, Nil, Nil, Nil, 0, 0) | ||
|
||
def this(child: Expression, delimiter: Expression) = | ||
this(child, delimiter, Nil, 0, 0) | ||
this(child, delimiter, Nil, Nil, Nil, Nil, 0, 0) | ||
|
||
override def nullable: Boolean = true | ||
|
||
|
@@ -534,14 +556,18 @@ case class ListAgg( | |
false | ||
} | ||
|
||
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = | ||
override protected def withNewChildrenInternal( | ||
newChildren: IndexedSeq[Expression]): Expression = { | ||
val sortOrderExpressions: Seq[SortOrder] = newChildren.drop(2).map(_.asInstanceOf[SortOrder]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we really call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, because |
||
copy( | ||
child = newChildren.head, | ||
delimiter = newChildren(1), | ||
orderExpressions = newChildren | ||
.drop(2) | ||
.map(_.asInstanceOf[SortOrder]) | ||
newChildren.head, | ||
newChildren(1), | ||
orderChildExpressions = sortOrderExpressions.map(_.child), | ||
orderDirections = sortOrderExpressions.map(_.direction), | ||
orderNullOrderings = sortOrderExpressions.map(_.nullOrdering), | ||
orderSameOrderExpressions = sortOrderExpressions.map(_.sameOrderExpressions) | ||
) | ||
} | ||
|
||
private[this] def orderValuesField: Seq[StructField] = { | ||
orderExpressions.zipWithIndex.map { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -258,12 +258,18 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { | |
def patchAggregateFunctionChildren( | ||
af: AggregateFunction)( | ||
attrs: Expression => Option[Expression]): AggregateFunction = { | ||
val newChildren = af.children.map(c => attrs(c).getOrElse(c)) | ||
val newChildren = af.children.map { | ||
case so: SortOrder => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So is SortOrder the only expression that should propagate into child's child? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is the only one that I'm aware of, with respect to aggregate patching. Note that the issue originally came up with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I'm confused. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's because of this:
Otherwise, it would be more difficult to pack up all of the optional / dynamic order expressions (that themselves include There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we generate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ideally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Agreed. However, I don't think that's viable with this particular expression (please see below for more details).
I don't think that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you point me to the code where we collect the full sort order information from |
||
so.copy(child = attrs(so.child).getOrElse(so.child)) | ||
case c => | ||
attrs(c).getOrElse(c) | ||
} | ||
af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] | ||
} | ||
|
||
// Setup unique distinct aggregate children. | ||
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct | ||
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this safe to do? Does any other aggregate expression use this? Could you please confirm with a test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exactly are you referring to? If you mean filtering out There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have a way of adding a test for this rule, so that if new expression was added with SortOrder, we actually alert with a proper error? Just want to make sure we do not get into the same state in future, where we have bugs because of optimization. |
||
.filter(!_.isInstanceOf[SortOrder]).distinct | ||
val distinctAggChildAttrMap = distinctAggChildren.map { e => | ||
e.canonicalized -> AttributeReference(e.sql, e.dataType, nullable = true)() | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this? There is no
SortOrder
anymore, and we don't need it to make the planner smart to remove duplicated sorts.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SortOrder
is used because ofSupportsOrderingWithinGroup
. Note thatwithOrderingWithinGroup
uses all of the relevant children forListAgg
in s copy call, and then later we rely onorderExpressions
- which is no longer a child (important becauseSortOrder
is unevaluable). So to make this approach, I think we need this. We could of course explore alternative approaches if you have any suggestions?