diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 692d8900e2a7c..4e701744cb749 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -183,3 +183,39 @@ cost:double | cluster:keyword | time_bucket:datetime 22.75 | prod | 2024-05-10T00:13:00.000Z 22.75 | qa | 2024-05-10T00:08:00.000Z ; + +max_of_avg_over_time +required_capability: metrics_command +required_capability: avg_over_time +TS k8s | STATS max_cost=max(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10; + +max_cost:double | cluster:keyword | time_bucket:datetime +12.375 | prod | 2024-05-10T00:17:00.000Z +12.375 | qa | 2024-05-10T00:01:00.000Z +12.25 | prod | 2024-05-10T00:19:00.000Z +12.0625 | qa | 2024-05-10T00:06:00.000Z +11.875 | prod | 2024-05-10T00:15:00.000Z +11.875 | qa | 2024-05-10T00:09:00.000Z +11.625 | prod | 2024-05-10T00:12:00.000Z +11.5 | prod | 2024-05-10T00:05:00.000Z +11.25 | prod | 2024-05-10T00:13:00.000Z +11.0 | qa | 2024-05-10T00:07:00.000Z +; + +avg_of_avg_over_time +required_capability: metrics_command +required_capability: avg_over_time +TS k8s | STATS avg_cost=avg(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT avg_cost DESC, time_bucket DESC, cluster | LIMIT 10; + +avg_cost:double | cluster:keyword | time_bucket:datetime +11.625 | prod | 2024-05-10T00:12:00.000Z +10.6875 | prod | 2024-05-10T00:00:00.000Z +10.145833333333332 | qa | 2024-05-10T00:04:00.000Z +10.0 | staging | 2024-05-10T00:11:00.000Z +9.895833333333334 | qa | 2024-05-10T00:06:00.000Z +9.666666666666666 | prod | 2024-05-10T00:19:00.000Z +8.875 | qa | 2024-05-10T00:01:00.000Z +8.805555555555555 | qa | 2024-05-10T00:09:00.000Z +8.71875 | prod | 2024-05-10T00:22:00.000Z +8.5625 | qa | 2024-05-10T00:22:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index d66181f548807..912f3fe34eba9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -962,7 +962,7 @@ public enum Cap { QUERY_MONITORING, /** - * Support max_over_time aggregation + * Support max_over_time aggregation that gets evaluated per time-series */ MAX_OVER_TIME(Build.current().isSnapshot()), @@ -974,7 +974,12 @@ public enum Cap { /** * Does the usage information for ESQL contain a histogram of {@code took} values? */ - USAGE_CONTAINS_TOOK; + USAGE_CONTAINS_TOOK, + + /** + * Support avg_over_time aggregation that gets evaluated per time-series + */ + AVG_OVER_TIME(Build.current().isSnapshot()); private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index a5558b348b8b8..764f83a67e925 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.Check; import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg; +import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; @@ -434,6 +435,7 @@ private static FunctionDefinition[][] snapshotFunctions() { def(Delay.class, Delay::new, "delay"), def(Rate.class, Rate::withUnresolvedTimestamp, "rate"), def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), + def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), def(Term.class, bi(Term::new), "term") } }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 360ba7d3ca8ad..0923dbc5d6853 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -31,6 +31,7 @@ public static List getNamedWriteables() { Top.ENTRY, Values.ENTRY, MaxOverTime.ENTRY, + AvgOverTime.ENTRY, // internal functions ToPartial.ENTRY, FromPartial.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java new file mode 100644 index 0000000000000..5b7803723f255 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.Param; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.emptyList; + +/** + * Similar to {@link Avg}, but it is used to calculate the average value over a time series of values from the given field. + */ +public class AvgOverTime extends TimeSeriesAggregateFunction { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "AvgOverTime", + AvgOverTime::new + ); + + @FunctionInfo(returnType = "double", description = "The average over time of a numeric field.", type = FunctionType.AGGREGATE) + public AvgOverTime( + Source source, + @Param( + name = "number", + type = { "double", "integer", "long" }, + description = "Expression that outputs values to average." + ) Expression field + ) { + this(source, field, Literal.TRUE); + } + + public AvgOverTime(Source source, Expression field, Expression filter) { + super(source, field, filter, emptyList()); + } + + private AvgOverTime(StreamInput in) throws IOException { + super(in); + } + + @Override + protected TypeResolution resolveType() { + return perTimeSeriesAggregation().resolveType(); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public DataType dataType() { + return perTimeSeriesAggregation().dataType(); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, AvgOverTime::new, field(), filter()); + } + + @Override + public AvgOverTime replaceChildren(List newChildren) { + return new AvgOverTime(source(), newChildren.get(0), newChildren.get(1)); + } + + @Override + public AvgOverTime withFilter(Expression filter) { + return new AvgOverTime(source(), field(), filter); + } + + @Override + public AggregateFunction perTimeSeriesAggregation() { + return new Avg(source(), field(), filter()); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 6f98b1da758c5..a166e81ec688f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -138,9 +138,10 @@ protected static Batch substitutions() { new ReplaceAggregateAggExpressionWithEval(), // lastly replace surrogate functions new SubstituteSurrogates(), - // translate metric aggregates after surrogate substitution and replace nested expressions with eval (again) new TranslateTimeSeriesAggregate(), new PruneUnusedIndexMode(), + // after translating metric aggregates, we need to replace surrogate substitutions and nested expressions again. + new SubstituteSurrogates(), new ReplaceAggregateNestedExpressionWithEval(), // this one needs to be placed before ReplaceAliasingEvalWithProject, so that any potential aliasing eval (eval x = y) // is not replaced with a Project before the eval to be copied on the left hand side of an InlineJoin diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index de8568610e025..72c18f1e0b219 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -124,6 +124,15 @@ * | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute) * | STATS sum(max_memory_usage) BY host_values, time_bucket * + * + * TS k8s | STATS sum(avg_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute) + * + * becomes + * + * TS k8s + * | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute) + * | STATS sum(avg_memory_usage) BY host_values, time_bucket + * * */ public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 47ca4ec31ee5e..05b9eb29667b7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -7100,6 +7100,37 @@ public void testTranslateMaxOverTime() { assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp")); } + public void testTranslateAvgOverTime() { + assumeTrue("requires snapshot builds", Build.current().isSnapshot()); + var query = "TS k8s | STATS sum(avg_over_time(network.bytes_in)) BY bucket(@timestamp, 1h)"; + var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query))); + Limit limit = as(plan, Limit.class); + Aggregate finalAgg = as(limit.child(), Aggregate.class); + assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class))); + assertThat(finalAgg.aggregates(), hasSize(2)); + Eval evalAvg = as(finalAgg.child(), Eval.class); + TimeSeriesAggregate aggsByTsid = as(evalAvg.child(), TimeSeriesAggregate.class); + assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped + assertNotNull(aggsByTsid.timeBucket()); + assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1))); + Eval evalBucket = as(aggsByTsid.child(), Eval.class); + assertThat(evalBucket.fields(), hasSize(1)); + as(evalBucket.child(), EsRelation.class); + + Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class); + assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id())); + assertThat(finalAgg.groupings(), hasSize(1)); + assertThat(Expressions.attribute(finalAgg.groupings().get(0)).id(), equalTo(aggsByTsid.aggregates().get(2).id())); + + Sum sumTs = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Sum.class); + assertThat(Expressions.attribute(sumTs.field()).name(), equalTo("network.bytes_in")); + Count countTs = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Count.class); + assertThat(Expressions.attribute(countTs.field()).name(), equalTo("network.bytes_in")); + assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id())); + Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class); + assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp")); + } + public void testMetricsWithoutRate() { assumeTrue("requires snapshot builds", Build.current().isSnapshot()); List queries = List.of(""" diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 5f1eb4859211e..25bae65958f2f 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -33,7 +33,7 @@ setup: path: /_query parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. - capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, max_over_time] + capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time] reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} @@ -101,7 +101,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 135} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 136} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version":