Skip to content

[8.x] ESQL: Split grouping functions based on their EVAL-ability (#126597) #126715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/reference/esql/functions/description/categorize.asciidoc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,21 @@ c:long
3
;

reuse categorize arg expression in agg
required_capability: categorize_v5

FROM sample_data
| STATS m = MAX(LENGTH(CONCAT(message, "_end"))) BY c = CATEGORIZE(CONCAT(message, "_end"))
| SORT m
;

m:integer |c:keyword
16 |.*?Disconnected_end.*?
20 |.*?Connection.+?error_end.*?
25 |.*?Connected.+?to.*?
;


categorize in aggs inside function
required_capability: categorize_v5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@
* from a number of desired buckets (as a hint) and a range (auto mode).
* In the former case, two parameters will be provided, in the latter four.
*/
public class Bucket extends GroupingFunction implements PostOptimizationVerificationAware, TwoOptionalArguments {
public class Bucket extends GroupingFunction.EvaluatableGroupingFunction
implements
PostOptimizationVerificationAware,
TwoOptionalArguments {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new);

// TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
Expand All @@ -36,7 +35,7 @@
* For the implementation, see {@link org.elasticsearch.compute.aggregation.blockhash.CategorizeBlockHash}
* </p>
*/
public class Categorize extends GroupingFunction {
public class Categorize extends GroupingFunction.NonEvaluatableGroupingFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"Categorize",
Expand All @@ -51,9 +50,9 @@ public class Categorize extends GroupingFunction {
detailedDescription = """
`CATEGORIZE` has the following limitations:

* can't be used within other expressions
* can't be used with multiple groupings
* can't be used or referenced within aggregate functions""",
* cant be used within other expressions
* cant be used more than once in the groupings
* cant be used or referenced within aggregate functions and it has to be the first grouping""",
examples = {
@Example(
file = "docs",
Expand Down Expand Up @@ -99,11 +98,6 @@ public Nullability nullable() {
return Nullability.TRUE;
}

@Override
public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
throw new UnsupportedOperationException("CATEGORIZE is only evaluated during aggregations");
}

@Override
protected TypeResolution resolveType() {
return isString(field(), sourceText(), DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@

import static org.elasticsearch.xpack.esql.common.Failure.fail;

public abstract class GroupingFunction extends Function implements EvaluatorMapper, PostAnalysisPlanVerificationAware {
public abstract sealed class GroupingFunction extends Function implements PostAnalysisPlanVerificationAware permits
GroupingFunction.NonEvaluatableGroupingFunction, GroupingFunction.EvaluatableGroupingFunction {

protected GroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}

@Override
public Object fold(FoldContext ctx) {
return EvaluatorMapper.super.fold(source(), ctx);
}

@Override
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
return (p, failures) -> {
Expand All @@ -45,4 +41,29 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
};
}

/**
* This is a class of grouping functions that cannot be evaluated outside the context of an aggregation.
* They will have their evaluation implemented part of an aggregation, which may keep state for their execution, making them "stateful"
* grouping functions.
*/
public abstract static non-sealed class NonEvaluatableGroupingFunction extends GroupingFunction {
protected NonEvaluatableGroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}
}

/**
* This is a class of grouping functions that can be evaluated independently within an EVAL operator, independent of the aggregation
* they're used by.
*/
public abstract static non-sealed class EvaluatableGroupingFunction extends GroupingFunction implements EvaluatorMapper {
protected EvaluatableGroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}

@Override
public Object fold(FoldContext ctx) {
return EvaluatorMapper.super.fold(source(), ctx);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
Expand Down Expand Up @@ -67,11 +67,11 @@ protected LogicalPlan rule(UnaryPlan plan) {
for (Expression grouping : groupings) {
if (grouping instanceof Attribute attribute) {
groupingAttrs.add(attribute);
} else if (grouping instanceof Alias as && as.child() instanceof Categorize) {
} else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) {
groupingAttrs.add(as);
} else {
// After applying ReplaceAggregateNestedExpressionWithEval,
// groupings (except Categorize) can only contain attributes.
// evaluatable groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
}
}
Expand Down Expand Up @@ -147,7 +147,8 @@ private static List<Expression> combineUpperGroupingsAndLowerProjections(
List<? extends NamedExpression> lowerProjections
) {
assert upperGroupings.size() <= 1
|| upperGroupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false
|| upperGroupings.stream()
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
Expand All @@ -161,10 +162,10 @@ private static List<Expression> combineUpperGroupingsAndLowerProjections(
// Propagate any renames from the lower projection into the upper groupings.
// This can lead to duplicates: e.g.
// | EVAL x = y | STATS ... BY x, y
// All substitutions happen before; groupings must be attributes at this point except for CATEGORIZE which will be an alias like
// `c = CATEGORIZE(attribute)`.
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be
// an alias like `c = CATEGORIZE(attribute)`.
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
// TODO: The deduplication based on simple equality will be insufficient in case of multiple CATEGORIZEs, e.g. for
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
for (NamedExpression ne : upperGroupings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;

Expand Down Expand Up @@ -43,9 +43,9 @@ public Expression rule(Expression e, LogicalOptimizerContext ctx) {
return Literal.of(in, null);
}
} else if (e instanceof Alias == false && e.nullable() == Nullability.TRUE
// Categorize function stays as a STATS grouping (It isn't moved to an early EVAL like other groupings),
// Non-evaluatable functions stay as a STATS grouping (It isn't moved to an early EVAL like other groupings),
// so folding it to null would currently break the plan, as we don't create an attribute/channel for that null value.
&& e instanceof Categorize == false
&& e instanceof GroupingFunction.NonEvaluatableGroupingFunction == false
&& Expressions.anyMatch(e.children(), Expressions::isGuaranteedNull)) {
return Literal.of(e, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand Down Expand Up @@ -50,20 +50,20 @@ public ReplaceAggregateAggExpressionWithEval() {

@Override
protected LogicalPlan rule(Aggregate aggregate) {
// build alias map
// an alias map for evaluatable grouping functions
AttributeMap.Builder<Expression> aliasesBuilder = AttributeMap.builder();
aggregate.forEachExpressionUp(Alias.class, a -> aliasesBuilder.put(a.toAttribute(), a.child()));
var aliases = aliasesBuilder.build();

// Build Categorize grouping functions map.
// Functions like BUCKET() shouldn't reach this point,
// as they are moved to an early EVAL by ReplaceAggregateNestedExpressionWithEval
Map<Categorize, Attribute> groupingAttributes = new HashMap<>();
// a function map for non-evaluatable grouping functions
Map<GroupingFunction.NonEvaluatableGroupingFunction, Attribute> nonEvalGroupingAttributes = new HashMap<>(
aggregate.groupings().size()
);
aggregate.forEachExpressionUp(Alias.class, a -> {
if (a.child() instanceof Categorize groupingFunction) {
groupingAttributes.put(groupingFunction, a.toAttribute());
if (a.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction groupingFunction) {
nonEvalGroupingAttributes.put(groupingFunction, a.toAttribute());
} else {
aliasesBuilder.put(a.toAttribute(), a.child());
}
});
var aliases = aliasesBuilder.build();

// break down each aggregate into AggregateFunction and/or grouping key
// preserve the projection at the end
Expand Down Expand Up @@ -123,8 +123,11 @@ protected LogicalPlan rule(Aggregate aggregate) {
return alias.toAttribute();
});

// replace grouping functions with their references
aggExpression = aggExpression.transformUp(Categorize.class, groupingAttributes::get);
// replace non-evaluatable grouping functions with their references
aggExpression = aggExpression.transformUp(
GroupingFunction.NonEvaluatableGroupingFunction.class,
nonEvalGroupingAttributes::get
);

Alias alias = as.replaceChild(aggExpression);
newEvals.add(alias);
Expand Down Expand Up @@ -152,7 +155,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
return plan;
}

static String syntheticName(Expression expression, Expression af, int counter) {
private static String syntheticName(Expression expression, Expression af, int counter) {
return TemporaryNameUtils.temporaryName(expression, af, counter);
}
}
Loading