Skip to content

Allow partial results by default in ES|QL - Take 2 #127351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 28, 2025
6 changes: 0 additions & 6 deletions docs/changelog/126286.yaml

This file was deleted.

16 changes: 16 additions & 0 deletions docs/changelog/127351.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pr: 127351
summary: Allow partial results by default in ES|QL
area: ES|QL
type: breaking
issues: [122802]

breaking:
title: Allow partial results by default in ES|QL
area: ES|QL
details: >-
In earlier versions of {es}, ES|QL would fail the entire query if it encountered any error. ES|QL now returns partial results instead of failing when encountering errors.

impact: >-
Callers should check the `is_partial` flag returned in the response to determine if the result is partial or complete. If returning partial results is not desired, this option can be overridden per request via an `allow_partial_results` parameter in the query URL or globally via the cluster setting `esql.query.allow_partial_results`.

notable: true
5 changes: 5 additions & 0 deletions docs/release-notes/breaking-changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ If you are migrating from a version prior to version 9.0, you must first upgrade

% ## Next version [elasticsearch-nextversion-breaking-changes]

## 9.1.0 [elasticsearch-910-breaking-changes]

ES|QL
: * Allow partial results by default in ES|QL [#125060](https://github.com/elastic/elasticsearch/pull/125060)

## 9.0.0 [elasticsearch-900-breaking-changes]

Aggregations:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ static ElasticsearchCluster buildCluster() {
.module("test-esql-heap-attack")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("esql.query.allow_partial_results", "false")
.jvmArg("-Xmx512m");
String javaVersion = JvmInfo.jvmInfo().version();
if (javaVersion.equals("20") || javaVersion.equals("21")) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.replaceValueInMatch("Size", 49, "Test flamegraph from profiling-events")
task.replaceValueInMatch("Size", 49, "Test flamegraph from test-events")
task.skipTest("esql/90_non_indexed/fetch", "Temporary until backported")
task.skipTest("esql/63_enrich_int_range/Invalid age as double", "TODO: require disable allow_partial_results")
})

tasks.named('yamlRestCompatTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ private RestClient remoteClusterClient() throws IOException {

@Before
public void skipTestOnOldVersions() {
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_16_0));
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_19_0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
Expand Down Expand Up @@ -87,6 +88,12 @@ protected String from(String... indexName) {

@Override
public Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException {
if (requestObject.allowPartialResults() != null) {
assumeTrue(
"require allow_partial_results on local cluster",
clusterHasCapability("POST", "/_query", List.of(), List.of("support_partial_results")).orElse(false)
);
}
requestObject.includeCCSMetadata(true);
return super.runEsql(requestObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testInvalidPragma() throws IOException {
request.setJsonEntity("{\"f\":" + i + "}");
assertOK(client().performRequest(request));
}
RequestObjectBuilder builder = requestObjectBuilder().query("from test-index | limit 1 | keep f");
RequestObjectBuilder builder = requestObjectBuilder().query("from test-index | limit 1 | keep f").allowPartialResults(false);
builder.pragmas(Settings.builder().put("data_partitioning", "invalid-option").build());
ResponseException re = expectThrows(ResponseException.class, () -> runEsqlSync(builder));
assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("No enum constant"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static class RequestObjectBuilder {
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;
private Boolean allPartialResults = null;
private Boolean allowPartialResults = null;

public RequestObjectBuilder() throws IOException {
this(randomFrom(XContentType.values()));
Expand Down Expand Up @@ -209,11 +209,15 @@ public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException>
return this;
}

public RequestObjectBuilder allPartialResults(boolean allPartialResults) {
this.allPartialResults = allPartialResults;
public RequestObjectBuilder allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
return this;
}

public Boolean allowPartialResults() {
return allowPartialResults;
}

public RequestObjectBuilder build() throws IOException {
if (isBuilt == false) {
if (tables != null) {
Expand Down Expand Up @@ -1376,8 +1380,8 @@ protected static Request prepareRequestWithOptions(RequestObjectBuilder requestO
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);
if (requestObject.allPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allPartialResults));
if (requestObject.allowPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allowPartialResults));
}

RequestOptions.Builder options = request.getOptions().toBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -76,6 +77,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return plugins;
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), false).build();
}

public static class InternalExchangePlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), EsqlPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), false)
.build();
}

protected void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
if (limit != null) {
assertAcked(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,20 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,7 +39,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class CrossClusterCancellationIT extends AbstractMultiClustersTestCase {
public class CrossClusterCancellationIT extends AbstractCrossClusterTestCase {
private static final String REMOTE_CLUSTER = "cluster-a";

@Override
Expand All @@ -53,35 +48,11 @@ protected List<String> remoteClusterAlias() {
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(InternalExchangePlugin.class);
plugins.add(SimplePauseFieldPlugin.class);
return plugins;
}

public static class InternalExchangePlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(
Setting.timeSetting(
ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
TimeValue.timeValueMillis(between(3000, 4000)),
Setting.Property.NodeScope
)
);
}
}

@Before
public void resetPlugin() {
SimplePauseFieldPlugin.resetPlugin();
}

@After
public void releasePlugin() {
SimplePauseFieldPlugin.release();
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 4000)))
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 5000)))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
Expand All @@ -17,7 +16,6 @@
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -90,18 +88,12 @@ void startComputeOnRemoteCluster(
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(DriverCompletionInfo.EMPTY);
} else if (configuration.allowPartialResults()
&& (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
executionInfo,
clusterAlias,
EsqlExecutionInfo.Cluster.Status.PARTIAL,
e
);
l.onResponse(DriverCompletionInfo.EMPTY);
} else {
l.onFailure(e);
}
} else if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(DriverCompletionInfo.EMPTY);
} else {
l.onFailure(e);
}
});
ExchangeService.openExchange(
transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
Expand All @@ -31,7 +30,6 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand Down Expand Up @@ -63,6 +61,7 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;

import java.util.ArrayList;
Expand Down Expand Up @@ -433,8 +432,7 @@ public void executePlan(
);
dataNodesListener.onResponse(r.getCompletionInfo());
}, e -> {
if (configuration.allowPartialResults()
&& (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) {
execInfo.swapCluster(
LOCAL_CLUSTER,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {

public static final Setting<Boolean> QUERY_ALLOW_PARTIAL_RESULTS = Setting.boolSetting(
"esql.query.allow_partial_results",
false,
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -368,4 +370,16 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo,

return ExceptionsHelper.isRemoteUnavailableException(e);
}

/**
* Check whether this exception can be tolerated when partial results are on, or should be treated as fatal.
* @return true if the exception can be tolerated, false if it should be treated as fatal.
*/
public static boolean canAllowPartial(Exception e) {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof IndexNotFoundException || unwrapped instanceof ElasticsearchSecurityException) {
return false;
}
return true;
}
}
Loading