Skip to content

Introduce allow_partial_results setting in ES|QL #122890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add allow_partial_results cluster settings in ES|QL
  • Loading branch information
dnhatn committed Mar 4, 2025
commit e8873ce08d0e005b042d5ed411dc5e6abda323f7
25 changes: 25 additions & 0 deletions test/external-modules/failing-field/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-java-rest-test'


tasks.named('javaRestTest') {
usesDefaultDistribution()
it.onlyIf("snapshot build") { buildParams.isSnapshotBuild() }
}

dependencies {
api project(':test:framework')
}

esplugin {
description = 'A test module that includes runtime fields which throw exceptions when accessed'
classname ='org.elasticsearch.test.FailingFieldPlugin'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.test.failingfield;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class EsqlPartialResultsIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("test-failing-field")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("esql.query.allow_partial_results", "true")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

public Set<String> populateIndices() throws Exception {
int nextId = 0;
{
createIndex("failing-index", Settings.EMPTY, """
{
"runtime": {
"fail_me": {
"type": "long",
"script": {
"source": "",
"lang": "failing_field"
}
}
},
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
Request doc = new Request("PUT", "failing-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}

}
Set<String> okIds = new HashSet<>();
{
createIndex("ok-index", Settings.EMPTY, """
{
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
okIds.add(id);
Request doc = new Request("PUT", "ok-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}
}
refresh(client(), "failing-index,ok-index");
return okIds;
}

public void testPartialResult() throws Exception {
Set<String> okIds = populateIndices();
String query = """
{
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
}
""";
// allow_partial_results = true
{
Request request = new Request("POST", "/_query");
request.setJsonEntity(query);
if (randomBoolean()) {
request.addParameter("allow_partial_results", "true");
}
Response resp = client().performRequest(request);
Map<String, Object> results = entityAsMap(resp);
assertThat(results.get("is_partial"), equalTo(true));
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(
Map.of("name", "fail_me", "type", "long"),
Map.of("name", "v", "type", "long")
)));
List<?> values = (List<?>) results.get("values");
assertThat(values, hasSize(okIds.size()));
}
// allow_partial_results = false
{
Request request = new Request("POST", "/_query");
request.setJsonEntity("""
{
"query": "FROM ok-index,failing-index | LIMIT 100"
}
""");
request.addParameter("allow_partial_results", "false");
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
Response resp = error.getResponse();
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public final ActionType<Response> action() {

public abstract EsqlQueryRequestBuilder<Request, Response> filter(QueryBuilder filter);

public abstract EsqlQueryRequestBuilder<Request, Response> allowPartialResults(boolean allowPartialResults);

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static class RequestObjectBuilder {
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;
private Boolean allPartialResults = null;

public RequestObjectBuilder() throws IOException {
this(randomFrom(XContentType.values()));
Expand Down Expand Up @@ -204,6 +205,11 @@ public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException>
return this;
}

public RequestObjectBuilder allPartialResults(boolean allPartialResults) {
this.allPartialResults = allPartialResults;
return this;
}

public RequestObjectBuilder build() throws IOException {
if (isBuilt == false) {
if (tables != null) {
Expand Down Expand Up @@ -1151,6 +1157,9 @@ static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mod
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);
if (requestObject.allPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allPartialResults));
}

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -122,4 +124,46 @@ public void testPartialResults() throws Exception {
}
}
}

public void testDefaultPartialResults() throws Exception {
Set<String> okIds = populateIndices();
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), true))
);
try {
// allow_partial_results = default
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
if (randomBoolean()) {
request.allowPartialResults(true);
}
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
}
}
// allow_partial_results = false
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
request.allowPartialResults(false);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run(request).close());
assertThat(e.getMessage(), equalTo("Accessing failing field"));
}
} finally {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().putNull(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();
private boolean acceptedPragmaRisks = false;
private boolean allowPartialResults = false;
private Boolean allowPartialResults = null;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
Expand Down Expand Up @@ -232,12 +232,13 @@ public Map<String, Map<String, Column>> tables() {
return tables;
}

public boolean allowPartialResults() {
public Boolean allowPartialResults() {
return allowPartialResults;
}

public void allowPartialResults(boolean allowPartialResults) {
public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public EsqlQueryRequestBuilder keepOnCompletion(boolean keepOnCompletion) {
return this;
}

@Override
public EsqlQueryRequestBuilder allowPartialResults(boolean allowPartialResults) {
request.allowPartialResults(allowPartialResults);
return this;
}

static { // plumb access from x-pack core
SharedSecrets.setEsqlQueryRequestBuilderAccess(EsqlQueryRequestBuilder::newSyncEsqlQueryRequestBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ String fields() {
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");

private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
Expand Down Expand Up @@ -115,7 +114,6 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
}

private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
if (partialResults != null) {
esqlRequest.allowPartialResults(partialResults);
}
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
Setting.Property.Dynamic
);

public static final Setting<Boolean> QUERY_ALLOW_PARTIAL_RESULTS = Setting.boolSetting(
"esql.query.allow_partial_results",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

@Override
public Collection<?> createComponents(PluginServices services) {
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
Expand Down Expand Up @@ -151,7 +158,7 @@ protected XPackLicenseState getLicenseState() {
*/
@Override
public List<Setting<?>> getSettings() {
return List.of(QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, QUERY_RESULT_TRUNCATION_MAX_SIZE);
return List.of(QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, QUERY_RESULT_TRUNCATION_MAX_SIZE, QUERY_ALLOW_PARTIAL_RESULTS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final RemoteClusterService remoteClusterService;
private final UsageService usageService;
private final TransportActionServices services;
private volatile boolean defaultAllowPartialResults;

@Inject
@SuppressWarnings("this-escape")
Expand Down Expand Up @@ -158,6 +159,9 @@ public TransportEsqlQueryAction(
indexNameExpressionResolver,
usageService
);
defaultAllowPartialResults = EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS, v -> defaultAllowPartialResults = v);
}

@Override
Expand Down Expand Up @@ -194,6 +198,9 @@ public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener
}

private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
if (request.allowPartialResults() == null) {
request.allowPartialResults(defaultAllowPartialResults);
}
Configuration configuration = new Configuration(
ZoneOffset.UTC,
request.locale() != null ? request.locale() : Locale.US,
Expand Down