diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle
index f2eca7aee058f..2d8a2e445ca7e 100644
--- a/x-pack/plugin/esql/compute/build.gradle
+++ b/x-pack/plugin/esql/compute/build.gradle
@@ -862,4 +862,27 @@ tasks.named('stringTemplates').configure {
     it.outputFile = "org/elasticsearch/xpack/compute/operator/lookup/EnrichResultBuilderForBoolean.java"
   }
+  // TODO: add last_over_time for other types: boolean, bytes_refs
+  File lastOverTimeAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st")
+  template {
+    it.properties = intProperties
+    it.inputFile = lastOverTimeAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java"
+  }
+  template {
+    it.properties = longProperties
+    it.inputFile = lastOverTimeAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java"
+  }
+  template {
+    it.properties = floatProperties
+    it.inputFile = lastOverTimeAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java"
+  }
+  template {
+    it.properties = doubleProperties
+    it.inputFile = lastOverTimeAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java"
+  }
+
 }
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java
new file mode 100644
index 0000000000000..9231d345b7c84
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * A time-series aggregation function that collects the most recent value of a time series in a specified interval.
+ * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    timeseries = true,
+    value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK") }
+)
+public class LastOverTimeDoubleAggregator {
+
+    public static GroupingState initGrouping(DriverContext driverContext) {
+        return new GroupingState(driverContext.bigArrays());
+    }
+
+    // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
+    // we can read the first encountered value for each group of `_tsid` and time bucket.
+    public static void combine(GroupingState current, int groupId, long timestamp, double value) {
+        current.collectValue(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        GroupingState current,
+        int groupId,
+        LongBlock timestamps, // stylecheck
+        DoubleBlock values,
+        int otherPosition
+    ) {
+        int valueCount = values.getValueCount(otherPosition);
+        if (valueCount > 0) {
+            long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
+            int firstIndex = values.getFirstValueIndex(otherPosition);
+            for (int i = 0; i < valueCount; i++) {
+                current.collectValue(groupId, timestamp, values.getDouble(firstIndex + i));
+            }
+        }
+    }
+
+    public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
+        if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
+            var timestamp = otherState.timestamps.get(otherGroupId);
+            var value = otherState.values.get(otherGroupId);
+            current.collectValue(currentGroupId, timestamp, value);
+        }
+    }
+
+    public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    public static final class GroupingState extends AbstractArrayState {
+        private final BigArrays bigArrays;
+        private LongArray timestamps;
+        private DoubleArray values;
+
+        GroupingState(BigArrays bigArrays) {
+            super(bigArrays);
+            this.bigArrays = bigArrays;
+            boolean success = false;
+            LongArray timestamps = null;
+            try {
+                timestamps = bigArrays.newLongArray(1, false);
+                this.timestamps = timestamps;
+                this.values = bigArrays.newDoubleArray(1, false);
+                success = true;
+            } finally {
+                if (success == false) {
+                    Releasables.close(timestamps, values, super::close);
+                }
+            }
+        }
+
+        void collectValue(int groupId, long timestamp, double value) {
+            if (groupId < timestamps.size()) {
+                // TODO: handle multiple values?
+                if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
+                    timestamps.set(groupId, timestamp);
+                    values.set(groupId, value);
+                }
+            } else {
+                timestamps = bigArrays.grow(timestamps, groupId + 1);
+                values = bigArrays.grow(values, groupId + 1);
+                timestamps.set(groupId, timestamp);
+                values.set(groupId, value);
+            }
+            trackGroupId(groupId);
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(timestamps, values, super::close);
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            try (
+                var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
+                var valuesBuilder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())
+            ) {
+                for (int p = 0; p < selected.getPositionCount(); p++) {
+                    int group = selected.getInt(p);
+                    if (group < timestamps.size() && hasValue(group)) {
+                        timestampsBuilder.appendLong(timestamps.get(group));
+                        valuesBuilder.appendDouble(values.get(group));
+                    } else {
+                        timestampsBuilder.appendNull();
+                        valuesBuilder.appendNull();
+                    }
+                }
+                blocks[offset] = timestampsBuilder.build();
+                blocks[offset + 1] = valuesBuilder.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            try (var builder = evalContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) {
+                for (int p = 0; p < selected.getPositionCount(); p++) {
+                    int group = selected.getInt(p);
+                    if (group < timestamps.size() && hasValue(group)) {
+                        builder.appendDouble(values.get(group));
+                    } else {
+                        builder.appendNull();
+                    }
+                }
+                return builder.build();
+            }
+        }
+    }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java
new file mode 100644
index 0000000000000..f6d47c9b98ed6
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.FloatArray;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * A time-series aggregation function that collects the most recent value of a time series in a specified interval.
+ * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
+ */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK") } +) +public class LastOverTimeFloatAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, float value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + FloatBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getFloat(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private FloatArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newFloatArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, float value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+ if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendFloat(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendFloat(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java new file mode 100644 index 0000000000000..8764a86d03a20 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. 
+ */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK") } +) +public class LastOverTimeIntAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, int value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + IntBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getInt(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private IntArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newIntArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, int value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+ if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendInt(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendInt(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java new file mode 100644 index 0000000000000..94787db738bf2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. 
+ */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK") } +) +public class LastOverTimeLongAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, long value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + LongBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getLong(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private LongArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newLongArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, long value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+ if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendLong(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendLong(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..b45b106373539 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeDoubleAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class LastOverTimeDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeDoubleAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeDoubleGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeDoubleGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return LastOverTimeDoubleGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of doubles"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..2ca6ab02875a2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeDoubleAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. 
+ */ +public final class LastOverTimeDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.DOUBLE) ); + + private final LastOverTimeDoubleAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeDoubleGroupingAggregatorFunction(List channels, + LastOverTimeDoubleAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeDoubleGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeDoubleGroupingAggregatorFunction(channels, LastOverTimeDoubleAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + DoubleBlock valuesBlock = page.getBlock(channels.get(0)); + DoubleVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeDoubleAggregator.combine(state, groupId, 
timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeDoubleAggregator.GroupingState inState = ((LastOverTimeDoubleGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeDoubleAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeDoubleAggregator.evaluateFinal(state, selected, evaluatorContext); + } 
+ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..17a714939a460 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeFloatAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeFloatAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeFloatGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeFloatGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeFloatGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of floats"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..38a3b23ee8cc5 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. 
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.FloatVector; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeFloatAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeFloatGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.FLOAT) ); + + private final LastOverTimeFloatAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeFloatGroupingAggregatorFunction(List channels, + LastOverTimeFloatAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeFloatGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeFloatGroupingAggregatorFunction(channels, LastOverTimeFloatAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + FloatBlock valuesBlock = page.getBlock(channels.get(0)); + FloatVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; 
groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeFloatAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, 
GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeFloatAggregator.GroupingState inState = ((LastOverTimeFloatGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeFloatAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeFloatAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..55230f932c681 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeIntAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class LastOverTimeIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeIntAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeIntGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeIntGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of ints"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..f03728a905ac3 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeIntAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. 
+ */ +public final class LastOverTimeIntGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.INT) ); + + private final LastOverTimeIntAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeIntGroupingAggregatorFunction(List channels, + LastOverTimeIntAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeIntGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeIntGroupingAggregatorFunction(channels, LastOverTimeIntAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + IntBlock valuesBlock = page.getBlock(channels.get(0)); + IntVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), 
values.getInt(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeIntAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeIntAggregator.GroupingState inState = ((LastOverTimeIntGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeIntAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeIntAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new 
StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..4223c27b21216 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeLongAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeLongAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeLongGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeLongGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of longs"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..c9ee5fbad3707 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. 
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeLongAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeLongGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.LONG) ); + + private final LastOverTimeLongAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeLongGroupingAggregatorFunction(List channels, + LastOverTimeLongAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeLongGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeLongGroupingAggregatorFunction(channels, LastOverTimeLongAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + LongBlock valuesBlock = page.getBlock(channels.get(0)); + LongVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if 
(values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeLongAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected 
" + getClass() + "; got " + input.getClass()); + } + LastOverTimeLongAggregator.GroupingState inState = ((LastOverTimeLongGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeLongAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeLongAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st new file mode 100644 index 0000000000000..b189a83873dd7 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +$if(int||double||float)$ +import org.elasticsearch.common.util.$Type$Array; +$endif$ +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +$if(int||double||float)$ +import org.elasticsearch.compute.data.$Type$Block; +$endif$ +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") } +) +public class LastOverTime$Type$Aggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. 
+ public static void combine(GroupingState current, int groupId, long timestamp, $type$ value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + $Type$Block values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.get$Type$(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private $Type$Array values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.new$Type$Array(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, $type$ value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+                if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
+                    timestamps.set(groupId, timestamp);
+                    values.set(groupId, value);
+                }
+            } else {
+                timestamps = bigArrays.grow(timestamps, groupId + 1);
+                values = bigArrays.grow(values, groupId + 1);
+                timestamps.set(groupId, timestamp);
+                values.set(groupId, value);
+            }
+            trackGroupId(groupId);
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(timestamps, values, super::close);
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            try (
+                var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
+                var valuesBuilder = driverContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())
+            ) {
+                for (int p = 0; p < selected.getPositionCount(); p++) {
+                    int group = selected.getInt(p);
+                    if (group < timestamps.size() && hasValue(group)) {
+                        timestampsBuilder.appendLong(timestamps.get(group));
+                        valuesBuilder.append$Type$(values.get(group));
+                    } else {
+                        timestampsBuilder.appendNull();
+                        valuesBuilder.appendNull();
+                    }
+                }
+                blocks[offset] = timestampsBuilder.build();
+                blocks[offset + 1] = valuesBuilder.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            try (var builder = evalContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())) {
+                for (int p = 0; p < selected.getPositionCount(); p++) {
+                    int group = selected.getInt(p);
+                    if (group < timestamps.size() && hasValue(group)) {
+                        builder.append$Type$(values.get(group));
+                    } else {
+                        builder.appendNull();
+                    }
+                }
+                return builder.build();
+            }
+        }
+    }
+}
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec
index 4e701744cb749..cbccc0c61e416 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec
@@ -219,3 +219,22 @@ avg_cost:double | cluster:keyword | time_bucket:datetime
 8.71875 | prod | 2024-05-10T00:22:00.000Z
 8.5625 | qa | 2024-05-10T00:22:00.000Z
 ;
+
+
+max_of_last_over_time
+required_capability: metrics_command
+required_capability: last_over_time
+TS k8s | STATS max_cost=max(last_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10;
+
+max_cost:double | cluster:keyword | time_bucket:datetime
+12.5 | staging | 2024-05-10T00:09:00.000Z
+12.375 | prod | 2024-05-10T00:17:00.000Z
+12.375 | qa | 2024-05-10T00:06:00.000Z
+12.375 | qa | 2024-05-10T00:01:00.000Z
+12.25 | prod | 2024-05-10T00:19:00.000Z
+12.125 | qa | 2024-05-10T00:17:00.000Z
+12.125 | prod | 2024-05-10T00:00:00.000Z
+12.0 | prod | 2024-05-10T00:08:00.000Z
+12.0 | qa | 2024-05-10T00:08:00.000Z
+11.875 | qa | 2024-05-10T00:21:00.000Z
+;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 7dd679e681798..cd46aa80b8ba5 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -1017,7 +1017,12 @@ public enum Cap {
      * During resolution (pre-analysis) we have to consider that joins or enriches can
override EVALuated values * https://github.com/elastic/elasticsearch/issues/126419 */ - FIX_JOIN_MASKING_EVAL; + FIX_JOIN_MASKING_EVAL, + + /** + * Support last_over_time aggregation that gets evaluated per time-series + */ + LAST_OVER_TIME(Build.current().isSnapshot()); private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 82e9d6a313898..0bd4e4bda7c5b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; +import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Median; @@ -444,6 +445,7 @@ private static FunctionDefinition[][] snapshotFunctions() { def(Rate.class, Rate::withUnresolvedTimestamp, "rate"), def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), + def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"), def(Term.class, bi(Term::new), "term") } }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 0923dbc5d6853..aedd976b69762 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -32,6 +32,7 @@ public static List getNamedWriteables() { Values.ENTRY, MaxOverTime.ENTRY, AvgOverTime.ENTRY, + LastOverTime.ENTRY, // internal functions ToPartial.ENTRY, FromPartial.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java new file mode 100644 index 0000000000000..2c9645b035017 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeDoubleAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeFloatAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeIntAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeLongAggregatorFunctionSupplier; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +public class LastOverTime extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "LastOverTime", + LastOverTime::new + ); + + private final Expression timestamp; + + @FunctionInfo( + returnType = { "int", "double", "integer", "long" }, + description = "Collect the most recent value of a time-series in the specified interval. 
Available with TS command only", + type = FunctionType.AGGREGATE + ) + public LastOverTime( + Source source, + @Param(name = "field", type = { "long|int|double|float" }, description = "field") Expression field, + Expression timestamp + ) { + this(source, field, Literal.TRUE, timestamp); + } + + // compatibility constructor used when reading from the stream + private LastOverTime(Source source, Expression field, Expression filter, List children) { + this(source, field, filter, children.getFirst()); + } + + private LastOverTime(Source source, Expression field, Expression filter, Expression timestamp) { + super(source, field, filter, List.of(timestamp)); + this.timestamp = timestamp; + } + + public LastOverTime(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + in.readNamedWriteableCollectionAsList(Expression.class) + ); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + public static LastOverTime withUnresolvedTimestamp(Source source, Expression field) { + return new LastOverTime(source, field, new UnresolvedAttribute(source, "@timestamp")); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LastOverTime::new, field(), timestamp); + } + + @Override + public LastOverTime replaceChildren(List newChildren) { + if (newChildren.size() != 3) { + assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; + throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); + } + return new LastOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + public LastOverTime withFilter(Expression filter) { + return new LastOverTime(source(), field(), filter, timestamp); + } + + @Override + public DataType dataType() { + return field().dataType(); + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long"); + } + + @Override + public AggregatorFunctionSupplier supplier() { + final DataType type = field().dataType(); + return switch (type) { + case LONG -> new LastOverTimeLongAggregatorFunctionSupplier(); + case INTEGER -> new LastOverTimeIntAggregatorFunctionSupplier(); + case DOUBLE -> new LastOverTimeDoubleAggregatorFunctionSupplier(); + case FLOAT -> new LastOverTimeFloatAggregatorFunctionSupplier(); + default -> throw EsqlIllegalArgumentException.illegalDataType(type); + }; + } + + @Override + public LastOverTime perTimeSeriesAggregation() { + return this; + } + + @Override + public String toString() { + return "last_over_time(" + field() + ")"; + } + + Expression timestamp() { + return timestamp; + } +} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 6f4bc45c018b3..40e93bafb8998 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -33,7 +33,7 @@ setup: path: /_query parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. 
- capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time] + capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time] reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} @@ -121,7 +121,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 137} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version":
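Usage note for reviewers: the csv-spec test added above doubles as an example of how the new function is meant to be called. A minimal sketch of the same query, run by hand against the k8s test dataset from the fixtures (index name, field, and bucketing are taken from that spec; result rows are omitted here because they depend on the sample data), looks like:

TS k8s
| STATS max_cost = max(last_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp, 1minute)
| SORT max_cost DESC, time_bucket DESC, cluster
| LIMIT 10

As in the spec, last_over_time is applied per time series inside STATS of a TS query and wrapped in an outer max(...) across series; the LAST_OVER_TIME capability introduced above gates the function to snapshot builds for now.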