Skip to content

Commit 091ea9a

Browse files
authored
Support partial results in CCS in ES|QL (#122708)
A follow-up to #121942 that adds support for partial results in CCS in ES|QL. Relates #121942
1 parent e74ef2d commit 091ea9a

File tree

11 files changed

+526
-273
lines changed

11 files changed

+526
-273
lines changed

docs/changelog/122708.yaml

+5
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 122708
2+
summary: Support partial results in CCS in ES|QL
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

+1 -1
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
6161
public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
6262
public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";
6363

64-
private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
64+
public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
6565
private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";
6666

6767
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

+15 -7
Original file line number | Diff line number | Diff line change
@@ -32,8 +32,10 @@
3232
import java.util.ArrayList;
3333
import java.util.Collection;
3434
import java.util.HashMap;
35+
import java.util.HashSet;
3536
import java.util.List;
3637
import java.util.Map;
38+
import java.util.Set;
3739
import java.util.concurrent.TimeUnit;
3840
import java.util.concurrent.atomic.AtomicLong;
3941

@@ -48,6 +50,7 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClusters
4850
protected static final String REMOTE_INDEX = "logs-2";
4951
protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking";
5052
protected static final String INDEX_WITH_FAIL_MAPPING = "failing";
53+
protected static final AtomicLong NEXT_DOC_ID = new AtomicLong(0);
5154

5255
@Override
5356
protected List<String> remoteClusterAlias() {
@@ -150,7 +153,7 @@ protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bo
150153
protected Map<String, Object> setupClusters(int numClusters) throws IOException {
151154
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
152155
int numShardsLocal = randomIntBetween(1, 5);
153-
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
156+
populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);
154157

155158
int numShardsRemote = randomIntBetween(1, 5);
156159
populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
@@ -180,19 +183,24 @@ protected Map<String, Object> setupClusters(int numClusters) throws IOException
180183
return clusterInfo;
181184
}
182185

183-
protected void populateLocalIndices(String indexName, int numShards) {
184-
Client localClient = client(LOCAL_CLUSTER);
186+
protected Set<String> populateIndex(String clusterAlias, String indexName, int numShards, int numDocs) {
187+
Client client = client(clusterAlias);
185188
assertAcked(
186-
localClient.admin()
189+
client.admin()
187190
.indices()
188191
.prepareCreate(indexName)
189192
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
190193
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
191194
);
192-
for (int i = 0; i < 10; i++) {
193-
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
195+
Set<String> ids = new HashSet<>();
196+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
197+
for (int i = 0; i < numDocs; i++) {
198+
String id = Long.toString(NEXT_DOC_ID.incrementAndGet());
199+
client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get();
200+
ids.add(id);
194201
}
195-
localClient.admin().indices().prepareRefresh(indexName).get();
202+
client.admin().indices().prepareRefresh(indexName).get();
203+
return ids;
196204
}
197205

198206
protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

+1 -1
Original file line number | Diff line number | Diff line change
@@ -948,7 +948,7 @@ Map<String, String> createEmptyIndicesWithNoMappings(int numClusters) {
948948

949949
Map<String, Object> setupFailClusters() throws IOException {
950950
int numShardsLocal = randomIntBetween(1, 3);
951-
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
951+
populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);
952952

953953
int numShardsRemote = randomIntBetween(1, 3);
954954
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

+370
Large diffs are not rendered by default.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

+71 -42
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionListenerResponseHandler;
1212
import org.elasticsearch.action.OriginalIndices;
1313
import org.elasticsearch.action.support.ChannelActionListener;
14+
import org.elasticsearch.compute.operator.DriverProfile;
1415
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1516
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
1617
import org.elasticsearch.core.Releasable;
@@ -34,9 +35,9 @@
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Map;
37-
import java.util.Objects;
3838
import java.util.Set;
3939
import java.util.concurrent.Executor;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicReference;
4142

4243
/**
@@ -74,69 +75,97 @@ void startComputeOnRemoteCluster(
7475
RemoteCluster cluster,
7576
Runnable cancelQueryOnFailure,
7677
EsqlExecutionInfo executionInfo,
77-
ActionListener<ComputeResponse> listener
78+
ActionListener<List<DriverProfile>> listener
7879
) {
7980
var queryPragmas = configuration.pragmas();
8081
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
8182
final var childSessionId = computeService.newChildSession(sessionId);
82-
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
8383
final String clusterAlias = cluster.clusterAlias();
84-
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
85-
var resp = finalResponse.get();
86-
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
87-
}))) {
88-
ExchangeService.openExchange(
89-
transportService,
90-
cluster.connection,
91-
childSessionId,
92-
queryPragmas.exchangeBufferSize(),
93-
esqlExecutor,
94-
EsqlCCSUtils.skipUnavailableListener(
95-
computeListener.acquireAvoid(),
96-
executionInfo,
97-
clusterAlias,
98-
EsqlExecutionInfo.Cluster.Status.SKIPPED
99-
).delegateFailureAndWrap((l, unused) -> {
100-
var listenerGroup = new RemoteListenerGroup(
101-
transportService,
102-
rootTask,
103-
computeListener,
104-
clusterAlias,
105-
executionInfo,
106-
l
107-
);
108-
109-
var remoteSink = exchangeService.newRemoteSink(
110-
listenerGroup.getGroupTask(),
111-
childSessionId,
112-
transportService,
113-
cluster.connection
114-
);
84+
final AtomicBoolean pagesFetched = new AtomicBoolean();
85+
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
86+
listener = listener.delegateResponse((l, e) -> {
87+
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
88+
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
89+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
90+
l.onResponse(List.of());
91+
} else if (configuration.allowPartialResults()) {
92+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
93+
l.onResponse(List.of());
94+
} else {
95+
l.onFailure(e);
96+
}
97+
});
98+
ExchangeService.openExchange(
99+
transportService,
100+
cluster.connection,
101+
childSessionId,
102+
queryPragmas.exchangeBufferSize(),
103+
esqlExecutor,
104+
listener.delegateFailure((l, unused) -> {
105+
final CancellableTask groupTask;
106+
final Runnable onGroupFailure;
107+
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
108+
if (failFast) {
109+
groupTask = rootTask;
110+
onGroupFailure = cancelQueryOnFailure;
111+
} else {
112+
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
113+
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
114+
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
115+
}
116+
try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
117+
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
118+
return profiles;
119+
}))) {
120+
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
115121
exchangeSource.addRemoteSink(
116122
remoteSink,
117-
executionInfo.isSkipUnavailable(clusterAlias) == false,
118-
() -> {},
123+
failFast,
124+
() -> pagesFetched.set(true),
119125
queryPragmas.concurrentExchangeClients(),
120-
listenerGroup.getExchangeRequestListener()
126+
computeListener.acquireAvoid()
121127
);
122128
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
123129
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
124-
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
130+
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
125131
finalResponse.set(r);
126132
return r.getProfiles();
127133
});
128134
transportService.sendChildRequest(
129135
cluster.connection,
130136
ComputeService.CLUSTER_ACTION_NAME,
131137
clusterRequest,
132-
listenerGroup.getGroupTask(),
138+
groupTask,
133139
TransportRequestOptions.EMPTY,
134140
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
135141
);
136-
})
137-
);
138-
}
142+
}
143+
})
144+
);
145+
}
139146

147+
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
148+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
149+
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(resp.getTotalShards())
150+
.setSuccessfulShards(resp.getSuccessfulShards())
151+
.setSkippedShards(resp.getSkippedShards())
152+
.setFailedShards(resp.getFailedShards());
153+
if (resp.getTook() != null) {
154+
builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
155+
} else {
156+
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
157+
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
158+
builder.setTook(executionInfo.tookSoFar());
159+
}
160+
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
161+
if (executionInfo.isStopped() || resp.failedShards > 0) {
162+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
163+
} else {
164+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
165+
}
166+
}
167+
return builder.build();
168+
});
140169
}
141170

142171
List<RemoteCluster> getRemoteClusters(

0 commit comments

Comments
 (0)