[SPARK-42746][SQL][FIXUP] Fix optimizer failure for SortOrder in the LISTAGG function #51117
base: master
@cloud-fan Please review this PR, which is a fixup for `ListAgg`, originally introduced in #48748.
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends Collect[mutable.ArrayBuffer[Any]]
  with SupportsOrderingWithinGroup
  with ImplicitCastInputTypes {

  val orderExpressions: Seq[SortOrder] = orderChildExpressions.zipWithIndex.map {
Should this be a lazy val?
It can be, but I'm not sure it would bring any special value to this particular expression. No strong opinion; I can update if needed.
It should be a `lazy val`, to avoid repeatedly triggering it during plan transformation, when plan nodes are copied frequently.
Makes sense, I'll update to lazy.
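To illustrate the point about copies, here is a minimal sketch using a toy model rather than Spark itself. `EagerNode`, `LazyNode`, and the build counters are hypothetical names introduced only for this example; the sketch assumes that each plan transformation step copies the node and that the derived field is read rarely.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark classes.
object LazyValSketch {
  // Count how often the derived sequence is built.
  var eagerBuilds = 0
  var lazyBuilds = 0

  final case class EagerNode(orderChildren: Seq[String]) {
    // A plain `val` is initialized in the constructor, so every copy() pays for it.
    val orderExpressions: Seq[String] = {
      eagerBuilds += 1
      orderChildren.zipWithIndex.map { case (c, i) => s"$c#$i" }
    }
  }

  final case class LazyNode(orderChildren: Seq[String]) {
    // A `lazy val` is computed on first access only.
    lazy val orderExpressions: Seq[String] = {
      lazyBuilds += 1
      orderChildren.zipWithIndex.map { case (c, i) => s"$c#$i" }
    }
  }

  // Copies each node `copies` times, reads the derived field once on the final
  // lazy node, and returns (eager builds, lazy builds).
  def run(copies: Int): (Int, Int) = {
    eagerBuilds = 0
    lazyBuilds = 0
    var e = EagerNode(Seq("a", "b"))
    var l = LazyNode(Seq("a", "b"))
    (1 to copies).foreach { _ =>
      e = e.copy()
      l = l.copy()
    }
    require(l.orderExpressions.size == 2)
    (eagerBuilds, lazyBuilds)
  }
}
```

In this toy setup the eager variant rebuilds the sequence once per constructed instance, while the lazy variant builds it only for the instance that is actually inspected.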
   af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
 }

 // Setup unique distinct aggregate children.
-val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
+val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
Is this safe to do? Does any other aggregate expression use this? Could you please confirm with a test?
What exactly are you referring to? If you mean filtering out `SortOrder` here, as far as I'm aware the only Catalyst expression that uses it is `ListAgg`, and tests are added for this expression (including some tests that combine `listagg` with other aggregates to verify the correct behaviour).
Do we have a way of adding a test for this rule, so that if a new expression is added with `SortOrder`, we actually fail with a proper error? I just want to make sure we do not end up in the same state in the future, where we have bugs because of an optimization.
@@ -258,12 +258,18 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 def patchAggregateFunctionChildren(
     af: AggregateFunction)(
     attrs: Expression => Option[Expression]): AggregateFunction = {
-  val newChildren = af.children.map(c => attrs(c).getOrElse(c))
+  val newChildren = af.children.map {
+    case so: SortOrder =>
So is SortOrder the only expression that should propagate into child's child?
Yes, this is the only one that I'm aware of, with respect to aggregate patching. Note that the issue originally came up with `ListAgg`, which is a bit of a specific aggregate expression. I don't think that we're aware of any other similar issues at this time, but please feel free to share any additional info.
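The difference between the old one-liner and an order-aware match can be sketched with a toy expression tree. Everything here is hypothetical: `Col`, `Attr`, and `Order` merely stand in for a column, `AttributeReference`, and `SortOrder`. The body of the `case so: SortOrder` branch is not visible in the hunk above, so the sketch assumes it keeps the wrapper and patches the expression inside it.

```scala
// Toy model only: not Spark's Expression hierarchy.
object PatchChildrenSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Attr(name: String) extends Expr                      // stand-in for AttributeReference
  final case class Order(child: Expr, ascending: Boolean) extends Expr // stand-in for SortOrder

  // Naive patching: any mapped child is replaced wholesale, Order wrappers included.
  def patchNaive(children: Seq[Expr])(attrs: Expr => Option[Expr]): Seq[Expr] =
    children.map(c => attrs(c).getOrElse(c))

  // Order-aware patching (assumed behaviour): keep the wrapper, patch what it wraps.
  def patchOrderAware(children: Seq[Expr])(attrs: Expr => Option[Expr]): Seq[Expr] =
    children.map {
      case o: Order => o.copy(child = attrs(o.child).getOrElse(o.child))
      case c => attrs(c).getOrElse(c)
    }

  // A consumer that casts the trailing children back to Order.
  def orders(children: Seq[Expr]): Seq[Order] =
    children.drop(1).map(_.asInstanceOf[Order])

  val children: Seq[Expr] = Seq(Col("x"), Order(Col("x"), ascending = true))

  // Mapping that treats the Order wrapper as just another distinct child.
  val naiveAttrs: Map[Expr, Expr] =
    Map(Col("x") -> Attr("x#1"), Order(Col("x"), ascending = true) -> Attr("x#2"))

  // Mapping with Order wrappers filtered out of the distinct children.
  val awareAttrs: Map[Expr, Expr] = Map(Col("x") -> Attr("x#1"))
}
```

With the naive mapping, the wrapper itself is swapped for an attribute and the later cast fails, which mirrors the `ClassCastException` described in the PR; with the order-aware variant the wrapper survives and only its inner expression is replaced.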
Sorry, I'm confused. `SortOrder` is no longer a child of `ListAgg`, how can we hit it here?
> `SortOrder` is no longer child of `ListAgg`, how can we hit it here?
That's because of this:
override def children: Seq[Expression] = child +: delimiter +: orderExpressions
Otherwise, it would be more difficult to pack up all of the optional / dynamic order expressions (that themselves include `Seq[Expression]` and even `Seq[Seq[Expression]]`).
Why do we generate `SortOrder` and then put it in the `children`? Can we just put `orderChildExpressions` in the `children`?
Ideally `children` should be selected from the constructor parameters, not something generated on the fly.
> ideally `children` should be selected from the constructor parameters, not something generated on the fly.
Agreed. However, I don't think that's viable with this particular expression (please see below for more details).
> Can we just put `orderChildExpressions` in the `children`?
I don't think that `orderChildExpressions` is enough; we need the full sort order information in order to be able to reconstruct the original aggregate. This can be either in a `SortOrder` object (as is currently implemented) or a sort of struct containing all of the relevant parameters (which would essentially be `SortOrder`, but with extra steps). Otherwise, packing up all optional / dynamic child expressions in a `children: Seq[Expression]` would be very difficult.
Can you point me to the code where we collect the full sort order information from `children`? Why can't we match the `ListAgg` expression and collect whatever info we need?
    orderChildExpressions: Seq[Expression] = Nil,
    orderDirections: Seq[SortDirection] = Nil,
    orderNullOrderings: Seq[NullOrdering] = Nil,
    orderSameOrderExpressions: Seq[Seq[Expression]] = Nil,
Do we need this? There is no `SortOrder` anymore, and we don't need it to make the planner smart to remove duplicated sorts.
`SortOrder` is used because of `SupportsOrderingWithinGroup`. Note that `withOrderingWithinGroup` uses all of the relevant children for `ListAgg` in a `copy` call, and then later we rely on `orderExpressions`, which is no longer a child (important because `SortOrder` is unevaluable). So to make this approach work, I think we need this. We could of course explore alternative approaches if you have any suggestions?
-override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+override protected def withNewChildrenInternal(
+    newChildren: IndexedSeq[Expression]): Expression = {
+  val sortOrderExpressions: Seq[SortOrder] = newChildren.drop(2).map(_.asInstanceOf[SortOrder])
Do we really call `ListAgg#withChildren` this way? Passing `SortOrder` as the new children?
Yes, because `children` is overridden (please see #51117 (comment)).
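The pack/unpack round trip discussed in this thread can be sketched as follows. This is a simplified toy, not the actual `ListAgg` code: `ListAggLike`, `Order`, and `Direction` are hypothetical names, and only the direction is modelled (null ordering and same-order expressions are left out for brevity).

```scala
// Toy model only: simplified stand-ins for ListAgg and SortOrder.
object PackedChildrenSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr

  sealed trait Direction
  case object Asc extends Direction
  case object Desc extends Direction

  // Stand-in for SortOrder: bundles an expression with its ordering metadata.
  final case class Order(child: Expr, direction: Direction) extends Expr

  // Order information is stored as flat constructor parameters.
  final case class ListAggLike(
      child: Expr,
      delimiter: Expr,
      orderChildExpressions: Seq[Expr] = Nil,
      orderDirections: Seq[Direction] = Nil) {

    // Derived, not stored: rebuilt from the flat parameters on first use.
    lazy val orderExpressions: Seq[Order] =
      orderChildExpressions.zip(orderDirections).map { case (e, d) => Order(e, d) }

    // The packed Order objects are exposed as trailing children.
    def children: Seq[Expr] = child +: delimiter +: orderExpressions

    // New children arrive in the same shape, so they are unpacked again.
    def withNewChildren(newChildren: IndexedSeq[Expr]): ListAggLike = {
      val orders = newChildren.drop(2).map(_.asInstanceOf[Order])
      copy(
        child = newChildren(0),
        delimiter = newChildren(1),
        orderChildExpressions = orders.map(_.child),
        orderDirections = orders.map(_.direction))
    }
  }
}
```

The cast in `withNewChildren` is only safe if every rule that rewrites children preserves the `Order` wrappers in the trailing positions, which is the invariant the reviewers are questioning.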
What changes were proposed in this pull request?
Update the `RewriteDistinctAggregates` optimizer rule to properly rewrite aggregates with: DISTINCT `listagg` / `string_agg`. This fixes failures with multiple distinct list aggregations in the same query. Also, to enable this behaviour, this PR refactors the `ListAgg` Catalyst expression to use separate children in the case class definition, instead of using `SortOrder` for order expressions (note that `SortOrder` is an unevaluable expression).
Why are the changes needed?
Queries such as:
are failing with `java.lang.ClassCastException`, because the `RewriteDistinctAggregates` rule replaces `SortOrder`s with `AttributeReference`s. This PR fixes the issue, and `SortOrder`s are no longer processed like this.
Does this PR introduce any user-facing change?
Yes, queries with multiple distinct list aggregations now work properly.
How was this patch tested?
Added appropriate end-to-end SQL tests to verify the behaviour.
Was this patch authored or co-authored using generative AI tooling?
No.