Skip to content

Avoid sorted source for time_series aggs without rates #127033

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext
/**
 * Term queries are rejected for this field: it is not searchable.
 *
 * @param value   the term value being queried (ignored)
 * @param context the search execution context (ignored)
 * @return never returns normally
 * @throws IllegalArgumentException always, since [{@code NAME}] cannot be searched
 */
public Query termQuery(Object value, SearchExecutionContext context) {
    final String message = "[" + NAME + "] is not searchable";
    throw new IllegalArgumentException(message);
}

@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
    // Load this field's values as BytesRef blocks backed by doc-value ordinals;
    // name() identifies the field whose ordinals are read.
    // NOTE(review): assumes this field always has ordinal doc values — confirm
    // against the mapper that owns this field type.
    return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
}
}

private final boolean useDocValuesSkipper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
Expand Down Expand Up @@ -120,7 +122,7 @@
*
* becomes
*
* TS k8s
* FROM k8s
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(max_memory_usage) BY host_values, time_bucket
*
Expand All @@ -129,7 +131,7 @@
*
* becomes
*
* TS k8s
* FROM k8s
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
*
Expand All @@ -154,11 +156,15 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
List<NamedExpression> firstPassAggs = new ArrayList<>();
List<NamedExpression> secondPassAggs = new ArrayList<>();
Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
for (NamedExpression agg : aggregate.aggregates()) {
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
changed.set(Boolean.TRUE);
if (tsAgg instanceof Rate) {
hasRateAggregates.set(Boolean.TRUE);
}
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
Expand Down Expand Up @@ -231,16 +237,17 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
}
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
IndexMode indexMode = hasRateAggregates.get() ? r.indexMode() : IndexMode.STANDARD;
if (r.output().contains(tsid.get()) == false) {
return new EsRelation(
r.source(),
r.indexPattern(),
r.indexMode(),
indexMode,
r.indexNameWithModes(),
CollectionUtils.combine(r.output(), tsid.get())
);
} else {
return r;
return new EsRelation(r.source(), r.indexPattern(), indexMode, r.indexNameWithModes(), r.output());
}
});
final var firstPhase = new TimeSeriesAggregate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6829,7 +6829,8 @@ public void testTranslateMixedAggsWithMathWithoutGrouping() {
Eval addEval = as(aggsByTsid.child(), Eval.class);
assertThat(addEval.fields(), hasSize(1));
Add add = as(Alias.unwrap(addEval.fields().get(0)), Add.class);
as(addEval.child(), EsRelation.class);
EsRelation relation = as(addEval.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));

assertThat(Expressions.attribute(mul.left()).id(), equalTo(finalAggs.aggregates().get(1).id()));
assertThat(mul.right().fold(FoldContext.small()), equalTo(1.1));
Expand Down Expand Up @@ -6859,7 +6860,8 @@ public void testTranslateMetricsGroupedByOneDimension() {
TimeSeriesAggregate aggsByTsid = as(aggsByCluster.child(), TimeSeriesAggregate.class);
assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped
assertNull(aggsByTsid.timeBucket());
as(aggsByTsid.child(), EsRelation.class);
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));

Sum sum = as(Alias.unwrap(aggsByCluster.aggregates().get(0)), Sum.class);
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
Expand All @@ -6886,7 +6888,8 @@ public void testTranslateMetricsGroupedByTwoDimension() {
TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class);
assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped
assertNull(aggsByTsid.timeBucket());
as(aggsByTsid.child(), EsRelation.class);
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));

Div div = as(Alias.unwrap(eval.fields().get(0)), Div.class);
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAggs.aggregates().get(0).id()));
Expand Down Expand Up @@ -6925,7 +6928,8 @@ public void testTranslateMetricsGroupedByTimeBucket() {
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
Eval eval = as(aggsByTsid.child(), Eval.class);
assertThat(eval.fields(), hasSize(1));
as(eval.child(), EsRelation.class);
EsRelation relation = as(eval.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));

Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
Expand Down Expand Up @@ -6959,7 +6963,8 @@ public void testTranslateMetricsGroupedByTimeBucketAndDimensions() {
assertNotNull(aggsByTsid.timeBucket());
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
Eval bucket = as(aggsByTsid.child(), Eval.class);
as(bucket.child(), EsRelation.class);
EsRelation relation = as(bucket.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));

Expand Down Expand Up @@ -7000,7 +7005,8 @@ public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
assertNotNull(aggsByTsid.timeBucket());
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
Eval bucket = as(aggsByTsid.child(), Eval.class);
as(bucket.child(), EsRelation.class);
EsRelation relation = as(bucket.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));

Expand Down Expand Up @@ -7064,7 +7070,8 @@ public void testAdjustMetricsRateBeforeFinalAgg() {
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
assertThat(evalBucket.fields(), hasSize(1));
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
as(evalBucket.child(), EsRelation.class);
EsRelation relation = as(evalBucket.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));

assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
Expand Down Expand Up @@ -7102,7 +7109,8 @@ public void testTranslateMaxOverTime() {
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
Eval eval = as(aggsByTsid.child(), Eval.class);
assertThat(eval.fields(), hasSize(1));
as(eval.child(), EsRelation.class);
EsRelation relation = as(eval.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));

Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
Expand Down Expand Up @@ -7131,7 +7139,8 @@ public void testTranslateAvgOverTime() {
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
assertThat(evalBucket.fields(), hasSize(1));
as(evalBucket.child(), EsRelation.class);
EsRelation relation = as(evalBucket.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));

Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id()));
Expand Down