Skip to content

Introduce allow_partial_results setting in ES|QL #122890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122890.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122890
summary: Introduce `allow_partial_results` setting in ES|QL
area: ES|QL
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions test/external-modules/error-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
*/

apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'

tasks.named('javaRestTest') {
usesDefaultDistribution()
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
}

tasks.named('yamlRestTest').configure {
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.test.esql;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class EsqlPartialResultsIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("test-error-query")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("esql.query.allow_partial_results", "true")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

public Set<String> populateIndices() throws Exception {
int nextId = 0;
{
createIndex("failing-index", Settings.EMPTY, """
{
"runtime": {
"fail_me": {
"type": "long",
"script": {
"source": "",
"lang": "failing_field"
}
}
},
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
Request doc = new Request("PUT", "failing-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}

}
Set<String> okIds = new HashSet<>();
{
createIndex("ok-index", Settings.EMPTY, """
{
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
okIds.add(id);
Request doc = new Request("PUT", "ok-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}
}
refresh(client(), "failing-index,ok-index");
return okIds;
}

public void testPartialResult() throws Exception {
Set<String> okIds = populateIndices();
String query = """
{
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
}
""";
// allow_partial_results = true
{
Request request = new Request("POST", "/_query");
request.setJsonEntity(query);
if (randomBoolean()) {
request.addParameter("allow_partial_results", "true");
}
Response resp = client().performRequest(request);
Map<String, Object> results = entityAsMap(resp);
assertThat(results.get("is_partial"), equalTo(true));
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
}
// allow_partial_results = false
{
Request request = new Request("POST", "/_query");
request.setJsonEntity("""
{
"query": "FROM ok-index,failing-index | LIMIT 100"
}
""");
request.addParameter("allow_partial_results", "false");
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
Response resp = error.getResponse();
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,74 @@

package org.elasticsearch.test.errorquery;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.singletonList;

/**
* Test plugin that exposes a way to simulate search shard failures and warnings.
*/
public class ErrorQueryPlugin extends Plugin implements SearchPlugin {
public class ErrorQueryPlugin extends Plugin implements SearchPlugin, ScriptPlugin {
public ErrorQueryPlugin() {}

@Override
public List<QuerySpec<?>> getQueries() {
return singletonList(new QuerySpec<>(ErrorQueryBuilder.NAME, ErrorQueryBuilder::new, p -> ErrorQueryBuilder.PARSER.parse(p, null)));
}

public static final String FAILING_FIELD_LANG = "failing_field";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return FAILING_FIELD_LANG;
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
throw new IllegalStateException("Accessing failing field");
}
};
}
};
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public final ActionType<Response> action() {

public abstract EsqlQueryRequestBuilder<Request, Response> filter(QueryBuilder filter);

public abstract EsqlQueryRequestBuilder<Request, Response> allowPartialResults(boolean allowPartialResults);

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static class RequestObjectBuilder {
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;
private Boolean allPartialResults = null;

public RequestObjectBuilder() throws IOException {
this(randomFrom(XContentType.values()));
Expand Down Expand Up @@ -204,6 +205,11 @@ public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException>
return this;
}

public RequestObjectBuilder allPartialResults(boolean allPartialResults) {
this.allPartialResults = allPartialResults;
return this;
}

public RequestObjectBuilder build() throws IOException {
if (isBuilt == false) {
if (tables != null) {
Expand Down Expand Up @@ -1151,6 +1157,9 @@ static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mod
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);
if (requestObject.allPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allPartialResults));
}

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -122,4 +124,46 @@ public void testPartialResults() throws Exception {
}
}
}

public void testDefaultPartialResults() throws Exception {
Set<String> okIds = populateIndices();
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), true))
);
try {
// allow_partial_results = default
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
if (randomBoolean()) {
request.allowPartialResults(true);
}
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
}
}
// allow_partial_results = false
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
request.allowPartialResults(false);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run(request).close());
assertThat(e.getMessage(), equalTo("Accessing failing field"));
}
} finally {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().putNull(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();
private boolean acceptedPragmaRisks = false;
private boolean allowPartialResults = false;
private Boolean allowPartialResults = null;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
Expand Down Expand Up @@ -232,12 +232,13 @@ public Map<String, Map<String, Column>> tables() {
return tables;
}

public boolean allowPartialResults() {
public Boolean allowPartialResults() {
return allowPartialResults;
}

public void allowPartialResults(boolean allowPartialResults) {
public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public EsqlQueryRequestBuilder keepOnCompletion(boolean keepOnCompletion) {
return this;
}

@Override
public EsqlQueryRequestBuilder allowPartialResults(boolean allowPartialResults) {
request.allowPartialResults(allowPartialResults);
return this;
}

static { // plumb access from x-pack core
SharedSecrets.setEsqlQueryRequestBuilderAccess(EsqlQueryRequestBuilder::newSyncEsqlQueryRequestBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ String fields() {
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");

private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
Expand Down Expand Up @@ -115,7 +114,6 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
}

private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
if (partialResults != null) {
esqlRequest.allowPartialResults(partialResults);
}
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

return channel -> {
Expand Down
Loading