Skip to content

Add avg_over_time #126572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,39 @@ cost:double | cluster:keyword | time_bucket:datetime
22.75 | prod | 2024-05-10T00:13:00.000Z
22.75 | qa | 2024-05-10T00:08:00.000Z
;

max_of_avg_over_time
required_capability: metrics_command
required_capability: avg_over_time
TS k8s | STATS max_cost=max(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10;

max_cost:double | cluster:keyword | time_bucket:datetime
12.375 | prod | 2024-05-10T00:17:00.000Z
12.375 | qa | 2024-05-10T00:01:00.000Z
12.25 | prod | 2024-05-10T00:19:00.000Z
12.0625 | qa | 2024-05-10T00:06:00.000Z
11.875 | prod | 2024-05-10T00:15:00.000Z
11.875 | qa | 2024-05-10T00:09:00.000Z
11.625 | prod | 2024-05-10T00:12:00.000Z
11.5 | prod | 2024-05-10T00:05:00.000Z
11.25 | prod | 2024-05-10T00:13:00.000Z
11.0 | qa | 2024-05-10T00:07:00.000Z
;

avg_of_avg_over_time
required_capability: metrics_command
required_capability: avg_over_time
TS k8s | STATS avg_cost=avg(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT avg_cost DESC, time_bucket DESC, cluster | LIMIT 10;

avg_cost:double | cluster:keyword | time_bucket:datetime
11.625 | prod | 2024-05-10T00:12:00.000Z
10.6875 | prod | 2024-05-10T00:00:00.000Z
10.145833333333332 | qa | 2024-05-10T00:04:00.000Z
10.0 | staging | 2024-05-10T00:11:00.000Z
9.895833333333334 | qa | 2024-05-10T00:06:00.000Z
9.666666666666666 | prod | 2024-05-10T00:19:00.000Z
8.875 | qa | 2024-05-10T00:01:00.000Z
8.805555555555555 | qa | 2024-05-10T00:09:00.000Z
8.71875 | prod | 2024-05-10T00:22:00.000Z
8.5625 | qa | 2024-05-10T00:22:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ public enum Cap {
QUERY_MONITORING,

/**
* Support max_over_time aggregation
* Support max_over_time aggregation that gets evaluated per time-series
*/
MAX_OVER_TIME(Build.current().isSnapshot()),

Expand All @@ -974,7 +974,12 @@ public enum Cap {
/**
* Does the usage information for ESQL contain a histogram of {@code took} values?
*/
USAGE_CONTAINS_TOOK;
USAGE_CONTAINS_TOOK,

/**
* Support avg_over_time aggregation that gets evaluated per time-series
*/
AVG_OVER_TIME(Build.current().isSnapshot());

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.Check;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
Expand Down Expand Up @@ -434,6 +435,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(Delay.class, Delay::new, "delay"),
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(Term.class, bi(Term::new), "term") } };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Top.ENTRY,
Values.ENTRY,
MaxOverTime.ENTRY,
AvgOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
 * Time-series flavour of {@link Avg}: averages the values of the given field over a time series.
 * <p>
 * The function carries a field and a filter. Its type resolution and result data type are
 * both taken from the {@link Avg} returned by {@link #perTimeSeriesAggregation()}.
 */
public class AvgOverTime extends TimeSeriesAggregateFunction {

    /** Named-writeable entry registering this expression under the name {@code AvgOverTime}. */
    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
        Expression.class,
        "AvgOverTime",
        AvgOverTime::new
    );

    /**
     * Creates the function with {@link Literal#TRUE} as its filter.
     *
     * @param source source location of the function call
     * @param field  expression producing the values to average
     */
    @FunctionInfo(type = FunctionType.AGGREGATE, returnType = "double", description = "The average over time of a numeric field.")
    public AvgOverTime(
        Source source,
        @Param(
            name = "number",
            type = { "double", "integer", "long" },
            description = "Expression that outputs values to average."
        ) Expression field
    ) {
        this(source, field, Literal.TRUE);
    }

    /**
     * Creates the function with an explicit filter and no additional parameters.
     *
     * @param source source location of the function call
     * @param field  expression producing the values to average
     * @param filter filter expression attached to the aggregation
     */
    public AvgOverTime(Source source, Expression field, Expression filter) {
        super(source, field, filter, emptyList());
    }

    /** Stream constructor referenced by {@link #ENTRY}. */
    private AvgOverTime(StreamInput in) throws IOException {
        super(in);
    }

    /**
     * Builds the aggregation this function stands for: an {@link Avg} with the same
     * source, field and filter as this function.
     */
    @Override
    public AggregateFunction perTimeSeriesAggregation() {
        return new Avg(source(), field(), filter());
    }

    /** Type resolution is delegated to the {@link Avg} from {@link #perTimeSeriesAggregation()}. */
    @Override
    protected TypeResolution resolveType() {
        AggregateFunction delegate = perTimeSeriesAggregation();
        return delegate.resolveType();
    }

    /** The data type is whatever the {@link Avg} from {@link #perTimeSeriesAggregation()} reports. */
    @Override
    public DataType dataType() {
        AggregateFunction delegate = perTimeSeriesAggregation();
        return delegate.dataType();
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    @Override
    protected NodeInfo<AvgOverTime> info() {
        return NodeInfo.create(this, AvgOverTime::new, field(), filter());
    }

    /**
     * Rebuilds the function from replacement children.
     *
     * @param newChildren replacement children; index 0 is used as the field and index 1 as the filter
     */
    @Override
    public AvgOverTime replaceChildren(List<Expression> newChildren) {
        Expression newField = newChildren.get(0);
        Expression newFilter = newChildren.get(1);
        return new AvgOverTime(source(), newField, newFilter);
    }

    /** Returns a copy of this function that keeps the source and field but uses the given filter. */
    @Override
    public AvgOverTime withFilter(Expression filter) {
        return new AvgOverTime(source(), field(), filter);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,10 @@ protected static Batch<LogicalPlan> substitutions() {
new ReplaceAggregateAggExpressionWithEval(),
// lastly replace surrogate functions
new SubstituteSurrogates(),
// translate metric aggregates after surrogate substitution and replace nested expressions with eval (again)
new TranslateTimeSeriesAggregate(),
new PruneUnusedIndexMode(),
// after translating metric aggregates, we need to replace surrogate substitutions and nested expressions again.
new SubstituteSurrogates(),
new ReplaceAggregateNestedExpressionWithEval(),
// this one needs to be placed before ReplaceAliasingEvalWithProject, so that any potential aliasing eval (eval x = y)
// is not replaced with a Project before the eval to be copied on the left hand side of an InlineJoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(max_memory_usage) BY host_values, time_bucket
*
*
* TS k8s | STATS sum(avg_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute)
*
* becomes
*
* TS k8s
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
*
* </pre>
*/
public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7100,6 +7100,37 @@ public void testTranslateMaxOverTime() {
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
}

/**
 * Checks the shape of the optimized logical plan for {@code sum(avg_over_time(...))} grouped by a one-hour bucket.
 * Only runs on snapshot builds.
 */
public void testTranslateAvgOverTime() {
    assumeTrue("requires snapshot builds", Build.current().isSnapshot());
    var esql = "TS k8s | STATS sum(avg_over_time(network.bytes_in)) BY bucket(@timestamp, 1h)";
    var optimized = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(esql)));

    // Expected plan, top down: Limit -> Aggregate -> Eval -> TimeSeriesAggregate -> Eval -> EsRelation
    Limit topLimit = as(optimized, Limit.class);
    Aggregate outerAgg = as(topLimit.child(), Aggregate.class);
    assertThat(outerAgg, not(instanceOf(TimeSeriesAggregate.class)));
    assertThat(outerAgg.aggregates(), hasSize(2));
    Eval avgEval = as(outerAgg.child(), Eval.class);
    TimeSeriesAggregate perSeriesAgg = as(avgEval.child(), TimeSeriesAggregate.class);
    assertThat(perSeriesAgg.aggregates(), hasSize(3)); // _tsid is dropped
    assertNotNull(perSeriesAgg.timeBucket());
    assertThat(perSeriesAgg.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
    Eval bucketEval = as(perSeriesAgg.child(), Eval.class);
    assertThat(bucketEval.fields(), hasSize(1));
    as(bucketEval.child(), EsRelation.class);

    // The outer sum reads the first field of the Eval above the per-series aggregate,
    // and the outer aggregate groups by the third output of the per-series aggregate.
    Sum outerSum = as(Alias.unwrap(outerAgg.aggregates().get(0)), Sum.class);
    assertThat(Expressions.attribute(outerSum.field()).id(), equalTo(avgEval.fields().get(0).id()));
    assertThat(outerAgg.groupings(), hasSize(1));
    assertThat(Expressions.attribute(outerAgg.groupings().get(0)).id(), equalTo(perSeriesAgg.aggregates().get(2).id()));

    // In the per-series aggregate the average shows up as a Sum and a Count over network.bytes_in.
    Sum perSeriesSum = as(Alias.unwrap(perSeriesAgg.aggregates().get(0)), Sum.class);
    assertThat(Expressions.attribute(perSeriesSum.field()).name(), equalTo("network.bytes_in"));
    Count perSeriesCount = as(Alias.unwrap(perSeriesAgg.aggregates().get(1)), Count.class);
    assertThat(Expressions.attribute(perSeriesCount.field()).name(), equalTo("network.bytes_in"));

    // The second per-series grouping is the bucket computed by the lower Eval over @timestamp.
    assertThat(Expressions.attribute(perSeriesAgg.groupings().get(1)).id(), equalTo(bucketEval.fields().get(0).id()));
    Bucket timeBucket = as(Alias.unwrap(bucketEval.fields().get(0)), Bucket.class);
    assertThat(Expressions.attribute(timeBucket.field()).name(), equalTo("@timestamp"));
}

public void testMetricsWithoutRate() {
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
List<String> queries = List.of("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, max_over_time]
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time]
reason: "Test that should only be executed on snapshot versions"

- do: {xpack.usage: {}}
Expand Down Expand Up @@ -101,7 +101,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 135} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 136} # check the "sister" test below for a likely update to the same esql.functions length check

---
"Basic ESQL usage output (telemetry) non-snapshot version":
Expand Down