Skip to content

Commit 48197b7

Browse files
authored
ESQL: Pushdown Lookup Join past Project (#129503) (#129842)
Add a new logical plan optimization: when there is a Project (KEEP/DROP/RENAME/renaming EVALs) in a LOOKUP JOIN's left child (the "main" side), perform the Project after the LOOKUP JOIN. This prevents premature field extractions when the lookup join happens on data nodes.

(cherry picked from commit 809dab1)

# Conflicts:
# x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
# x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java
1 parent 2359e4c commit 48197b7

File tree

16 files changed

+901
-370
lines changed

16 files changed

+901
-370
lines changed

docs/changelog/129503.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 129503
2+
summary: Pushdown Lookup Join past Project
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 119082

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,66 @@ emp_no:integer | language_code:integer | language_name:keyword
144144
10030 |0 | null
145145
;
146146

147+
multipleLookupsAndProjects
148+
required_capability: join_lookup_v12
149+
150+
FROM employees
151+
| KEEP languages, emp_no
152+
| EVAL language_code = languages
153+
| LOOKUP JOIN languages_lookup ON language_code
154+
| RENAME language_name AS foo
155+
| LOOKUP JOIN languages_lookup ON language_code
156+
| DROP foo
157+
| SORT emp_no
158+
| LIMIT 5
159+
;
160+
161+
languages:integer | emp_no:integer | language_code:integer | language_name:keyword
162+
2 | 10001 | 2 | French
163+
5 | 10002 | 5 | null
164+
4 | 10003 | 4 | German
165+
5 | 10004 | 5 | null
166+
1 | 10005 | 1 | English
167+
;
168+
169+
multipleLookupsAndProjectsNonUnique
170+
required_capability: join_lookup_v12
171+
172+
FROM employees
173+
| KEEP languages, emp_no
174+
| EVAL language_code = languages
175+
# this lookup contributes 0 fields in the end, but it contributes rows via multiple matches
176+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
177+
| RENAME language_name AS foo
178+
| LOOKUP JOIN languages_lookup ON language_code
179+
| DROP foo, country, country.keyword
180+
| SORT emp_no
181+
| LIMIT 5
182+
;
183+
184+
languages:integer | emp_no:integer | language_code:integer | language_name:keyword
185+
2 | 10001 | 2 | French
186+
2 | 10001 | 2 | French
187+
2 | 10001 | 2 | French
188+
5 | 10002 | 5 | null
189+
4 | 10003 | 4 | German
190+
;
191+
192+
projectsWithShadowingAfterPushdown
193+
required_capability: join_lookup_v12
194+
195+
FROM languages_lookup
196+
| WHERE language_code == 4
197+
# not shadowed - unless the rename would happen after the lookup
198+
| RENAME language_name AS original_language_name
199+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
200+
| DROP country*
201+
;
202+
203+
language_code:integer | original_language_name:keyword | language_name:keyword
204+
4 | German | Quenya
205+
;
206+
147207
###########################################################################
148208
# multiple match behavior with languages_lookup_non_unique_key index
149209
###########################################################################

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEnrich;
3939
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEval;
4040
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownInferencePlan;
41+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownJoinPastProject;
4142
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownRegexExtract;
4243
import org.elasticsearch.xpack.esql.optimizer.rules.logical.RemoveStatsOverride;
4344
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAggregateAggExpressionWithEval;
@@ -189,6 +190,7 @@ protected static Batch<LogicalPlan> operators() {
189190
new PushDownEval(),
190191
new PushDownRegexExtract(),
191192
new PushDownEnrich(),
193+
new PushDownJoinPastProject(),
192194
new PushDownAndCombineOrderBy(),
193195
new PruneRedundantOrderBy(),
194196
new PruneRedundantSortClauses(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.Alias;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
13+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
14+
import org.elasticsearch.xpack.esql.core.expression.Expression;
15+
import org.elasticsearch.xpack.esql.core.expression.Expressions;
16+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
17+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
18+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
19+
import org.elasticsearch.xpack.esql.plan.logical.Project;
20+
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
21+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
22+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
23+
24+
import java.util.ArrayList;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Set;
28+
29+
/**
30+
* If a {@link Project} is the left child of a left {@link Join}, perform the projection after the join instead. Because the projected
31+
* attributes are then only required later, field extractions can also happen later, making joins cheaper to execute on data nodes.
32+
* E.g. {@code ... | RENAME field AS otherfield | LOOKUP JOIN lu_idx ON key}
33+
* becomes {@code ... | LOOKUP JOIN lu_idx ON key | RENAME field AS otherfield }.
34+
* When a {@code LOOKUP JOIN}'s lookup fields shadow the previous fields, we need to leave an {@link Eval} in place of the {@link Project}
35+
* to assign a temporary name. Assume that {@code field} is a lookup-added field, then
36+
* {@code ... | RENAME field AS otherfield | LOOKUP JOIN lu_idx ON key}
37+
* becomes something like
38+
* {@code ... | EVAL $$field = field | LOOKUP JOIN lu_idx ON key | RENAME $$field AS otherfield}.
39+
* Leaving {@code EVAL $$field = field} in place of the original projection, rather than a Project, avoids infinite loops.
* {@link InlineJoin}s are left untouched by this rule.
40+
*/
41+
public final class PushDownJoinPastProject extends OptimizerRules.OptimizerRule<Join> {
42+
/**
 * Moves a {@link Project} from the left side of a left join to above the join.
 *
 * @param join the join to inspect
 * @return a {@link Project} on top of the rewritten join; or {@code join} itself if it is an {@link InlineJoin}, if its left child
 * is not a {@link Project}, or if it is not a left join
 */
@Override
43+
protected LogicalPlan rule(Join join) {
44+
if (join instanceof InlineJoin) {
45+
// Do not apply to INLINESTATS; this rule could be expanded to include INLINESTATS, but the StubRelation refers to the left
46+
// child - so pulling out a Project from the left child would require us to also update the StubRelation (and the Aggregate
47+
// on top of it)
48+
// TODO: figure out how to push down in case of INLINESTATS
49+
return join;
50+
}
51+
52+
if (join.left() instanceof Project project && join.config().type() == JoinTypes.LEFT) {
53+
// Map the attribute of each alias in the Project to the expression that it aliases.
AttributeMap.Builder<Expression> aliasBuilder = AttributeMap.builder();
54+
project.forEachExpression(Alias.class, a -> aliasBuilder.put(a.toAttribute(), a.child()));
55+
var aliasesFromProject = aliasBuilder.build();
56+
57+
// Propagate any renames into the Join, as we will remove the upstream Project.
58+
// E.g. `RENAME field AS key | LOOKUP JOIN idx ON key` -> `LOOKUP JOIN idx ON field | ...`
59+
Join updatedJoin = PushDownUtils.resolveRenamesFromMap(join, aliasesFromProject);
60+
61+
// Construct the expressions for the new downstream Project using the Join's output.
62+
// We need to carry over RENAMEs/aliases from the original upstream Project.
// The output of the original (not the updated) join is used, so the new Project keeps the same names and ids.
63+
List<Attribute> originalOutput = join.output();
64+
List<NamedExpression> newProjections = new ArrayList<>(originalOutput.size());
65+
for (Attribute attr : originalOutput) {
66+
Attribute resolved = (Attribute) aliasesFromProject.resolve(attr, attr);
67+
if (attr.semanticEquals(resolved)) {
68+
// Not renamed by the original Project: pass the attribute through unchanged.
newProjections.add(attr);
69+
} else {
70+
// Renamed by the original Project: re-create the alias with the same name and id, pointing at the resolved attribute.
Alias renamed = new Alias(attr.source(), attr.name(), resolved, attr.id(), attr.synthetic());
71+
newProjections.add(renamed);
72+
}
73+
}
74+
75+
// The projections built above do not deal with name conflicts yet. Any name shadowed by a lookup field from the `LOOKUP JOIN`
76+
// could still have been used in the original Project; any such conflict needs to be resolved by copying the attribute under a
77+
// temporary name via an Eval - and using the attribute from said Eval in the new downstream Project.
78+
Set<String> lookupFieldNames = new HashSet<>(Expressions.names(join.rightOutputFields()));
79+
List<NamedExpression> finalProjections = new ArrayList<>(newProjections.size());
80+
AttributeMap.Builder<Alias> aliasesForReplacedAttributesBuilder = AttributeMap.builder();
81+
AttributeSet leftOutput = project.child().outputSet();
82+
83+
for (NamedExpression newProj : newProjections) {
84+
Attribute coreAttr = (Attribute) Alias.unwrap(newProj);
85+
// Only fields from the left need to be protected from conflicts - because fields from the right shadow them.
86+
if (leftOutput.contains(coreAttr) && lookupFieldNames.contains(coreAttr.name())) {
87+
// Conflict - the core attribute will be shadowed by the `LOOKUP JOIN` and we need to alias it in an upstream Eval.
// computeIfAbsent: the same core attribute gets a single temporary alias, even if several projections refer to it.
88+
Alias renaming = aliasesForReplacedAttributesBuilder.computeIfAbsent(coreAttr, a -> {
89+
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name());
90+
return new Alias(a.source(), tempName, a, null, true);
91+
});
92+
93+
Attribute renamedAttribute = renaming.toAttribute();
94+
Alias renamedBack;
95+
if (newProj instanceof Alias as) {
96+
renamedBack = new Alias(as.source(), as.name(), renamedAttribute, as.id(), as.synthetic());
97+
} else {
98+
// no alias - that means newProj == coreAttr
99+
renamedBack = new Alias(coreAttr.source(), coreAttr.name(), renamedAttribute, coreAttr.id(), coreAttr.synthetic());
100+
}
101+
finalProjections.add(renamedBack);
102+
} else {
103+
finalProjections.add(newProj);
104+
}
105+
}
106+
107+
if (aliasesForReplacedAttributesBuilder.isEmpty()) {
108+
// No name conflicts, so no eval needed.
109+
return new Project(project.source(), updatedJoin.replaceLeft(project.child()), newProjections);
110+
}
111+
112+
// Assign the temporary names in an Eval on top of the original Project's child, and join on top of that Eval.
List<Alias> renamesForEval = new ArrayList<>(aliasesForReplacedAttributesBuilder.build().values());
113+
Eval eval = new Eval(project.source(), project.child(), renamesForEval);
114+
Join finalJoin = new Join(join.source(), eval, updatedJoin.right(), updatedJoin.config());
115+
116+
return new Project(project.source(), finalJoin, finalProjections);
117+
}
118+
119+
// Left child is not a Project, or this is not a left join: nothing to push down.
return join;
120+
}
121+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownUtils.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
29-
import java.util.LinkedHashSet;
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Set;
@@ -55,10 +54,10 @@ class PushDownUtils {
5554
public static <Plan extends UnaryPlan & GeneratingPlan<Plan>> LogicalPlan pushGeneratingPlanPastProjectAndOrderBy(Plan generatingPlan) {
5655
LogicalPlan child = generatingPlan.child();
5756
if (child instanceof OrderBy orderBy) {
58-
Set<String> evalFieldNames = new LinkedHashSet<>(Expressions.names(generatingPlan.generatedAttributes()));
57+
Set<String> generatedFieldNames = new HashSet<>(Expressions.names(generatingPlan.generatedAttributes()));
5958

6059
// Look for attributes in the OrderBy's expressions and create aliases with temporary names for them.
61-
AttributeReplacement nonShadowedOrders = renameAttributesInExpressions(evalFieldNames, orderBy.order());
60+
AttributeReplacement nonShadowedOrders = renameAttributesInExpressions(generatedFieldNames, orderBy.order());
6261

6362
AttributeMap<Alias> aliasesForShadowedOrderByAttrs = nonShadowedOrders.replacedAttributes;
6463
@SuppressWarnings("unchecked")
@@ -91,8 +90,7 @@ public static <Plan extends UnaryPlan & GeneratingPlan<Plan>> LogicalPlan pushGe
9190

9291
List<Attribute> generatedAttributes = generatingPlan.generatedAttributes();
9392

94-
@SuppressWarnings("unchecked")
95-
Plan generatingPlanWithResolvedExpressions = (Plan) resolveRenamesFromProject(generatingPlan, project);
93+
Plan generatingPlanWithResolvedExpressions = resolveRenamesFromProject(generatingPlan, project);
9694

9795
Set<String> namesReferencedInRenames = new HashSet<>();
9896
for (NamedExpression ne : project.projections()) {
@@ -156,7 +154,7 @@ private static AttributeReplacement renameAttributesInExpressions(
156154
rewrittenExpressions.add(expr.transformUp(Attribute.class, attr -> {
157155
if (attributeNamesToRename.contains(attr.name())) {
158156
Alias renamedAttribute = aliasesForReplacedAttributesBuilder.computeIfAbsent(attr, a -> {
159-
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name(), "temp_name");
157+
String tempName = TemporaryNameUtils.locallyUniqueTemporaryName(a.name());
160158
return new Alias(a.source(), tempName, a, null, true);
161159
});
162160
return renamedAttribute.toAttribute();
@@ -181,7 +179,7 @@ private static Map<String, String> newNamesForConflictingAttributes(
181179
for (Attribute attr : potentiallyConflictingAttributes) {
182180
String name = attr.name();
183181
if (reservedNames.contains(name)) {
184-
renameAttributeTo.putIfAbsent(name, TemporaryNameUtils.locallyUniqueTemporaryName(name, "temp_name"));
182+
renameAttributeTo.putIfAbsent(name, TemporaryNameUtils.locallyUniqueTemporaryName(name));
185183
}
186184
}
187185

@@ -198,12 +196,17 @@ public static Project pushDownPastProject(UnaryPlan parent) {
198196
}
199197
}
200198

201-
private static UnaryPlan resolveRenamesFromProject(UnaryPlan plan, Project project) {
199+
private static <P extends LogicalPlan> P resolveRenamesFromProject(P plan, Project project) {
202200
AttributeMap.Builder<Expression> aliasBuilder = AttributeMap.builder();
203201
project.forEachExpression(Alias.class, a -> aliasBuilder.put(a.toAttribute(), a.child()));
204202
var aliases = aliasBuilder.build();
205203

206-
return (UnaryPlan) plan.transformExpressionsOnly(ReferenceAttribute.class, r -> aliases.resolve(r, r));
204+
return resolveRenamesFromMap(plan, aliases);
205+
}
206+
207+
/**
 * Rewrites the expressions of {@code plan}, passing every {@link ReferenceAttribute} through {@code map.resolve(r, r)}.
 * NOTE(review): presumably {@code resolve} falls back to its second argument when the map has no entry, leaving unmapped
 * references unchanged - confirm against {@code AttributeMap#resolve}.
 *
 * @param plan the plan whose expressions are rewritten
 * @param map  the attribute-to-expression mapping used to resolve references
 * @return the result of {@code transformExpressionsOnly}, cast back to the type of {@code plan}
 */
// The cast is unchecked; it assumes that transforming expressions only does not change the plan's type - TODO confirm.
@SuppressWarnings("unchecked")
208+
public static <P extends LogicalPlan> P resolveRenamesFromMap(P plan, AttributeMap<Expression> map) {
209+
return (P) plan.transformExpressionsOnly(ReferenceAttribute.class, r -> map.resolve(r, r));
207210
}
208211

209212
private record AttributeReplacement(List<Expression> rewrittenExpressions, AttributeMap<Alias> replacedAttributes) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TemporaryNameUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public static String temporaryName(Expression inner, Expression outer, int suffi
2222
return Attribute.rawTemporaryName(in, out, String.valueOf(suffix));
2323
}
2424

25-
public static String locallyUniqueTemporaryName(String inner, String outer) {
26-
return Attribute.rawTemporaryName(inner, outer, (new NameId()).toString());
25+
/**
 * Builds a temporary name via {@code Attribute.rawTemporaryName} from {@code inner}, the fixed marker {@code "temp_name"} and the
 * string form of a newly created {@link NameId}.
 *
 * @param inner the name to derive the temporary name from
 * @return the generated temporary name
 */
public static String locallyUniqueTemporaryName(String inner) {
26+
return Attribute.rawTemporaryName(inner, "temp_name", (new NameId()).toString());
2727
}
2828

2929
static String toString(Expression ex) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Project.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
13+
import org.elasticsearch.xpack.esql.core.expression.Alias;
1314
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1415
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1516
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
17+
import org.elasticsearch.xpack.esql.core.expression.UnresolvedNamedExpression;
1618
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1719
import org.elasticsearch.xpack.esql.core.tree.Source;
1820
import org.elasticsearch.xpack.esql.expression.function.Functions;
@@ -23,7 +25,7 @@
2325
import java.util.Objects;
2426

2527
/**
26-
* A {@code Project} is a {@code Plan} with one child. In {@code SELECT x FROM y}, the "SELECT" statement is a Project.
28+
* A {@code Project} is a {@code Plan} with one child. In {@code FROM idx | KEEP x, y}, the {@code KEEP} statement is a Project.
2729
*/
2830
public class Project extends UnaryPlan implements SortAgnostic {
2931
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Project", Project::new);
@@ -33,6 +35,20 @@ public class Project extends UnaryPlan implements SortAgnostic {
3335
/**
 * Creates a projection on top of {@code child}.
 *
 * @param source      the source passed on to the super constructor
 * @param child       the plan being projected
 * @param projections the projected expressions; stored as-is, without copying
 */
public Project(Source source, LogicalPlan child, List<? extends NamedExpression> projections) {
3436
super(source, child);
3537
this.projections = projections;
38+
// Sanity check of the projection shapes; being an assert, it is only evaluated when assertions are enabled.
assert validateProjections(projections);
39+
}
40+
41+
/**
 * Checks the shape of the given projections: each one must be an {@link Attribute}, an {@link UnresolvedNamedExpression}, or an
 * {@link Alias} whose child is an {@link Attribute}.
 *
 * @param projections the projections to check
 * @return {@code false} as soon as a projection violates the above, {@code true} otherwise (including for an empty list)
 */
private boolean validateProjections(List<? extends NamedExpression> projections) {
42+
for (NamedExpression ne : projections) {
43+
if (ne instanceof Alias as) {
44+
// Aliases are only accepted when they directly wrap an attribute.
if (as.child() instanceof Attribute == false) {
45+
return false;
46+
}
47+
} else if (ne instanceof Attribute == false && ne instanceof UnresolvedNamedExpression == false) {
48+
return false;
49+
}
50+
}
51+
return true;
3652
}
3753

3854
private Project(StreamInput in) throws IOException {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EsqlProject.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import java.io.IOException;
2222
import java.util.List;
2323

24+
/**
25+
* A projection when first parsed, i.e. obtained from {@code KEEP, DROP, RENAME}. After the analysis step, we use {@link Project}.
26+
*/
27+
// TODO: Consolidate with Project. We don't need the pre-/post-analysis distinction for other logical plans.
28+
// https://github.com/elastic/elasticsearch/issues/109195
2429
public class EsqlProject extends Project {
2530
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
2631
LogicalPlan.class,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ public static IndexResolution expandedDefaultIndexResolution() {
159159
}
160160

161161
public static Map<String, IndexResolution> defaultLookupResolution() {
162-
return Map.of("languages_lookup", loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP));
162+
return Map.of(
163+
"languages_lookup",
164+
loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP),
165+
"test_lookup",
166+
loadMapping("mapping-basic.json", "test_lookup", IndexMode.LOOKUP)
167+
);
163168
}
164169

165170
public static EnrichResolution defaultEnrichResolution() {

0 commit comments

Comments
 (0)