|
18 | 18 | import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
|
19 | 19 | import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
|
20 | 20 | import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
|
| 21 | +import org.elasticsearch.xpack.esql.plan.logical.Eval; |
21 | 22 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
22 | 23 | import org.elasticsearch.xpack.esql.plan.logical.Project;
|
23 | 24 | import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
24 | 25 |
|
25 | 26 | import java.util.ArrayList;
|
| 27 | +import java.util.HashSet; |
26 | 28 | import java.util.LinkedHashSet;
|
27 | 29 | import java.util.List;
|
| 30 | +import java.util.Set; |
28 | 31 |
|
29 | 32 | public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
|
| 33 | + // don't drop groupings from a local plan, as the layout has already been agreed upon |
| 34 | + private final boolean local; |
30 | 35 |
|
31 |
| - public CombineProjections() { |
| 36 | + public CombineProjections(boolean local) { |
32 | 37 | super(OptimizerRules.TransformDirection.UP);
|
| 38 | + this.local = local; |
33 | 39 | }
|
34 | 40 |
|
35 | 41 | @Override
|
@@ -60,29 +66,89 @@ protected LogicalPlan rule(UnaryPlan plan) {
|
60 | 66 | return plan;
|
61 | 67 | }
|
62 | 68 |
|
63 |
| - // Agg with underlying Project (group by on sub-queries) |
64 |
| - if (plan instanceof Aggregate a) { |
65 |
| - if (child instanceof Project p) { |
66 |
| - var groupings = a.groupings(); |
67 |
| - List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size()); |
68 |
| - for (Expression grouping : groupings) { |
69 |
| - if (grouping instanceof Attribute attribute) { |
70 |
| - groupingAttrs.add(attribute); |
71 |
| - } else if (grouping instanceof Alias as && as.child() instanceof Categorize) { |
72 |
| - groupingAttrs.add(as); |
| 69 | + if (plan instanceof Aggregate a && child instanceof Project p) { |
| 70 | + var groupings = a.groupings(); |
| 71 | + |
| 72 | + // sanity checks |
| 73 | + for (Expression grouping : groupings) { |
| 74 | + if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) { |
| 75 | + // After applying ReplaceAggregateNestedExpressionWithEval, |
| 76 | + // evaluatable groupings can only contain attributes. |
| 77 | + throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping); |
| 78 | + } |
| 79 | + } |
| 80 | + assert groupings.size() <= 1 |
| 81 | + || groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false |
| 82 | + : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; |
| 83 | + |
| 84 | + // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) |
| 85 | + AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder(); |
| 86 | + for (NamedExpression ne : p.projections()) { |
| 87 | + // Record the aliases. |
| 88 | + // Projections are just aliases for attributes, so casting is safe. |
| 89 | + aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
| 90 | + } |
| 91 | + var aliases = aliasesBuilder.build(); |
| 92 | + |
| 93 | + // Propagate any renames from the lower projection into the upper groupings. |
| 94 | + List<Expression> resolvedGroupings = new ArrayList<>(); |
| 95 | + for (Expression grouping : groupings) { |
| 96 | + Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as)); |
| 97 | + resolvedGroupings.add(transformed); |
| 98 | + } |
| 99 | + |
| 100 | + // This can lead to duplicates in the groupings: e.g. |
| 101 | + // | EVAL x = y | STATS ... BY x, y |
| 102 | + if (local) { |
| 103 | + // On the data node, the groupings must be preserved because they affect the physical output (see |
| 104 | + // AbstractPhysicalOperationProviders#intermediateAttributes). |
| 105 | + // In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place |
| 106 | + // of the original projection to create new attributes for the duplicate groups. |
| 107 | + Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size()); |
| 108 | + List<Expression> newGroupings = new ArrayList<>(); |
| 109 | + List<Alias> aliasesAgainstDuplication = new ArrayList<>(); |
| 110 | + |
| 111 | + for (int i = 0; i < groupings.size(); i++) { |
| 112 | + Expression resolvedGrouping = resolvedGroupings.get(i); |
| 113 | + if (seenResolvedGroupings.add(resolvedGrouping)) { |
| 114 | + newGroupings.add(resolvedGrouping); |
73 | 115 | } else {
|
74 |
| - // After applying ReplaceAggregateNestedExpressionWithEval, |
75 |
| - // groupings (except Categorize) can only contain attributes. |
76 |
| - throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping); |
| 116 | + // resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to. |
| 117 | + // should really only be 1 attribute, anyway, but going via .references() includes the case of a |
| 118 | + // GroupingFunction.NonEvaluatableGroupingFunction. |
| 119 | + Attribute coreAttribute = resolvedGrouping.references().iterator().next(); |
| 120 | + |
| 121 | + Alias renameAgainstDuplication = new Alias( |
| 122 | + coreAttribute.source(), |
| 123 | + TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"), |
| 124 | + coreAttribute |
| 125 | + ); |
| 126 | + aliasesAgainstDuplication.add(renameAgainstDuplication); |
| 127 | + |
| 128 | + // propagate the new alias into the new grouping |
| 129 | + AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder(); |
| 130 | + resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute()); |
| 131 | + AttributeMap<Attribute> resolver = resolverBuilder.build(); |
| 132 | + |
| 133 | + newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr))); |
77 | 134 | }
|
78 | 135 | }
|
79 |
| - plan = new Aggregate( |
80 |
| - a.source(), |
81 |
| - p.child(), |
82 |
| - a.aggregateType(), |
83 |
| - combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()), |
84 |
| - combineProjections(a.aggregates(), p.projections()) |
85 |
| - ); |
| 136 | + |
| 137 | + LogicalPlan newChild = aliasesAgainstDuplication.isEmpty() |
| 138 | + ? p.child() |
| 139 | + : new Eval(p.source(), p.child(), aliasesAgainstDuplication); |
| 140 | + plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections())); |
| 141 | + } else { |
| 142 | + // On the coordinator, we can just discard the duplicates. |
| 143 | + // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which |
| 144 | + // will be an alias like `c = CATEGORIZE(attribute)`. |
| 145 | + // Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on |
| 146 | + // regular equality (i.e. based on names) instead of name ids. |
| 147 | + // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. |
| 148 | + // for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also |
| 149 | + // applies in the local case above. |
| 150 | + List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings)); |
| 151 | + plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections())); |
86 | 152 | }
|
87 | 153 | }
|
88 | 154 |
|
@@ -145,38 +211,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
|
145 | 211 | return replaced;
|
146 | 212 | }
|
147 | 213 |
|
148 |
| - private static List<Expression> combineUpperGroupingsAndLowerProjections( |
149 |
| - List<? extends NamedExpression> upperGroupings, |
150 |
| - List<? extends NamedExpression> lowerProjections |
151 |
| - ) { |
152 |
| - assert upperGroupings.size() <= 1 |
153 |
| - || upperGroupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false |
154 |
| - : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; |
155 |
| - // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) |
156 |
| - AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder(); |
157 |
| - for (NamedExpression ne : lowerProjections) { |
158 |
| - // Record the aliases. |
159 |
| - // Projections are just aliases for attributes, so casting is safe. |
160 |
| - aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); |
161 |
| - } |
162 |
| - var aliases = aliasesBuilder.build(); |
163 |
| - |
164 |
| - // Propagate any renames from the lower projection into the upper groupings. |
165 |
| - // This can lead to duplicates: e.g. |
166 |
| - // | EVAL x = y | STATS ... BY x, y |
167 |
| - // All substitutions happen before; groupings must be attributes at this point except for CATEGORIZE which will be an alias like |
168 |
| - // `c = CATEGORIZE(attribute)`. |
169 |
| - // Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet). |
170 |
| - // TODO: The deduplication based on simple equality will be insufficient in case of multiple CATEGORIZEs, e.g. for |
171 |
| - // `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. |
172 |
| - LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>(); |
173 |
| - for (NamedExpression ne : upperGroupings) { |
174 |
| - NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a)); |
175 |
| - resolvedGroupings.add(transformed); |
176 |
| - } |
177 |
| - return new ArrayList<>(resolvedGroupings); |
178 |
| - } |
179 |
| - |
180 | 214 | /**
|
181 | 215 | * Replace grouping alias previously contained in the aggregations that might have been projected away.
|
182 | 216 | */
|
|
0 commit comments