Skip to content

Commit b88b208

Browse files
committed
Support partial results in CCS in ES|QL
1 parent 4908c92 commit b88b208

File tree

8 files changed

+444
-258
lines changed

8 files changed

+444
-258
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
6161
public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
6262
public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";
6363

64-
private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
64+
public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
6565
private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";
6666

6767
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

+322
Large diffs are not rendered by default.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

+81-41
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionListenerResponseHandler;
1212
import org.elasticsearch.action.OriginalIndices;
1313
import org.elasticsearch.action.support.ChannelActionListener;
14+
import org.elasticsearch.compute.operator.DriverProfile;
1415
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1516
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
1617
import org.elasticsearch.core.Releasable;
@@ -34,10 +35,11 @@
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Map;
37-
import java.util.Objects;
3838
import java.util.Set;
3939
import java.util.concurrent.Executor;
40+
import java.util.concurrent.atomic.AtomicInteger;
4041
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.function.Function;
4143

4244
/**
4345
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
@@ -74,69 +76,107 @@ void startComputeOnRemoteCluster(
7476
RemoteCluster cluster,
7577
Runnable cancelQueryOnFailure,
7678
EsqlExecutionInfo executionInfo,
77-
ActionListener<ComputeResponse> listener
79+
ActionListener<List<DriverProfile>> listener
7880
) {
7981
var queryPragmas = configuration.pragmas();
8082
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
8183
final var childSessionId = computeService.newChildSession(sessionId);
82-
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
8384
final String clusterAlias = cluster.clusterAlias();
84-
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
85-
var resp = finalResponse.get();
86-
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
87-
}))) {
88-
ExchangeService.openExchange(
89-
transportService,
90-
cluster.connection,
91-
childSessionId,
92-
queryPragmas.exchangeBufferSize(),
93-
esqlExecutor,
94-
EsqlCCSUtils.skipUnavailableListener(
95-
computeListener.acquireAvoid(),
96-
executionInfo,
97-
clusterAlias,
98-
EsqlExecutionInfo.Cluster.Status.SKIPPED
99-
).delegateFailureAndWrap((l, unused) -> {
100-
var listenerGroup = new RemoteListenerGroup(
101-
transportService,
102-
rootTask,
103-
computeListener,
104-
clusterAlias,
105-
executionInfo,
106-
l
107-
);
108-
109-
var remoteSink = exchangeService.newRemoteSink(
110-
listenerGroup.getGroupTask(),
111-
childSessionId,
112-
transportService,
113-
cluster.connection
114-
);
85+
final AtomicInteger pagesFetched = new AtomicInteger();
86+
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
87+
listener = listener.delegateResponse((l, e) -> {
88+
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0;
89+
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
90+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
91+
l.onResponse(List.of());
92+
} else if (configuration.allowPartialResults()) {
93+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
94+
l.onResponse(List.of());
95+
} else {
96+
l.onFailure(e);
97+
}
98+
});
99+
ExchangeService.openExchange(
100+
transportService,
101+
cluster.connection,
102+
childSessionId,
103+
queryPragmas.exchangeBufferSize(),
104+
esqlExecutor,
105+
listener.delegateFailure((l, unused) -> {
106+
final CancellableTask groupTask;
107+
final Runnable onGroupFailure;
108+
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
109+
if (failFast) {
110+
groupTask = rootTask;
111+
onGroupFailure = cancelQueryOnFailure;
112+
} else {
113+
groupTask = computeService.createGroupTask(rootTask);
114+
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
115+
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
116+
}
117+
try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
118+
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
119+
return profiles;
120+
}))) {
121+
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
115122
exchangeSource.addRemoteSink(
116123
remoteSink,
117-
executionInfo.isSkipUnavailable(clusterAlias) == false,
118-
() -> {},
124+
failFast,
125+
pagesFetched::incrementAndGet,
119126
queryPragmas.concurrentExchangeClients(),
120-
listenerGroup.getExchangeRequestListener()
127+
computeListener.acquireAvoid()
121128
);
122129
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
123130
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
124-
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
131+
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
125132
finalResponse.set(r);
126133
return r.getProfiles();
127134
});
128135
transportService.sendChildRequest(
129136
cluster.connection,
130137
ComputeService.CLUSTER_ACTION_NAME,
131138
clusterRequest,
132-
listenerGroup.getGroupTask(),
139+
groupTask,
133140
TransportRequestOptions.EMPTY,
134141
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
135142
);
136-
})
143+
}
144+
})
145+
);
146+
}
147+
148+
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
149+
Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
150+
if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) {
151+
return status;
152+
} else if (executionInfo.isStopped() || resp.failedShards > 0) {
153+
return EsqlExecutionInfo.Cluster.Status.PARTIAL;
154+
} else {
155+
return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
156+
}
157+
};
158+
if (resp.getTook() != null) {
159+
var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos());
160+
executionInfo.swapCluster(
161+
clusterAlias,
162+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
163+
.setTook(tookTime)
164+
.setTotalShards(resp.getTotalShards())
165+
.setSuccessfulShards(resp.getSuccessfulShards())
166+
.setSkippedShards(resp.getSkippedShards())
167+
.setFailedShards(resp.getFailedShards())
168+
.build()
169+
);
170+
} else {
171+
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
172+
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
173+
executionInfo.swapCluster(
174+
clusterAlias,
175+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
176+
.setTook(executionInfo.tookSoFar())
177+
.build()
137178
);
138179
}
139-
140180
}
141181

142182
List<RemoteCluster> getRemoteClusters(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

+39-38
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@
3131
import org.elasticsearch.search.internal.SearchContext;
3232
import org.elasticsearch.search.lookup.SourceProvider;
3333
import org.elasticsearch.tasks.CancellableTask;
34+
import org.elasticsearch.tasks.Task;
35+
import org.elasticsearch.tasks.TaskId;
36+
import org.elasticsearch.tasks.TaskManager;
3437
import org.elasticsearch.threadpool.ThreadPool;
3538
import org.elasticsearch.transport.RemoteClusterAware;
39+
import org.elasticsearch.transport.TransportRequest;
3640
import org.elasticsearch.transport.TransportService;
3741
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3842
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
@@ -57,7 +61,6 @@
5761
import java.util.Set;
5862
import java.util.concurrent.atomic.AtomicBoolean;
5963
import java.util.concurrent.atomic.AtomicLong;
60-
import java.util.function.Function;
6164
import java.util.function.Supplier;
6265

6366
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
@@ -211,7 +214,8 @@ public void execute(
211214
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
212215
if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
213216
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
214-
var status = localClusterWasInterrupted.get()
217+
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
218+
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
215219
? EsqlExecutionInfo.Cluster.Status.PARTIAL
216220
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
217221
execInfo.swapCluster(
@@ -277,48 +281,13 @@ public void execute(
277281
cluster,
278282
cancelQueryOnFailure,
279283
execInfo,
280-
computeListener.acquireCompute().map(r -> {
281-
updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
282-
return r.getProfiles();
283-
})
284+
computeListener.acquireCompute()
284285
);
285286
}
286287
}
287288
}
288289
}
289290

290-
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
291-
Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
292-
if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
293-
return executionInfo.isStopped() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
294-
} else {
295-
return status;
296-
}
297-
};
298-
if (resp.getTook() != null) {
299-
var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos());
300-
executionInfo.swapCluster(
301-
clusterAlias,
302-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
303-
.setTook(tookTime)
304-
.setTotalShards(resp.getTotalShards())
305-
.setSuccessfulShards(resp.getSuccessfulShards())
306-
.setSkippedShards(resp.getSkippedShards())
307-
.setFailedShards(resp.getFailedShards())
308-
.build()
309-
);
310-
} else {
311-
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
312-
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
313-
executionInfo.swapCluster(
314-
clusterAlias,
315-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
316-
.setTook(executionInfo.tookSoFar())
317-
.build()
318-
);
319-
}
320-
}
321-
322291
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
323292
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
324293
if (execInfo.isCrossClusterSearch()) {
@@ -444,4 +413,36 @@ Runnable cancelQueryOnFailure(CancellableTask task) {
444413
transportService.getTaskManager().cancelTaskAndDescendants(task, "cancelled on failure", false, ActionListener.noop());
445414
});
446415
}
416+
417+
CancellableTask createGroupTask(Task parentTask) {
418+
final TaskManager taskManager = transportService.getTaskManager();
419+
return (CancellableTask) taskManager.register(
420+
"transport",
421+
"esql_compute_group",
422+
new ComputeGroupTaskRequest(
423+
parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(),
424+
parentTask::getDescription
425+
)
426+
);
427+
}
428+
429+
private static class ComputeGroupTaskRequest extends TransportRequest {
430+
private final Supplier<String> parentDescription;
431+
432+
ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
433+
this.parentDescription = description;
434+
setParentTask(parentTask);
435+
}
436+
437+
@Override
438+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
439+
assert parentTaskId.isSet();
440+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
441+
}
442+
443+
@Override
444+
public String getDescription() {
445+
return "group [" + parentDescription.get() + "]";
446+
}
447+
}
447448
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,7 @@ protected void sendRequest(
130130
final Runnable onGroupFailure;
131131
final CancellableTask groupTask;
132132
if (allowPartialResults) {
133-
groupTask = RemoteListenerGroup.createGroupTask(
134-
transportService,
135-
parentTask,
136-
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
137-
);
133+
groupTask = computeService.createGroupTask(parentTask);
138134
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
139135
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
140136
} else {

0 commit comments

Comments
 (0)