Commit f9fd055

jan-elastic, elasticsearchmachine, and craigtaverner authored
[8.x] ES|QL change point (#126771)
* ES|QL change_point processing command (#120998)
* Grammar for ES|QL change point (with dummy impl)
* pipeline breaker
* ChangePointOperator
* Add sorting
* basic csv test
* conflict
* Update docs/changelog/120998.yaml
* [CI] Auto commit changes from spotless
* polish
* Non-long data type
* Move OrderBy/Limit to the logical plan
* fix mem.leak
* csv test for reusing column names
* Warning indeterminable
* capability
* handle null values
* too much data
* type text->keyword
* default timestamp and output columns
* spotless
* ChangePointOperatorTests + fix memory leaks
* [CI] Auto commit changes from spotless
* improve test
* add comments/todos
* handle multivalued columns
* don't register unserialiazable
* surrogate
* make "too much data" tests readable
* more tests
* Error handling
* fix multivalued test
* more name conflict tests
* [CI] Auto commit changes from spotless
* more tests
* improve code
* CSV test for various input key/value types
* one more csv test
* Check sortable/numeric for all types
* add null type to testChangePoint_valueNumeric
* more CSV tests
* skip nulls instead of zeroing them
* error on MV
* Test+todo for nicer error message
* better error msg
* Revert "better error msg"
  This reverts commit 21ec77c.
* fix
* make csv test deterministic
* replace NamedExpression -> Attribute
* skip MVs + warning

---------

Co-authored-by: elasticsearchmachine <[email protected]>

* Move ES|QL change_point to tech preview
* fix grammar
* License check
* docs + example
* spotless
* fix compile error
* add change_point to ES|QL processing commands overview page
* close note
* close note
* fix link

---------

Co-authored-by: elasticsearchmachine <[email protected]>
Co-authored-by: Craig Taverner <[email protected]>
1 parent 445d626 commit f9fd055

32 files changed, +4640 -2383 lines changed

docs/changelog/120998.yaml

+5
@@ -0,0 +1,5 @@
+pr: 120998
+summary: ES|QL `change_point` processing command
+area: Machine Learning
+type: feature
+issues: []

docs/reference/esql/esql-commands.asciidoc

+2
@@ -32,6 +32,7 @@ image::images/esql/processing-command.svg[A processing command changing an input
 
 {esql} supports these processing commands:
 
+* experimental:[] <<esql-change_point>>
 * <<esql-dissect>>
 * <<esql-drop>>
 * <<esql-enrich>>
@@ -55,6 +56,7 @@ include::source-commands/from.asciidoc[]
 include::source-commands/row.asciidoc[]
 include::source-commands/show.asciidoc[]
 
+include::processing-commands/change_point.asciidoc[]
 include::processing-commands/dissect.asciidoc[]
 include::processing-commands/drop.asciidoc[]
 include::processing-commands/enrich.asciidoc[]
@@ -0,0 +1,49 @@
+[discrete]
+[[esql-change_point]]
+=== `CHANGE_POINT`
+
+[NOTE]
+====
+The `CHANGE_POINT` command requires a https://www.elastic.co/subscriptions[platinum license].
+====
+
+preview::[]
+
+`CHANGE_POINT` detects spikes, dips, and change points in a metric.
+
+**Syntax**
+
+[source,esql]
+----
+CHANGE_POINT value [ON key] [AS type_name, pvalue_name]
+----
+
+*Parameters*
+
+`value`
+: The column with the metric in which you want to detect a change point.
+
+`key`
+: The column with the key to order the values by. If not specified, `@timestamp` is used.
+
+`type_name`
+: The name of the output column with the change point type. If not specified, `type` is used.
+
+`pvalue_name`
+: The name of the output column with the p-value that indicates how extreme the change point is. If not specified, `pvalue` is used.
+
+[NOTE]
+====
+There must be at least 22 values for change point detection. Fewer than 1,000 is preferred.
+====
+
+*Example*
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs-result]
+|===
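
As a rough illustration of the defaults described in the parameters list of the documentation diff above, the following sketch resolves the optional `ON` and `AS` arguments to `@timestamp`, `type`, and `pvalue` when they are omitted. The `ChangePointDefaults` class, its `Resolved` record, and the `resolve` method are hypothetical names used only for this example; they are not part of Elasticsearch and do not reflect how the planner actually resolves these names.

```java
public class ChangePointDefaults {

    // Hypothetical holder for the resolved arguments; not an Elasticsearch class.
    record Resolved(String value, String key, String typeName, String pvalueName) {}

    // A null argument models an omitted [ON key] or [AS type_name, pvalue_name] clause.
    static Resolved resolve(String value, String key, String typeName, String pvalueName) {
        return new Resolved(
            value,
            key != null ? key : "@timestamp",        // documented default for the ordering key
            typeName != null ? typeName : "type",    // documented default for the change type column
            pvalueName != null ? pvalueName : "pvalue" // documented default for the p-value column
        );
    }
}
```

Under these assumptions, `resolve("count", null, null, null)` stands for `CHANGE_POINT count`, while `resolve("count", "ts", "t", "p")` stands for `CHANGE_POINT count ON ts AS t, p`.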
@@ -0,0 +1,236 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BlockUtils;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
+import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
+import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Find spikes, dips and change point in a list of values.
+ * <p>
+ * Warning: this operator cannot handle large amounts of data! It buffers all
+ * data that is passed to it, runs the change point detector on the data (which
+ * is a compute-heavy process), and then outputs all data with the change points.
+ */
+public class ChangePointOperator implements Operator {
+
+    public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
+
+    public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
+        }
+
+        @Override
+        public String describe() {
+            return "ChangePointOperator[channel=" + channel + "]";
+        }
+    }
+
+    private final DriverContext driverContext;
+    private final int channel;
+    private final String sourceText;
+    private final int sourceLine;
+    private final int sourceColumn;
+
+    private final Deque<Page> inputPages;
+    private final Deque<Page> outputPages;
+    private boolean finished;
+    private Warnings warnings;
+
+    // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
+    // (by modularizing esql-core) and use that instead of the individual fields.
+    public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
+        this.driverContext = driverContext;
+        this.channel = channel;
+        this.sourceText = sourceText;
+        this.sourceLine = sourceLine;
+        this.sourceColumn = sourceColumn;
+
+        finished = false;
+        inputPages = new LinkedList<>();
+        outputPages = new LinkedList<>();
+        warnings = null;
+    }
+
+    @Override
+    public boolean needsInput() {
+        return finished == false;
+    }
+
+    @Override
+    public void addInput(Page page) {
+        inputPages.add(page);
+    }
+
+    @Override
+    public void finish() {
+        if (finished == false) {
+            finished = true;
+            createOutputPages();
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        return finished && outputPages.isEmpty();
+    }
+
+    @Override
+    public Page getOutput() {
+        if (finished == false || outputPages.isEmpty()) {
+            return null;
+        }
+        return outputPages.removeFirst();
+    }
+
+    private void createOutputPages() {
+        int valuesCount = 0;
+        for (Page page : inputPages) {
+            valuesCount += page.getPositionCount();
+        }
+        boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
+        if (tooManyValues) {
+            valuesCount = INPUT_VALUE_COUNT_LIMIT;
+        }
+
+        List<Double> values = new ArrayList<>(valuesCount);
+        List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
+        int valuesIndex = 0;
+        boolean hasNulls = false;
+        boolean hasMultivalued = false;
+        for (Page inputPage : inputPages) {
+            Block inputBlock = inputPage.getBlock(channel);
+            for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
+                Object value = BlockUtils.toJavaObject(inputBlock, i);
+                if (value == null) {
+                    hasNulls = true;
+                    valuesIndex++;
+                } else if (value instanceof List<?>) {
+                    hasMultivalued = true;
+                    valuesIndex++;
+                } else {
+                    values.add(((Number) value).doubleValue());
+                    bucketIndexes.add(valuesIndex++);
+                }
+            }
+        }
+
+        MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
+            null,
+            values.stream().mapToDouble(Double::doubleValue).toArray(),
+            bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
+        );
+        ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
+        int changePointIndex = changeType.changePoint();
+
+        BlockFactory blockFactory = driverContext.blockFactory();
+        int pageStartIndex = 0;
+        while (inputPages.isEmpty() == false) {
+            Page inputPage = inputPages.peek();
+            Page outputPage;
+            Block changeTypeBlock = null;
+            Block changePvalueBlock = null;
+            boolean success = false;
+            try {
+                if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
+                    try (
+                        BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
+                        DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
+                    ) {
+                        for (int i = 0; i < inputPage.getPositionCount(); i++) {
+                            if (pageStartIndex + i == changePointIndex) {
+                                changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
+                                pvalueBlockBuilder.appendDouble(changeType.pValue());
+                            } else {
+                                changeTypeBlockBuilder.appendNull();
+                                pvalueBlockBuilder.appendNull();
+                            }
+                        }
+                        changeTypeBlock = changeTypeBlockBuilder.build();
+                        changePvalueBlock = pvalueBlockBuilder.build();
+                    }
+                } else {
+                    changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
+                    changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
+                }
+
+                outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
+                success = true;
+            } finally {
+                if (success == false) {
+                    Releasables.closeExpectNoException(changeTypeBlock, changePvalueBlock);
+                }
+            }
+
+            inputPages.removeFirst();
+            outputPages.add(outputPage);
+            pageStartIndex += inputPage.getPositionCount();
+        }
+
+        if (changeType instanceof ChangeType.Indeterminable indeterminable) {
+            warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
+        }
+        if (tooManyValues) {
+            warnings(true).registerException(
+                new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
+            );
+        }
+        if (hasNulls) {
+            warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
+        }
+        if (hasMultivalued) {
+            warnings(true).registerException(
+                new IllegalArgumentException(
+                    "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
+                )
+            );
+        }
+    }
+
+    @Override
+    public void close() {
+        for (Page page : inputPages) {
+            page.releaseBlocks();
+        }
+        for (Page page : outputPages) {
+            page.releaseBlocks();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChangePointOperator[channel=" + channel + "]";
+    }
+
+    private Warnings warnings(boolean onlyWarnings) {
+        if (warnings == null) {
+            if (onlyWarnings) {
+                this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
+            } else {
+                this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
+            }
+        }
+        return warnings;
+    }
+}
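
The value-collection step of `createOutputPages` may be easier to follow in isolation. The sketch below is a simplified stand-in, not the operator itself: a plain `List` replaces the buffered pages and blocks, and `ValueCollectionSketch`, `Collected`, and `collect` are hypothetical names. It keeps the behavior visible in the diff: only the first 1000 positions are considered, nulls and multivalued entries are skipped but still consume a position, and each kept value records its original position as its bucket index.

```java
import java.util.ArrayList;
import java.util.List;

public class ValueCollectionSketch {

    // Mirrors INPUT_VALUE_COUNT_LIMIT from the diff.
    static final int LIMIT = 1000;

    // Hypothetical result holder; the real operator passes the arrays to the ML change point detector.
    record Collected(double[] values, int[] bucketIndexes, boolean hasNulls, boolean hasMultivalued, boolean tooManyValues) {}

    static Collected collect(List<?> rows) {
        boolean tooManyValues = rows.size() > LIMIT;
        int count = Math.min(rows.size(), LIMIT);

        List<Double> values = new ArrayList<>(count);
        List<Integer> bucketIndexes = new ArrayList<>(count);
        boolean hasNulls = false;
        boolean hasMultivalued = false;

        for (int i = 0; i < count; i++) {
            Object value = rows.get(i);
            if (value == null) {
                hasNulls = true;            // skipped, but position i is still consumed
            } else if (value instanceof List<?>) {
                hasMultivalued = true;      // multivalued entry: skipped as well
            } else {
                values.add(((Number) value).doubleValue());
                bucketIndexes.add(i);       // remember where the value sat in the input
            }
        }

        return new Collected(
            values.stream().mapToDouble(Double::doubleValue).toArray(),
            bucketIndexes.stream().mapToInt(Integer::intValue).toArray(),
            hasNulls,
            hasMultivalued,
            tooManyValues
        );
    }
}
```

Keeping the original position as the bucket index is what lets the operator map the detector's change point index back to a row in the buffered pages, even when some rows were skipped.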

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Warnings.java

+23-10
@@ -31,8 +31,9 @@ public void registerException(Exception exception) {
      * @param sourceText The source text that caused the warning. Same as `source.text()`
      * @return A warnings collector object
      */
+    // TODO: rename to createWarningsTreatedAsNull
     public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
-        return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
+        return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
     }
 
     /**
@@ -50,7 +51,26 @@ public static Warnings createWarningsTreatedAsFalse(
         int columnNumber,
         String sourceText
     ) {
-        return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
+        return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
+    }
+
+    /**
+     * Create a new warnings object based on the given mode which warns that
+     * evaluation resulted in warnings.
+     * @param warningsMode The warnings collection strategy to use
+     * @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
+     * @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
+     * @param sourceText The source text that caused the warning. Same as `source.text()`
+     * @return A warnings collector object
+     */
+    // TODO: rename to createWarnings
+    public static Warnings createOnlyWarnings(
+        DriverContext.WarningsMode warningsMode,
+        int lineNumber,
+        int columnNumber,
+        String sourceText
+    ) {
+        return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "warnings during evaluation of [{}]");
     }
 
     private static Warnings createWarnings(
@@ -78,14 +98,7 @@ private static Warnings createWarnings(
 
     private Warnings(int lineNumber, int columnNumber, String sourceText, String first) {
         this.location = format("Line {}:{}: ", lineNumber, columnNumber);
-        this.first = format(
-            null,
-            "{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
-            location,
-            sourceText,
-            first,
-            MAX_ADDED_WARNINGS
-        );
+        this.first = format(null, "{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
     }
 
     public void registerException(Exception exception) {
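
The change to the `Warnings` constructor moves the `[{}]` placeholder for the source text out of the fixed template and into the `first` fragment supplied by each factory method. The sketch below shows how such a message could be assembled. It is only an approximation: the `format` helper here is a hypothetical stand-in that fills `{}` placeholders left to right, and the value of `MAX_ADDED_WARNINGS` is assumed, since neither appears in this diff.

```java
public class WarningTemplateSketch {

    // Assumed value for illustration; the real constant is not shown in the diff.
    static final int MAX_ADDED_WARNINGS = 20;

    // Hypothetical stand-in for the logging `format` helper: fills each "{}" in order.
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            sb.append(template, from, at).append(arg);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    // Composes the first warning line the way the new constructor does:
    // location, then the caller-supplied fragment (which embeds the source text), then the cap notice.
    static String firstMessage(int line, int column, String sourceText, String first) {
        String location = format("Line {}:{}: ", line, column);
        return format("{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
    }
}
```

With the fragment used by `createOnlyWarnings`, `firstMessage(1, 7, "CHANGE_POINT x", "warnings during evaluation of [{}]")` yields `Line 1:7: warnings during evaluation of [CHANGE_POINT x]. Only first 20 failures recorded.` under the assumptions above.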
