Skip to content

[8.x] ES|QL change point #126771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 15, 2025
5 changes: 5 additions & 0 deletions docs/changelog/120998.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120998
summary: ES|QL `change_point` processing command
area: Machine Learning
type: feature
issues: []
2 changes: 2 additions & 0 deletions docs/reference/esql/esql-commands.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ image::images/esql/processing-command.svg[A processing command changing an input

{esql} supports these processing commands:

* experimental:[] <<esql-change_point>>
* <<esql-dissect>>
* <<esql-drop>>
* <<esql-enrich>>
Expand All @@ -55,6 +56,7 @@ include::source-commands/from.asciidoc[]
include::source-commands/row.asciidoc[]
include::source-commands/show.asciidoc[]

include::processing-commands/change_point.asciidoc[]
include::processing-commands/dissect.asciidoc[]
include::processing-commands/drop.asciidoc[]
include::processing-commands/enrich.asciidoc[]
Expand Down
49 changes: 49 additions & 0 deletions docs/reference/esql/processing-commands/change_point.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[discrete]
[[esql-change_point]]
=== `CHANGE_POINT`

[NOTE]
====
The `CHANGE_POINT` command requires a https://www.elastic.co/subscriptions[platinum license].
====

preview::[]

`CHANGE_POINT` detects spikes, dips, and change points in a metric.

**Syntax**

[source,esql]
----
CHANGE_POINT value [ON key] [AS type_name, pvalue_name]
----

*Parameters*

`value`::
The column with the metric in which you want to detect a change point.

`key`::
The column with the key to order the values by. If not specified, `@timestamp` is used.

`type_name`::
The name of the output column with the change point type. If not specified, `type` is used.

`pvalue_name`::
The name of the output column with the p-value that indicates how extreme the change point is. If not specified, `pvalue` is used.

[NOTE]
====
There must be at least 22 values for change point detection. Fewer than 1,000 is preferred: if there are more than 1,000 values, only the first 1,000 are used and a warning is returned.
====

*Example*

[source.merge.styled,esql]
----
include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs-result]
|===
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
 * Detects a spike, dip or other change point in one channel of the incoming pages.
 * <p>
 * Warning: this operator cannot handle large amounts of data! Every page passed
 * to it is buffered until {@link #finish()} is called; only then is the change
 * point detector run (which is a compute-heavy process) and are the buffered
 * pages emitted again, each extended with a change type block and a p-value block.
 */
public class ChangePointOperator implements Operator {

    /** Maximum number of positions handed to the detector; positions beyond it are ignored (with a warning). */
    public static final int INPUT_VALUE_COUNT_LIMIT = 1000;

    /**
     * Factory producing {@link ChangePointOperator} instances.
     *
     * @param channel      index of the block holding the values to analyse
     * @param sourceText   source text used when reporting warnings
     * @param sourceLine   line of the source text, used when reporting warnings
     * @param sourceColumn column of the source text, used when reporting warnings
     */
    public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
        @Override
        public Operator get(DriverContext driverContext) {
            return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
        }

        @Override
        public String describe() {
            return "ChangePointOperator[channel=" + channel + "]";
        }
    }

    private final DriverContext driverContext;
    private final int channel;
    private final String sourceText;
    private final int sourceLine;
    private final int sourceColumn;

    /** Pages received so far, in arrival order; drained when the output is created. */
    private final Deque<Page> inputPages = new LinkedList<>();
    /** Pages ready to be handed out by {@link #getOutput()}, in arrival order. */
    private final Deque<Page> outputPages = new LinkedList<>();
    private boolean finished = false;
    /** Lazily created by {@link #warnings(boolean)}; stays {@code null} while nothing has been reported. */
    private Warnings warnings = null;

    // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
    // (by modularizing esql-core) and use that instead of the individual fields.
    public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
        this.driverContext = driverContext;
        this.channel = channel;
        this.sourceText = sourceText;
        this.sourceLine = sourceLine;
        this.sourceColumn = sourceColumn;
    }

    @Override
    public boolean needsInput() {
        return finished == false;
    }

    @Override
    public void addInput(Page page) {
        inputPages.addLast(page);
    }

    @Override
    public void finish() {
        if (finished) {
            // The output has already been created by an earlier call.
            return;
        }
        finished = true;
        createOutputPages();
    }

    @Override
    public boolean isFinished() {
        return finished && outputPages.isEmpty();
    }

    @Override
    public Page getOutput() {
        // Nothing is emitted before finish(); afterwards pollFirst() yields null once the queue is drained.
        return finished ? outputPages.pollFirst() : null;
    }

    /**
     * Runs the change point detection over all buffered input pages and moves
     * them to the output queue, each extended with the two result blocks.
     * Finally registers the warnings that apply to this run.
     */
    private void createOutputPages() {
        int totalPositions = inputPages.stream().mapToInt(Page::getPositionCount).sum();
        boolean tooManyValues = totalPositions > INPUT_VALUE_COUNT_LIMIT;
        int valuesCount = Math.min(totalPositions, INPUT_VALUE_COUNT_LIMIT);

        // Gather the single-valued, non-null entries together with their global position.
        List<Double> values = new ArrayList<>(valuesCount);
        List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
        boolean hasNulls = false;
        boolean hasMultivalued = false;
        int valuesIndex = 0;
        for (Page page : inputPages) {
            Block block = page.getBlock(channel);
            // valuesIndex advances for every inspected position, including skipped ones.
            for (int position = 0; position < block.getPositionCount() && valuesIndex < valuesCount; position++, valuesIndex++) {
                Object value = BlockUtils.toJavaObject(block, position);
                if (value == null) {
                    hasNulls = true;
                } else if (value instanceof List<?>) {
                    hasMultivalued = true;
                } else {
                    values.add(((Number) value).doubleValue());
                    bucketIndexes.add(valuesIndex);
                }
            }
        }

        double[] valueArray = new double[values.size()];
        int[] bucketIndexArray = new int[bucketIndexes.size()];
        for (int i = 0; i < valueArray.length; i++) {
            valueArray[i] = values.get(i);
            bucketIndexArray[i] = bucketIndexes.get(i);
        }
        ChangeType changeType = ChangePointDetector.getChangeType(
            new MlAggsHelper.DoubleBucketValues(null, valueArray, bucketIndexArray)
        );
        int changePointIndex = changeType.changePoint();

        BlockFactory blockFactory = driverContext.blockFactory();
        int pageStartIndex = 0;
        while (inputPages.isEmpty() == false) {
            // Only peek: the page must stay in inputPages (and thus be released by close()) if building fails.
            Page inputPage = inputPages.peekFirst();
            Page outputPage = appendChangePointBlocks(blockFactory, inputPage, pageStartIndex, changePointIndex, changeType);
            inputPages.removeFirst();
            outputPages.add(outputPage);
            pageStartIndex += inputPage.getPositionCount();
        }

        registerWarnings(changeType, tooManyValues, hasNulls, hasMultivalued);
    }

    /**
     * Creates the output page for one input page by appending a change type
     * block and a p-value block. Both blocks are null everywhere except at the
     * change point position, if that position lies within this page.
     *
     * @param blockFactory     factory used to create the appended blocks
     * @param inputPage        page to extend
     * @param pageStartIndex   global position of the first row of {@code inputPage}
     * @param changePointIndex global position reported by {@code changeType.changePoint()}
     * @param changeType       detection result supplying the type name and p-value
     * @return the input page's blocks followed by the two new blocks
     */
    private Page appendChangePointBlocks(
        BlockFactory blockFactory,
        Page inputPage,
        int pageStartIndex,
        int changePointIndex,
        ChangeType changeType
    ) {
        int positionCount = inputPage.getPositionCount();
        boolean containsChangePoint = changePointIndex >= pageStartIndex && changePointIndex < pageStartIndex + positionCount;

        Block typeBlock = null;
        Block pvalueBlock = null;
        boolean success = false;
        try {
            if (containsChangePoint) {
                int changePointOffset = changePointIndex - pageStartIndex;
                try (
                    BytesRefBlock.Builder typeBuilder = blockFactory.newBytesRefBlockBuilder(positionCount);
                    DoubleBlock.Builder pvalueBuilder = blockFactory.newDoubleBlockBuilder(positionCount)
                ) {
                    for (int position = 0; position < positionCount; position++) {
                        if (position == changePointOffset) {
                            typeBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
                            pvalueBuilder.appendDouble(changeType.pValue());
                        } else {
                            typeBuilder.appendNull();
                            pvalueBuilder.appendNull();
                        }
                    }
                    typeBlock = typeBuilder.build();
                    pvalueBlock = pvalueBuilder.build();
                }
            } else {
                typeBlock = blockFactory.newConstantNullBlock(positionCount);
                pvalueBlock = blockFactory.newConstantNullBlock(positionCount);
            }

            Page outputPage = inputPage.appendBlocks(new Block[] { typeBlock, pvalueBlock });
            success = true;
            return outputPage;
        } finally {
            if (success == false) {
                // The new blocks never made it into a page, so they have to be released here.
                Releasables.closeExpectNoException(typeBlock, pvalueBlock);
            }
        }
    }

    /**
     * Registers the warnings for a finished detection run. The indeterminable
     * case is registered first so that, when it applies, it is the call that
     * creates the shared {@link Warnings} instance.
     */
    private void registerWarnings(ChangeType changeType, boolean tooManyValues, boolean hasNulls, boolean hasMultivalued) {
        if (changeType instanceof ChangeType.Indeterminable indeterminable) {
            warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
        }
        if (tooManyValues) {
            String message = "too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values";
            warnings(true).registerException(new IllegalArgumentException(message));
        }
        if (hasNulls) {
            warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
        }
        if (hasMultivalued) {
            String message = "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)";
            warnings(true).registerException(new IllegalArgumentException(message));
        }
    }

    @Override
    public void close() {
        inputPages.forEach(Page::releaseBlocks);
        outputPages.forEach(Page::releaseBlocks);
    }

    @Override
    public String toString() {
        return "ChangePointOperator[channel=" + channel + "]";
    }

    /**
     * Returns the warnings collector of this operator, creating it on first use.
     * Only the first call decides which kind of collector is created; later
     * calls return that same instance regardless of {@code onlyWarnings}.
     *
     * @param onlyWarnings when the collector has to be created: {@code true} to use
     *                     {@code Warnings.createOnlyWarnings}, {@code false} to use
     *                     {@code Warnings.createWarnings}
     */
    private Warnings warnings(boolean onlyWarnings) {
        if (warnings == null) {
            warnings = onlyWarnings
                ? Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText)
                : Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
        }
        return warnings;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public void registerException(Exception exception) {
* @param sourceText The source text that caused the warning. Same as `source.text()`
* @return A warnings collector object
*/
// TODO: rename to createWarningsTreatedAsNull
public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
}

/**
Expand All @@ -50,7 +51,26 @@ public static Warnings createWarningsTreatedAsFalse(
int columnNumber,
String sourceText
) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
}

/**
 * Create a new warnings object based on the given mode. Unlike the
 * "treating result as ..." variants, its message template only states that
 * warnings occurred during evaluation of the source text.
 * @param warningsMode The warnings collection strategy to use
 * @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
 * @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
 * @param sourceText The source text that caused the warning. Same as `source.text()`
 * @return A warnings collector object
 */
// TODO: rename to createWarnings
public static Warnings createOnlyWarnings(
    DriverContext.WarningsMode warningsMode,
    int lineNumber,
    int columnNumber,
    String sourceText
) {
    // Message template handed to the private createWarnings overload.
    final String messageTemplate = "warnings during evaluation of [{}]";
    return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, messageTemplate);
}

private static Warnings createWarnings(
Expand Down Expand Up @@ -78,14 +98,7 @@ private static Warnings createWarnings(

private Warnings(int lineNumber, int columnNumber, String sourceText, String first) {
this.location = format("Line {}:{}: ", lineNumber, columnNumber);
this.first = format(
null,
"{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
location,
sourceText,
first,
MAX_ADDED_WARNINGS
);
this.first = format(null, "{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
}

public void registerException(Exception exception) {
Expand Down
Loading