Include failures in partial response #124929

Merged 5 commits on Mar 16, 2025
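
This change records per-shard failures in the cluster details of an ES|QL response returned with `allow_partial_results`, and forwards failures collected on remote clusters back to the coordinating cluster (serialized behind the new `ESQL_FAILURE_FROM_REMOTE` transport version). A cluster that reports any failure is marked `PARTIAL`. For orientation, a minimal sketch of the response shape the tests below assert; shard counts, the empty `values` array, and all omitted fields are illustrative, not taken verbatim from any fixture:

```json
{
  "is_partial": true,
  "columns": [
    { "name": "fail_me", "type": "long" },
    { "name": "v", "type": "long" }
  ],
  "values": [],
  "_clusters": {
    "details": {
      "(local)": {
        "_shards": { "total": 2, "successful": 0, "failed": 2 },
        "failures": [
          {
            "reason": {
              "type": "illegal_state_exception",
              "reason": "Accessing failing field"
            }
          }
        ]
      }
    }
  }
}
```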
5 changes: 5 additions & 0 deletions docs/changelog/124929.yaml
@@ -0,0 +1,5 @@
pr: 124929
summary: Include failures in partial response
area: ES|QL
type: enhancement
issues: []
@@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_0_00);
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -14,18 +14,22 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class EsqlPartialResultsIT extends ESRestTestCase {
@@ -97,6 +101,7 @@ public Set<String> populateIndices() throws Exception {
return okIds;
}

@SuppressWarnings("unchecked")
public void testPartialResult() throws Exception {
Set<String> okIds = populateIndices();
String query = """
@@ -113,11 +118,30 @@ public void testPartialResult() throws Exception {
}
Response resp = client().performRequest(request);
Map<String, Object> results = entityAsMap(resp);
logger.info("--> results {}", results);
assertThat(results.get("is_partial"), equalTo(true));
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
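// The local cluster's details are keyed as "(local)" under _clusters.details.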
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
results,
"_clusters",
"details",
"(local)"
);
assertNotNull(localInfo);
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
assertThat(
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
);
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
assertThat(failures, hasSize(1));
assertThat(
failures.get(0).get("reason"),
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
);
}
// allow_partial_results = false
{
@@ -133,5 +157,81 @@ public void testPartialResult() throws Exception {
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
}

}

@SuppressWarnings("unchecked")
public void testFailureFromRemote() throws Exception {
setupRemoteClusters();
try {
Set<String> okIds = populateIndices();
String query = """
{
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
}
""";
// allow_partial_results = true
Request request = new Request("POST", "/_query");
request.setJsonEntity(query);
if (randomBoolean()) {
request.addParameter("allow_partial_results", "true");
}
Response resp = client().performRequest(request);
Map<String, Object> results = entityAsMap(resp);
logger.info("--> results {}", results);
assertThat(results.get("is_partial"), equalTo(true));
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
results,
"_clusters",
"details",
"cluster_one"
);
assertNotNull(remoteCluster);
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
assertThat(
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
);
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
assertThat(failures, hasSize(1));
assertThat(
failures.get(0).get("reason"),
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
);
} finally {
removeRemoteCluster();
}
}

private void setupRemoteClusters() throws IOException {
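// Register this same test cluster under the alias "cluster_one" so it can be queried as a remote cluster.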
String settings = String.format(Locale.ROOT, """
{
"persistent": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [ "%s" ],
"skip_unavailable": false
}
}
}
}
}
""", cluster.getTransportEndpoints());
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity(settings);
client().performRequest(request);
}

private void removeRemoteCluster() throws IOException {
Request settingsRequest = new Request("PUT", "/_cluster/settings");
settingsRequest.setJsonEntity("""
{"persistent": { "cluster.*": null}}
""");
client().performRequest(settingsRequest);
}
}
@@ -9,6 +9,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -37,11 +38,13 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase {

@@ -70,6 +73,14 @@ private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, C
assertClusterPartial(resp, clusterAlias, cluster.okShards + cluster.failingShards, cluster.okShards);
}

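// Every shard failure recorded for the given cluster should mention the expected reason.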
private void assertClusterFailure(EsqlQueryResponse resp, String clusterAlias, String reason) {
EsqlExecutionInfo.Cluster info = resp.getExecutionInfo().getCluster(clusterAlias);
assertThat(info.getFailures(), not(empty()));
for (ShardSearchFailure f : info.getFailures()) {
assertThat(f.reason(), containsString(reason));
}
}

private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, int totalShards, int okShards) {
EsqlExecutionInfo.Cluster clusterInfo = resp.getExecutionInfo().getCluster(clusterAlias);
assertThat(clusterInfo.getTotalShards(), equalTo(totalShards));
@@ -83,6 +94,7 @@ private void assertClusterSuccess(EsqlQueryResponse resp, String clusterAlias, i
assertThat(clusterInfo.getSuccessfulShards(), equalTo(numShards));
assertThat(clusterInfo.getFailedShards(), equalTo(0));
assertThat(clusterInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(clusterInfo.getFailures(), empty());
}

public void testPartialResults() throws Exception {
@@ -110,10 +122,12 @@ public void testPartialResults() throws Exception {
assertTrue(returnedIds.add(id));
assertThat(id, is(in(allIds)));
}

assertClusterPartial(resp, LOCAL_CLUSTER, local);
assertClusterPartial(resp, REMOTE_CLUSTER_1, remote1);
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2);
for (String cluster : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) {
assertClusterFailure(resp, cluster, "Accessing failing field");
}
}
}

@@ -139,6 +153,7 @@ public void testOneRemoteClusterPartial() throws Exception {
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0);
assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field");
}
}

@@ -191,9 +206,9 @@ public void sendResponse(Exception exception) {
}
assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds)));
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);

EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
}
} finally {
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -239,9 +254,9 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
}
assertThat(returnedIds, equalTo(local.okIds));
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);

EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
}
} finally {
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -286,8 +301,7 @@ public void testFailSearchShardsOnLocalCluster() throws Exception {
assertThat(returnedIds, equalTo(remote1.okIds));
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
assertClusterFailure(resp, LOCAL_CLUSTER, simulatedFailure.getMessage());
}
} finally {
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
@@ -14,6 +14,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
@@ -26,9 +27,12 @@
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

/**
* Make sure the failures on the data node come back as failures over the wire.
@@ -121,6 +125,10 @@ public void testPartialResults() throws Exception {
assertThat(id, in(okIds));
assertTrue(actualIds.add(id));
}
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
assertThat(localInfo.getFailures(), not(empty()));
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
}
}
}
@@ -163,7 +163,8 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
builder.setTook(executionInfo.tookSoFar());
}
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
if (executionInfo.isStopped() || resp.failedShards > 0) {
builder.setFailures(resp.failures);
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
} else {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
@@ -251,7 +252,7 @@ void runComputeOnRemoteCluster(
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
final ComputeResponse r = finalResponse.get();
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards, r.failures);
}))) {
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.operator.DriverProfile;
@@ -29,9 +30,10 @@ final class ComputeResponse extends TransportResponse {
public final int successfulShards;
public final int skippedShards;
public final int failedShards;
public final List<ShardSearchFailure> failures;

ComputeResponse(List<DriverProfile> profiles) {
this(profiles, null, null, null, null, null);
this(profiles, null, null, null, null, null, List.of());
}

ComputeResponse(
@@ -40,14 +42,16 @@
Integer totalShards,
Integer successfulShards,
Integer skippedShards,
Integer failedShards
Integer failedShards,
List<ShardSearchFailure> failures
) {
this.profiles = profiles;
this.took = took;
this.totalShards = totalShards == null ? 0 : totalShards.intValue();
this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
this.skippedShards = skippedShards == null ? 0 : skippedShards.intValue();
this.failedShards = failedShards == null ? 0 : failedShards.intValue();
this.failures = failures;
}

ComputeResponse(StreamInput in) throws IOException {
@@ -74,6 +78,11 @@ final class ComputeResponse extends TransportResponse {
this.skippedShards = 0;
this.failedShards = 0;
}
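// Failures are only sent by nodes on or after ESQL_FAILURE_FROM_REMOTE; fall back to an empty list otherwise.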
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
this.failures = in.readCollectionAsImmutableList(ShardSearchFailure::readShardSearchFailure);
} else {
this.failures = List.of();
}
}

@Override
@@ -93,6 +102,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(skippedShards);
out.writeVInt(failedShards);
}
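// Older nodes do not understand the failures list, so it is only written to newer transport versions.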
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
out.writeCollection(failures, (o, v) -> v.writeTo(o));
}
}

public List<DriverProfile> getProfiles() {
@@ -118,4 +130,8 @@ public int getSkippedShards() {
public int getFailedShards() {
return failedShards;
}

public List<ShardSearchFailure> getFailures() {
return failures;
}
}
@@ -268,6 +268,7 @@ public void execute(
.setSuccessfulShards(r.getSuccessfulShards())
.setSkippedShards(r.getSkippedShards())
.setFailedShards(r.getFailedShards())
.setFailures(r.failures)
.build()
);
dataNodesListener.onResponse(r.getProfiles());
@@ -101,6 +101,7 @@ void startComputeOnDataNodes(
new DataNodeRequestSender(
transportService,
esqlExecutor,
clusterAlias,
parentTask,
configuration.allowPartialResults(),
configuration.pragmas().maxConcurrentNodesPerCluster()