Skip to content

[SPARK-42746][SQL][FIXUP] Fix optimizer failure for SortOrder in the LISTAGG function #51117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

uros-db
Copy link
Contributor

@uros-db uros-db commented Jun 8, 2025

What changes were proposed in this pull request?

Update the RewriteDistinctAggregates optimizer rule to properly rewrite aggregates with: DISTINCT listagg / string_agg. This fixes failures with multiple distinct list aggregations in the same query. Also, to enable this behaviour, this PR refactors the ListAgg catalyst expression to use separate children in the case class definition, instead of using SortOrder for order expressions (note that SortOrder is an unevaluable expression).

Why are the changes needed?

Queries such as:

SELECT
  LISTAGG(DISTINCT col1) WITHIN GROUP (ORDER BY col1),
  LISTAGG(DISTINCT col2) WITHIN GROUP (ORDER BY col2)
FROM df;

are failing with java.lang.ClassCastException, because the RewriteDistinctAggregates replaces SortOrders with AttributeReferences. This PR fixes the issue, and SortOrders are no longer processed like this.

Does this PR introduce any user-facing change?

Yes, queries with multiple distinct list aggregations now work properly.

How was this patch tested?

Added appropriate end-to-end SQL tests to verify the behaviour.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jun 8, 2025
@uros-db uros-db changed the title [WIP][SPARK-42746][SQL] Fix optimizer failure for SortOrder in the LISTAGG function [SPARK-42746][SQL][FIXUP] Fix optimizer failure for SortOrder in the LISTAGG function Jun 9, 2025
Copy link
Contributor Author

@uros-db uros-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Please review this PR, which is a fixup for ListAgg - originally introduced in: #48748.

mutableAggBufferOffset: Int = 0,
inputAggBufferOffset: Int = 0)
extends Collect[mutable.ArrayBuffer[Any]]
with SupportsOrderingWithinGroup
with ImplicitCastInputTypes {

val orderExpressions: Seq[SortOrder] = orderChildExpressions.zipWithIndex.map {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a lazy val?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be, but not sure if it would bring any special value to this particular expression. No strong opinion, I can update if needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be lazy val, to avoid repeatedly triggerring it during plan transformation when plan nodes are copied frequently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll update to lazy.

af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
}

// Setup unique distinct aggregate children.
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe to do? Does any other aggregate expression use this? Could you please confirm with a test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly are you referring to? If you mean filtering out SortOrder here, as far as I'm aware - the only catalyst expression that uses it is ListAgg, and tests are added for this expression (including some tests that combine listagg with other aggregates to verify the correct behaviour).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a way of adding a test for this rule, so that if new expression was added with SortOrder, we actually alert with a proper error? Just want to make sure we do not get into the same state in future, where we have bugs because of optimization.

@@ -258,12 +258,18 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
def patchAggregateFunctionChildren(
af: AggregateFunction)(
attrs: Expression => Option[Expression]): AggregateFunction = {
val newChildren = af.children.map(c => attrs(c).getOrElse(c))
val newChildren = af.children.map {
case so: SortOrder =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is SortOrder the only expression that should propagate into child's child?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the only one that I'm aware of, with respect to aggregate patching. Note that the issue originally came up with ListAgg - which is a bit of a specific aggregate expression. I don't think that we're aware of any other similar issues at this time, but please feel free to share any additional info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm confused. SortOrder is no longer child of ListAgg, how can we hit it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortOrder is no longer child of ListAgg, how can we hit it here?

That's because of this:

override def children: Seq[Expression] = child +: delimiter +: orderExpressions

Otherwise, it would be more difficult to pack up all of the optional / dynamic order expressions (that themselves include Seq[Expression] and even Seq[Seq[Expression]]).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we generate SortOrder and then put it in the children? Can we just put orderChildExpressions in the children?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally children should be selected from the constructor parameters, not something generated on the fly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally children should be selected from the constructor parameters, not something generated on the fly.

Agreed. However, I don't think that's viable with this particular expression (please see below for more details).

Can we just put orderChildExpressions in the children?

I don't think that orderChildExpressions is enough, we need full sortorder information in order to be able to reconstruct the original aggregate - this can be either in a SortOrder object (as is currently implemented) or a sort of struct containing all of the relevant parameters (which would essentially be SortOrder, but with extra steps). Otherwise, packing up all optional / dynamic child expressions expressions in a children: Seq[Expression] would be very difficult.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point me to the code where we collect the full sort order information from children? Why can't we match the ListAgg expression and collect whatever info we need?

orderChildExpressions: Seq[Expression] = Nil,
orderDirections: Seq[SortDirection] = Nil,
orderNullOrderings: Seq[NullOrdering] = Nil,
orderSameOrderExpressions: Seq[Seq[Expression]] = Nil,
Copy link
Contributor

@cloud-fan cloud-fan Jun 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this? There is no SortOrder anymore, and we don't need it to make the planner smart to remove duplicated sorts.

Copy link
Contributor Author

@uros-db uros-db Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortOrder is used because of SupportsOrderingWithinGroup. Note that withOrderingWithinGroup uses all of the relevant children for ListAgg in s copy call, and then later we rely on orderExpressions - which is no longer a child (important because SortOrder is unevaluable). So to make this approach, I think we need this. We could of course explore alternative approaches if you have any suggestions?

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): Expression = {
val sortOrderExpressions: Seq[SortOrder] = newChildren.drop(2).map(_.asInstanceOf[SortOrder])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really call ListAgg#withChildren this way? passing SortOrder as the new children?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because children is overriden (please see #51117 (comment)).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants