Skip to content

Commit b71ca6b

Browse files
committed
Return failures in partial response
1 parent e076653 commit b71ca6b

File tree

7 files changed

+31
-36
lines changed

7 files changed

+31
-36
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

-24
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16-
import java.util.ArrayList;
1716
import java.util.EnumMap;
1817
import java.util.List;
1918
import java.util.Map;
@@ -137,27 +136,4 @@ private Exception buildFailure() {
137136
assert first != null;
138137
return first;
139138
}
140-
141-
public List<Exception> getFailures() {
142-
if (hasFailure == false) {
143-
return List.of();
144-
}
145-
synchronized (this) {
146-
List<Exception> failures = new ArrayList<>();
147-
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE)) {
148-
for (Exception e : categories.get(category)) {
149-
if (failures.size() <= maxExceptions) {
150-
failures.add(e);
151-
}
152-
}
153-
}
154-
if (failures.isEmpty()) {
155-
Exception any = categories.get(Category.CANCELLATION).poll();
156-
if (any != null) {
157-
failures.add(any);
158-
}
159-
}
160-
return failures;
161-
}
162-
}
163139
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
163163
builder.setTook(executionInfo.tookSoFar());
164164
}
165165
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
166-
if (executionInfo.isStopped() || resp.failedShards > 0) {
166+
builder.setFailures(resp.failures);
167+
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
167168
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
168169
} else {
169170
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.compute.operator.DriverProfile;
@@ -29,7 +30,7 @@ final class ComputeResponse extends TransportResponse {
2930
public final int successfulShards;
3031
public final int skippedShards;
3132
public final int failedShards;
32-
public final List<Exception> failures;
33+
public final List<ShardSearchFailure> failures;
3334

3435
ComputeResponse(List<DriverProfile> profiles) {
3536
this(profiles, null, null, null, null, null, List.of());
@@ -42,7 +43,7 @@ final class ComputeResponse extends TransportResponse {
4243
Integer successfulShards,
4344
Integer skippedShards,
4445
Integer failedShards,
45-
List<Exception> failures
46+
List<ShardSearchFailure> failures
4647
) {
4748
this.profiles = profiles;
4849
this.took = took;
@@ -78,7 +79,7 @@ final class ComputeResponse extends TransportResponse {
7879
this.failedShards = 0;
7980
}
8081
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
81-
this.failures = in.readCollectionAsImmutableList(StreamInput::readException);
82+
this.failures = in.readCollectionAsImmutableList(ShardSearchFailure::readShardSearchFailure);
8283
} else {
8384
this.failures = List.of();
8485
}
@@ -102,7 +103,7 @@ public void writeTo(StreamOutput out) throws IOException {
102103
out.writeVInt(failedShards);
103104
}
104105
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
105-
out.writeCollection(failures, StreamOutput::writeException);
106+
out.writeCollection(failures, (o, v) -> v.writeTo(o));
106107
}
107108
}
108109

@@ -130,7 +131,7 @@ public int getFailedShards() {
130131
return failedShards;
131132
}
132133

133-
public List<Exception> getFailures() {
134+
public List<ShardSearchFailure> getFailures() {
134135
return failures;
135136
}
136137
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

+1
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ public void execute(
268268
.setSuccessfulShards(r.getSuccessfulShards())
269269
.setSkippedShards(r.getSkippedShards())
270270
.setFailedShards(r.getFailedShards())
271+
.setFailures(r.failures)
271272
.build()
272273
);
273274
dataNodesListener.onResponse(r.getProfiles());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ void startComputeOnDataNodes(
101101
new DataNodeRequestSender(
102102
transportService,
103103
esqlExecutor,
104+
clusterAlias,
104105
parentTask,
105106
configuration.allowPartialResults(),
106107
configuration.pragmas().maxConcurrentNodesPerCluster()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.search.SearchShardsGroup;
1616
import org.elasticsearch.action.search.SearchShardsRequest;
1717
import org.elasticsearch.action.search.SearchShardsResponse;
18+
import org.elasticsearch.action.search.ShardSearchFailure;
1819
import org.elasticsearch.action.support.TransportActions;
1920
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.index.Index;
2728
import org.elasticsearch.index.query.QueryBuilder;
2829
import org.elasticsearch.index.shard.ShardId;
30+
import org.elasticsearch.search.SearchShardTarget;
2931
import org.elasticsearch.search.internal.AliasFilter;
3032
import org.elasticsearch.tasks.CancellableTask;
3133
import org.elasticsearch.tasks.Task;
@@ -73,6 +75,7 @@ abstract class DataNodeRequestSender {
7375

7476
private final TransportService transportService;
7577
private final Executor esqlExecutor;
78+
private final String clusterAlias;
7679
private final CancellableTask rootTask;
7780
private final boolean allowPartialResults;
7881
private final Semaphore concurrentRequests;
@@ -87,12 +90,14 @@ abstract class DataNodeRequestSender {
8790
DataNodeRequestSender(
8891
TransportService transportService,
8992
Executor esqlExecutor,
93+
String clusterAlias,
9094
CancellableTask rootTask,
9195
boolean allowPartialResults,
9296
int concurrentRequests
9397
) {
9498
this.transportService = transportService;
9599
this.esqlExecutor = esqlExecutor;
100+
this.clusterAlias = clusterAlias;
96101
this.rootTask = rootTask;
97102
this.allowPartialResults = allowPartialResults;
98103
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
@@ -209,16 +214,25 @@ private void reportFailures(ComputeListener computeListener) {
209214
}
210215
}
211216

212-
private List<Exception> selectFailures() {
217+
private List<ShardSearchFailure> selectFailures() {
213218
assert reportedFailure == false;
214-
FailureCollector collector = new FailureCollector();
219+
List<ShardSearchFailure> failures = new ArrayList<>();
215220
Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
216-
for (ShardFailure e : shardFailures.values()) {
217-
if (seen.add(e.failure)) {
218-
collector.unwrapAndCollect(e.failure);
221+
for (Map.Entry<ShardId, ShardFailure> e : shardFailures.entrySet()) {
222+
ShardFailure failure = e.getValue();
223+
if (ExceptionsHelper.unwrap(failure.failure(), TaskCancelledException.class) != null) {
224+
continue;
219225
}
226+
if (seen.add(failure.failure) && failures.size() < 5) {
227+
failures.add(new ShardSearchFailure(failure.failure, new SearchShardTarget(null, e.getKey(), clusterAlias)));
228+
}
229+
}
230+
// pick any cancellation
231+
if (failures.isEmpty() && shardFailures.isEmpty() == false) {
232+
ShardFailure any = shardFailures.values().iterator().next();
233+
failures.add(new ShardSearchFailure(any.failure));
220234
}
221-
return collector.getFailures();
235+
return failures;
222236
}
223237

224238
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ PlainActionFuture<ComputeResponse> sendRequests(
453453
DataNodeRequestSender requestSender = new DataNodeRequestSender(
454454
transportService,
455455
executor,
456+
"",
456457
task,
457458
allowPartialResults,
458459
concurrentRequests

0 commit comments

Comments
 (0)