Skip to content

ES|QL random sampling #125570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d0cb643
Add a random sample command
bpintea Mar 3, 2025
ab4934c
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 3, 2025
e03dc15
Add non-operator-related tests
bpintea Mar 5, 2025
450f450
Make seed parameter optional. Various fixes
bpintea Mar 5, 2025
7a741a4
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 5, 2025
4c4fc1f
Make CsvTests more node-count-induced variation tolerant
bpintea Mar 6, 2025
1c56fb7
Simplify RandomSampleOperator
jan-elastic Mar 25, 2025
99a4a70
correct aggregations for random sampling
jan-elastic Mar 18, 2025
1598b80
Update docs/changelog/125570.yaml
jan-elastic Mar 25, 2025
6acf60b
don't correct multiple stats
jan-elastic Mar 26, 2025
23b5a3c
Refactor sample correction
jan-elastic Mar 26, 2025
b5cde58
Refactor sample correction once more
jan-elastic Mar 26, 2025
ec51f8c
spotless
jan-elastic Mar 26, 2025
67a0c44
fix random sample csv tests
jan-elastic Mar 26, 2025
878f09c
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 27, 2025
4bfe076
Make `isSampledCorrected` field final
jan-elastic Mar 27, 2025
aedc2f4
fix StatementParserTests
jan-elastic Mar 27, 2025
f40efb3
make sample corrected constructor private
jan-elastic Mar 27, 2025
473217d
push down through where and sort
jan-elastic Mar 27, 2025
92949fa
rename RANDOM_SAMPLE -> SAMPLE
jan-elastic Mar 27, 2025
e449ae4
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 27, 2025
83c0e07
SampleOperatorTests + fix status
jan-elastic Mar 28, 2025
c98391a
Test accuracy of sampling operator
jan-elastic Mar 31, 2025
0e5d497
polish code
jan-elastic Mar 31, 2025
2e32a72
error on seed in sampling operator
jan-elastic Apr 1, 2025
cad45d7
Don't push sample correction through limit
jan-elastic Apr 8, 2025
01cc677
Don't push sample correction through mv_expand
jan-elastic Apr 8, 2025
011e612
CSV tests
jan-elastic Apr 3, 2025
27f4294
propagate multiple sample probabilities
jan-elastic Apr 9, 2025
875c2d5
REST test
jan-elastic Apr 9, 2025
eb1f728
enable all csv tests
jan-elastic Apr 9, 2025
ccc7179
Fix CSV test with sample+limit
jan-elastic Apr 10, 2025
a5ef3bd
add SampleBreaking interface
jan-elastic Apr 10, 2025
ac41e9c
comments
jan-elastic Apr 15, 2025
12df3c2
linkedlist -> arraydeque for efficiency
jan-elastic Apr 15, 2025
6051ec0
use samplebreaking in pushdown
jan-elastic Apr 15, 2025
c7cbf7e
different operator categories wrt sampling. Remove SampleBreaking int…
jan-elastic Apr 15, 2025
0a5085b
sample metrics
jan-elastic Apr 23, 2025
ba917cd
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 23, 2025
29fbc39
fix esql metrics test
jan-elastic Apr 23, 2025
af95b37
delete unused file
jan-elastic Apr 23, 2025
f13a5c9
Merge branch 'main' into feat/random_sample
jan-elastic Apr 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
error on seed in sampling operator
  • Loading branch information
jan-elastic committed Apr 23, 2025
commit 2e32a72d1db31f36057f34fba0bd89c121f2ecaf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.capabilities;

import org.elasticsearch.xpack.esql.common.Failures;

/**
* Interface implemented by physical plan nodes and expressions that require validation post physical optimization.
*/
public interface PostPhysicalOptimizationVerificationAware {

/**
* Validates the implementing plan node or expression - discovered failures are reported to the given
* {@link Failures} class.
*
* @param failures collector to which any problem found during validation is added
*/
void postPhysicalOptimizationVerification(Failures failures);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
Expand All @@ -24,7 +24,6 @@
import org.elasticsearch.xpack.esql.rule.Rule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
Expand All @@ -46,8 +45,8 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan plan) {
Collection<Failure> failures = verifier.verify(plan);
if (failures.isEmpty() == false) {
Failures failures = verifier.verify(plan);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;

import java.util.Collection;
import java.util.List;

/**
Expand All @@ -39,8 +38,8 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
}

PhysicalPlan verify(PhysicalPlan plan) {
Collection<Failure> failures = verifier.verify(plan);
if (failures.isEmpty() == false) {
Failures failures = verifier.verify(plan);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
Expand All @@ -17,10 +17,6 @@
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

import static org.elasticsearch.xpack.esql.common.Failure.fail;

/** Physical plan verifier. */
Expand All @@ -31,8 +27,8 @@ public final class PhysicalVerifier {
private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Collection<Failure> verify(PhysicalPlan plan) {
Set<Failure> failures = new LinkedHashSet<>();
public Failures verify(PhysicalPlan plan) {
Failures failures = new Failures();
Failures depFailures = new Failures();

// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
Expand All @@ -56,6 +52,17 @@ public Collection<Failure> verify(PhysicalPlan plan) {
}
}
PlanConsistencyChecker.checkPlan(p, depFailures);

if (failures.hasFailures() == false) {
if (p instanceof PostPhysicalOptimizationVerificationAware va) {
va.postPhysicalOptimizationVerification(failures);
}
p.forEachExpression(ex -> {
if (ex instanceof PostPhysicalOptimizationVerificationAware va) {
va.postPhysicalOptimizationVerification(failures);
}
});
}
});

if (depFailures.hasFailures()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand All @@ -19,7 +21,9 @@
import java.io.IOException;
import java.util.Objects;

public class SampleExec extends UnaryExec {
import static org.elasticsearch.xpack.esql.common.Failure.fail;

public class SampleExec extends UnaryExec implements PostPhysicalOptimizationVerificationAware {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"SampleExec",
Expand Down Expand Up @@ -96,4 +100,15 @@ public boolean equals(Object obj) {

return Objects.equals(child(), other.child()) && Objects.equals(probability, other.probability) && Objects.equals(seed, other.seed);
}

/**
 * Reports a failure if this sample node carries an explicit seed.
 *
 * @param failures collector the seed failure is added to
 */
@Override
public void postPhysicalOptimizationVerification(Failures failures) {
    // Nothing to verify when no seed was specified.
    if (seed == null) {
        return;
    }
    // ES|QL currently cannot process all data in a deterministic order, so a fixed
    // random seed in the sample operator does not behave as intended; it is rejected.
    // TODO: fix this.
    // TODO: what should the error message here be? This doesn't seem right.
    failures.add(fail(seed, "Seed not supported when sampling can't be pushed down to Lucene"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7793,6 +7793,8 @@ public void testPruneRedundantOrderBy() {
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
*/
public void testSampleMerged() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var query = """
FROM TEST
| SAMPLE .3 5
Expand All @@ -7813,6 +7815,8 @@ public void testSampleMerged() {
}

public void testSamplePushDown() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

for (var command : List.of(
"ENRICH languages_idx on first_name",
"EVAL x = 1",
Expand All @@ -7837,6 +7841,8 @@ public void testSamplePushDown() {
}

public void testSamplePushDown_sort() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var query = "FROM TEST | WHERE emp_no > 0 | SAMPLE 0.5 | LIMIT 100";
var optimized = optimizedPlan(query);

Expand All @@ -7850,6 +7856,8 @@ public void testSamplePushDown_sort() {
}

public void testSamplePushDown_where() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var query = "FROM TEST | SORT emp_no | SAMPLE 0.5 | LIMIT 100";
var optimized = optimizedPlan(query);

Expand All @@ -7862,6 +7870,8 @@ public void testSamplePushDown_where() {
}

public void testSampleNoPushDown() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

for (var command : List.of("LIMIT 100", "MV_EXPAND languages", "STATS COUNT()")) {
var query = "FROM TEST | " + command + " | SAMPLE .5";
var optimized = optimizedPlan(query);
Expand All @@ -7881,7 +7891,9 @@ public void testSampleNoPushDown() {
* | \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#17, language_name{f}#18]
*/
public void testRandomSampleNoPushDownLookupJoin() {
public void testSampleNoPushDownLookupJoin() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var query = """
FROM TEST
| EVAL language_code = emp_no
Expand All @@ -7906,6 +7918,8 @@ public void testRandomSampleNoPushDownLookupJoin() {
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
*/
public void testSampleNoPushDownChangePoint() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var query = """
FROM TEST
| CHANGE_POINT emp_no ON hire_date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7839,7 +7839,9 @@ public void testEqualsPushdownToDelegateTooBig() {
* query[{"bool":{"filter":[{"sampling":{"probability":0.1,"seed":234,"hash":0}}],"boost":1.0}}]
* [_doc{f}#24], limit[1000], sort[] estimatedRowSize[332]
*/
public void testRandomSamplePushDown() {
public void testSamplePushDown() {
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

var plan = physicalPlan("""
FROM test
| SAMPLE +0.1 -234
Expand All @@ -7860,6 +7862,20 @@ public void testRandomSamplePushDown() {
assertThat(randomSampling.hash(), equalTo(0));
}

/**
 * Checks that a seeded SAMPLE following MV_EXPAND is rejected with a verification error,
 * while the unseeded variant and SAMPLE directly after FROM (seeded or not) optimize without error.
 */
public void testSample_seedNotSupportedInOperator() {
    assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());

    // Queries expected to optimize without a verification error.
    String[] acceptedQueries = {
        "FROM test | SAMPLE 0.1",
        "FROM test | SAMPLE 0.1 42",
        "FROM test | MV_EXPAND first_name | SAMPLE 0.1" };
    for (String accepted : acceptedQueries) {
        optimizedPlan(physicalPlan(accepted));
    }

    // Same MV_EXPAND query as above, but with a seed: verification must fail.
    String rejected = "FROM test | MV_EXPAND first_name | SAMPLE 0.1 42";
    VerificationException failure = expectThrows(VerificationException.class, () -> optimizedPlan(physicalPlan(rejected)));
    assertThat(
        failure.getMessage(),
        equalTo("Found 1 problem\nline 1:47: Seed not supported when sampling can't be pushed down to Lucene")
    );
}

@SuppressWarnings("SameParameterValue")
private static void assertFilterCondition(
Filter filter,
Expand Down Expand Up @@ -8043,7 +8059,7 @@ private PhysicalPlan physicalPlan(String query, TestDataSource dataSource, boole
var logical = logicalOptimizer.optimize(dataSource.analyzer.analyze(parser.createStatement(query)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
// System.out.println(physical);
// System.out.println("Physical\n" + physical);
if (assertSerialization) {
assertSerialization(physical);
}
Expand Down