diff --git a/docs/changelog/122708.yaml b/docs/changelog/122708.yaml
new file mode 100644
index 0000000000000..ec4e75798d473
--- /dev/null
+++ b/docs/changelog/122708.yaml
@@ -0,0 +1,5 @@
+pr: 122708
+summary: Support partial results in CCS in ES|QL
+area: ES|QL
+type: enhancement
+issues: []
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
index dd36a6f455e8b..eddcbd97153ef 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
@@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
     public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
     public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";
 
-    private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
+    public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
     private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";
 
     /**
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
index 510f5945f745a..d31315d7cd0ef 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
@@ -32,8 +32,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -48,6 +50,7 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClustersTestCase {
     protected static final String REMOTE_INDEX = "logs-2";
     protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking";
     protected static final String INDEX_WITH_FAIL_MAPPING = "failing";
+    protected static final AtomicLong NEXT_DOC_ID = new AtomicLong(0);
 
     @Override
     protected List<String> remoteClusterAlias() {
@@ -150,7 +153,7 @@ protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
     protected Map<String, Object> setupClusters(int numClusters) throws IOException {
         assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
         int numShardsLocal = randomIntBetween(1, 5);
-        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
+        populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);
 
         int numShardsRemote = randomIntBetween(1, 5);
         populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
@@ -180,19 +183,24 @@ protected Map<String, Object> setupClusters(int numClusters) throws IOException
         return clusterInfo;
     }
 
-    protected void populateLocalIndices(String indexName, int numShards) {
-        Client localClient = client(LOCAL_CLUSTER);
+    protected Set<String> populateIndex(String clusterAlias, String indexName, int numShards, int numDocs) {
+        Client client = client(clusterAlias);
         assertAcked(
-            localClient.admin()
+            client.admin()
                 .indices()
                 .prepareCreate(indexName)
                 .setSettings(Settings.builder().put("index.number_of_shards", numShards))
                 .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
         );
-        for (int i = 0; i < 10; i++) {
-            localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
+        Set<String> ids = new HashSet<>();
+        String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
+        for (int i = 0; i < numDocs; i++) {
+            String id = Long.toString(NEXT_DOC_ID.incrementAndGet());
+            client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get();
+            ids.add(id);
         }
-        localClient.admin().indices().prepareRefresh(indexName).get();
+        client.admin().indices().prepareRefresh(indexName).get();
+        return ids;
     }
 
     protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException {
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java
index c1976c9fa2ad8..52d03c483332c 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java
@@ -948,7 +948,7 @@ Map<String, Object> createEmptyIndicesWithNoMappings(int numClusters) {
 
     Map<String, Object> setupFailClusters() throws IOException {
         int numShardsLocal = randomIntBetween(1, 3);
-        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
+        populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);
 
         int numShardsRemote = randomIntBetween(1, 3);
         populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
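Note on the refactor above: populateIndex() now returns the set of generated doc ids, so cross-cluster tests can assert the exact rows that came back rather than a bare row count, and the shared NEXT_DOC_ID counter keeps ids unique across clusters. A minimal usage sketch under the new signature (the runQuery(String, Boolean) helper, getValuesList, and Sets.union are assumed to be the utilities already used by these integration tests; index names are placeholders):

    Set<String> localIds = populateIndex(LOCAL_CLUSTER, "logs-local", randomIntBetween(1, 5), 10);
    Set<String> remoteIds = populateIndex(REMOTE_CLUSTER_1, "logs-remote", randomIntBetween(1, 5), 10);
    try (EsqlQueryResponse resp = runQuery("FROM logs-local,*:logs-remote | KEEP id", randomBoolean())) {
        Set<String> returned = getValuesList(resp).stream()
            .map(row -> (String) row.get(0)) // "id" is the only projected column
            .collect(Collectors.toSet());
        assertThat(returned, equalTo(Sets.union(localIds, remoteIds)));
    }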
+ */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.test.FailingFieldPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.plugin.ComputeService; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase { + + private static class ClusterSetup { + final int okShards = randomIntBetween(1, 5); + final int failingShards = randomIntBetween(1, 5); + Set okIds; + } + + private final ClusterSetup local = new ClusterSetup(); + private final ClusterSetup remote1 = new ClusterSetup(); + private final ClusterSetup remote2 = new ClusterSetup(); + + void populateIndices() throws Exception { + local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards, between(1, 100)); + populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards); + + remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards, between(1, 100)); + populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards); + + remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards, between(1, 100)); + populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards); + } + + public void testPartialResults() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,fail*,*:ok*,*:fail* | KEEP id, fail_me | LIMIT 1000"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); + assertThat(error.getMessage(), containsString("Accessing failing field")); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + Set allIds = Stream.of(local.okIds, remote1.okIds, remote2.okIds) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + List> rows = getValuesList(resp); + assertThat(rows.size(), 
lessThanOrEqualTo(allIds.size())); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + assertThat(id, is(in(allIds))); + } + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards + local.failingShards)); + assertThat(localInfo.getSuccessfulShards(), lessThanOrEqualTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards + remote1.failingShards)); + assertThat(remote1Info.getSuccessfulShards(), lessThanOrEqualTo(remote1.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Info.getTotalShards(), equalTo(remote2.okShards + remote2.failingShards)); + assertThat(remote2Info.getSuccessfulShards(), lessThanOrEqualTo(remote2.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } + + public void testOneRemoteClusterPartial() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,cluster-a:ok*,*-b:fail* | KEEP id, fail_me"); + request.allowPartialResults(true); + request.includeCCSMetadata(randomBoolean()); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + Set allIds = Stream.of(local.okIds, remote1.okIds).flatMap(Collection::stream).collect(Collectors.toSet()); + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(allIds.size())); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(allIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards)); + assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards)); + assertThat(remote1Info.getSuccessfulShards(), equalTo(remote1.okShards)); + assertThat(remote1Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Info.getTotalShards(), equalTo(remote2.failingShards)); + assertThat(remote2Info.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } + + public void testFailToReceiveClusterResponse() throws Exception { + populateIndices(); + Exception simulatedFailure = randomFailure(); + // fetched pages, but failed to receive the cluster response + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = 
asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + ComputeService.CLUSTER_ACTION_NAME, + (handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + sendResponse(simulatedFailure); + } + + @Override + public void sendResponse(Exception exception) { + channel.sendResponse(exception); + } + }, task) + ); + } + try { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,cluster-a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds))); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(localInfo.getTotalShards())); + assertThat(localInfo.getSuccessfulShards(), equalTo(localInfo.getSuccessfulShards())); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } finally { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + public void testFailToStartRequestOnRemoteCluster() throws Exception { + populateIndices(); + Exception simulatedFailure = randomFailure(); + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + String actionToFail = randomFrom( + ExchangeService.EXCHANGE_ACTION_NAME, + ExchangeService.OPEN_EXCHANGE_ACTION_NAME, + ComputeService.CLUSTER_ACTION_NAME + ); + ts.addRequestHandlingBehavior(actionToFail, (handler, request, channel, task) -> { channel.sendResponse(simulatedFailure); }); + } + try { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,*a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + var error = expectThrows(Exception.class, () -> runQuery(request).close()); + EsqlTestUtils.assertEsqlFailure(error); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new 
HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(local.okIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards)); + assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } finally { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + public void testFailSearchShardsOnLocalCluster() throws Exception { + populateIndices(); + Exception simulatedFailure = randomFailure(); + for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + EsqlSearchShardsAction.NAME, + (handler, request, channel, task) -> { channel.sendResponse(simulatedFailure); } + ); + } + try { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,*a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + var error = expectThrows(Exception.class, () -> runQuery(request).close()); + EsqlTestUtils.assertEsqlFailure(error); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(remote1.okIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + } finally { + for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + private static Exception randomFailure() { + return randomFrom( + new IllegalStateException("driver was closed already"), + new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT), + new IOException("broken disk"), + new ResourceNotFoundException("exchange sink was not found"), + new EsRejectedExecutionException("node is shutting down") + ); + } + + private Set populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException { + Client client = 
client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.startObject("properties"); + { + mapping.startObject("id").field("type", "keyword").endObject(); + mapping.startObject("tag").field("type", "keyword").endObject(); + } + mapping.endObject(); + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + Set ids = new HashSet<>(); + String tag = clusterAlias.isEmpty() ? "local" : clusterAlias; + int numDocs = between(50, 100); // large enough to have failing documents in every shard + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(NEXT_DOC_ID.incrementAndGet()); + client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); + ids.add(id); + } + client.admin().indices().prepareRefresh(indexName).get(); + for (var shardStats : client.admin().indices().prepareStats(indexName).clear().setDocs(true).get().getShards()) { + var docsStats = shardStats.getStats().docs; + assertNotNull(docsStats); + assertThat("no doc for shard " + shardStats.getShardRouting().shardId(), docsStats.getCount(), greaterThan(0L)); + } + return ids; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 3b599a87afaff..36650f5ca2c4e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -34,9 +35,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -74,54 +75,59 @@ void startComputeOnRemoteCluster( RemoteCluster cluster, Runnable cancelQueryOnFailure, EsqlExecutionInfo executionInfo, - ActionListener listener + ActionListener> listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); final var childSessionId = computeService.newChildSession(sessionId); - final AtomicReference finalResponse = new AtomicReference<>(); final String clusterAlias = cluster.clusterAlias(); - try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - var resp = finalResponse.get(); - return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles)); - }))) { - ExchangeService.openExchange( - transportService, - 
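Taken together, the tests above pin down the client-visible contract: with allowPartialResults(false) the first failure propagates as an exception, while with allowPartialResults(true) the query succeeds, resp.isPartial() is set, and the per-cluster status in the execution info tells the caller which side degraded. A condensed sketch of that contract, using only APIs exercised by the tests (the index names are placeholders):

    EsqlQueryRequest request = new EsqlQueryRequest();
    request.query("FROM ok*,*:ok* | KEEP id");
    request.allowPartialResults(true);
    try (EsqlQueryResponse resp = runQuery(request)) {
        if (resp.isPartial()) {
            // A degraded remote ends up PARTIAL (some shards answered) or SKIPPED
            // (nothing received and skip_unavailable allows dropping it).
            EsqlExecutionInfo.Cluster c = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
            assertThat(
                c.getStatus(),
                is(in(List.of(EsqlExecutionInfo.Cluster.Status.PARTIAL, EsqlExecutionInfo.Cluster.Status.SKIPPED)))
            );
        }
    }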
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java
index 3b599a87afaff..36650f5ca2c4e 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasable;
@@ -34,9 +35,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -74,54 +75,59 @@ void startComputeOnRemoteCluster(
         RemoteCluster cluster,
         Runnable cancelQueryOnFailure,
         EsqlExecutionInfo executionInfo,
-        ActionListener<ComputeResponse> listener
+        ActionListener<List<DriverProfile>> listener
     ) {
         var queryPragmas = configuration.pragmas();
         listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
         final var childSessionId = computeService.newChildSession(sessionId);
-        final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
         final String clusterAlias = cluster.clusterAlias();
-        try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
-            var resp = finalResponse.get();
-            return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
-        }))) {
-            ExchangeService.openExchange(
-                transportService,
-                cluster.connection,
-                childSessionId,
-                queryPragmas.exchangeBufferSize(),
-                esqlExecutor,
-                EsqlCCSUtils.skipUnavailableListener(
-                    computeListener.acquireAvoid(),
-                    executionInfo,
-                    clusterAlias,
-                    EsqlExecutionInfo.Cluster.Status.SKIPPED
-                ).delegateFailureAndWrap((l, unused) -> {
-                    var listenerGroup = new RemoteListenerGroup(
-                        transportService,
-                        rootTask,
-                        computeListener,
-                        clusterAlias,
-                        executionInfo,
-                        l
-                    );
-
-                    var remoteSink = exchangeService.newRemoteSink(
-                        listenerGroup.getGroupTask(),
-                        childSessionId,
-                        transportService,
-                        cluster.connection
-                    );
+        final AtomicBoolean pagesFetched = new AtomicBoolean();
+        final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
+        listener = listener.delegateResponse((l, e) -> {
+            final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
+            if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
+                EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
+                l.onResponse(List.of());
+            } else if (configuration.allowPartialResults()) {
+                EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
+                l.onResponse(List.of());
+            } else {
+                l.onFailure(e);
+            }
+        });
+        ExchangeService.openExchange(
+            transportService,
+            cluster.connection,
+            childSessionId,
+            queryPragmas.exchangeBufferSize(),
+            esqlExecutor,
+            listener.delegateFailure((l, unused) -> {
+                final CancellableTask groupTask;
+                final Runnable onGroupFailure;
+                boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
+                if (failFast) {
+                    groupTask = rootTask;
+                    onGroupFailure = cancelQueryOnFailure;
+                } else {
+                    groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
+                    onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
+                    l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
+                }
+                try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
+                    updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
+                    return profiles;
+                }))) {
+                    var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
                     exchangeSource.addRemoteSink(
                         remoteSink,
-                        executionInfo.isSkipUnavailable(clusterAlias) == false,
-                        () -> {},
+                        failFast,
+                        () -> pagesFetched.set(true),
                         queryPragmas.concurrentExchangeClients(),
-                        listenerGroup.getExchangeRequestListener()
+                        computeListener.acquireAvoid()
                     );
                     var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
                     var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
-                    final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
+                    final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
                         finalResponse.set(r);
                         return r.getProfiles();
                     });
@@ -129,14 +135,37 @@ void startComputeOnRemoteCluster(
                         cluster.connection,
                         ComputeService.CLUSTER_ACTION_NAME,
                         clusterRequest,
-                        listenerGroup.getGroupTask(),
+                        groupTask,
                         TransportRequestOptions.EMPTY,
                         new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
                     );
-                })
-            );
-        }
+                }
+            })
+        );
+    }
 
+    private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
+        executionInfo.swapCluster(clusterAlias, (k, v) -> {
+            var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(resp.getTotalShards())
+                .setSuccessfulShards(resp.getSuccessfulShards())
+                .setSkippedShards(resp.getSkippedShards())
+                .setFailedShards(resp.getFailedShards());
+            if (resp.getTook() != null) {
+                builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
+            } else {
+                // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
+                // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
+                builder.setTook(executionInfo.tookSoFar());
+            }
+            if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
+                if (executionInfo.isStopped() || resp.failedShards > 0) {
+                    builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
+                } else {
+                    builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
+                }
+            }
+            return builder.build();
+        });
     }
 
     List<RemoteCluster> getRemoteClusters(
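The heart of the change is the delegateResponse wrapper at the top of startComputeOnRemoteCluster: a remote failure is downgraded to an empty per-cluster result whenever the cluster may be skipped (nothing received from it yet) or the request opted into partial results, and only otherwise fails the whole query. A distilled, illustrative form of that decision follows; the names tolerant, markSkipped, and markPartial are stand-ins rather than PR code, and the received-results check is a BooleanSupplier because the PR reads the pagesFetched/finalResponse state at failure time:

    import java.util.List;
    import java.util.function.BooleanSupplier;
    import java.util.function.Consumer;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.compute.operator.DriverProfile;

    static ActionListener<List<DriverProfile>> tolerant(
        ActionListener<List<DriverProfile>> delegate,
        BooleanSupplier receivedResults,   // pages fetched or cluster response already seen
        boolean skippable,                 // skip_unavailable-style handling applies
        boolean allowPartial,              // request opted into partial results
        Consumer<Exception> markSkipped,   // stand-in for markClusterWithFinalStateAndNoShards(..., SKIPPED, e)
        Consumer<Exception> markPartial    // stand-in for markClusterWithFinalStateAndNoShards(..., PARTIAL, e)
    ) {
        return delegate.delegateResponse((l, e) -> {
            if (receivedResults.getAsBoolean() == false && skippable) {
                markSkipped.accept(e);
                l.onResponse(List.of());   // cluster contributed nothing: drop it
            } else if (allowPartial) {
                markPartial.accept(e);
                l.onResponse(List.of());   // keep what was fetched, record the failure
            } else {
                l.onFailure(e);            // fail fast: propagate and cancel the query
            }
        });
    }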
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index a9a3be7ecab1c..e69de5a20a888 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -10,6 +10,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -31,8 +32,12 @@
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.lookup.SourceProvider;
 import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
@@ -57,7 +62,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
@@ -210,14 +214,18 @@ public void execute(
             cancelQueryOnFailure,
             computeListener.acquireCompute().delegateFailure((l, profiles) -> {
                 if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
-                    var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
-                    var status = localClusterWasInterrupted.get()
-                        ? EsqlExecutionInfo.Cluster.Status.PARTIAL
-                        : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
-                    execInfo.swapCluster(
-                        LOCAL_CLUSTER,
-                        (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status).setTook(tookTime).build()
-                    );
+                    execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> {
+                        var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
+                        var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
+                        if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
+                            final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
+                            var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
+                                ? EsqlExecutionInfo.Cluster.Status.PARTIAL
+                                : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
+                            builder.setStatus(status);
+                        }
+                        return builder.build();
+                    });
                 }
                 l.onResponse(profiles);
             })
@@ -240,6 +248,7 @@ public void execute(
             );
             // starts computes on data nodes on the main cluster
             if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
+                final var dataNodesListener = localListener.acquireCompute();
                 dataNodeComputeHandler.startComputeOnDataNodes(
                     sessionId,
                     LOCAL_CLUSTER,
@@ -250,7 +259,7 @@ public void execute(
                     localOriginalIndices,
                     exchangeSource,
                     cancelQueryOnFailure,
-                    localListener.acquireCompute().map(r -> {
+                    ActionListener.wrap(r -> {
                         localClusterWasInterrupted.set(execInfo.isStopped());
                         execInfo.swapCluster(
                             LOCAL_CLUSTER,
@@ -260,7 +269,19 @@ public void execute(
                                 .setFailedShards(r.getFailedShards())
                                 .build()
                         );
-                        return r.getProfiles();
+                        dataNodesListener.onResponse(r.getProfiles());
+                    }, e -> {
+                        if (configuration.allowPartialResults()) {
+                            execInfo.swapCluster(
+                                LOCAL_CLUSTER,
+                                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
+                                    EsqlExecutionInfo.Cluster.Status.PARTIAL
+                                ).setFailures(List.of(new ShardSearchFailure(e))).build()
+                            );
+                            dataNodesListener.onResponse(List.of());
+                        } else {
+                            dataNodesListener.onFailure(e);
+                        }
                     })
                 );
             }
@@ -277,48 +298,13 @@ public void execute(
                         cluster,
                         cancelQueryOnFailure,
                         execInfo,
-                        computeListener.acquireCompute().map(r -> {
-                            updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
-                            return r.getProfiles();
-                        })
+                        computeListener.acquireCompute()
                     );
                 }
             }
         }
     }
 
-    private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
-        Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
-            if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
-                return executionInfo.isStopped() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
-            } else {
-                return status;
-            }
-        };
-        if (resp.getTook() != null) {
-            var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos());
-            executionInfo.swapCluster(
-                clusterAlias,
-                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
-                    .setTook(tookTime)
-                    .setTotalShards(resp.getTotalShards())
-                    .setSuccessfulShards(resp.getSuccessfulShards())
-                    .setSkippedShards(resp.getSkippedShards())
-                    .setFailedShards(resp.getFailedShards())
-                    .build()
-            );
-        } else {
-            // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
-            // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
-            executionInfo.swapCluster(
-                clusterAlias,
-                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
-                    .setTook(executionInfo.tookSoFar())
-                    .build()
-            );
-        }
-    }
-
     // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
     private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
         if (execInfo.isCrossClusterSearch()) {
@@ -444,4 +430,33 @@ Runnable cancelQueryOnFailure(CancellableTask task) {
             transportService.getTaskManager().cancelTaskAndDescendants(task, "cancelled on failure", false, ActionListener.noop());
         });
     }
+
+    CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
+        final TaskManager taskManager = transportService.getTaskManager();
+        return (CancellableTask) taskManager.register(
+            "transport",
+            "esql_compute_group",
+            new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description)
+        );
+    }
+
+    private static class ComputeGroupTaskRequest extends TransportRequest {
+        private final Supplier<String> parentDescription;
+
+        ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
+            this.parentDescription = description;
+            setParentTask(parentTask);
+        }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            assert parentTaskId.isSet();
+            return new CancellableTask(id, type, action, "", parentTaskId, headers);
+        }
+
+        @Override
+        public String getDescription() {
+            return "group [" + parentDescription.get() + "]";
+        }
+    }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
index 3106a7b5a43cd..e4ba1678722cc 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
@@ -130,8 +130,7 @@ protected void sendRequest(
             final Runnable onGroupFailure;
             final CancellableTask groupTask;
             if (allowPartialResults) {
-                groupTask = RemoteListenerGroup.createGroupTask(
-                    transportService,
+                groupTask = computeService.createGroupTask(
                     parentTask,
                     () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
                 );
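createGroupTask(), moved into ComputeService from the RemoteListenerGroup class deleted in the next hunk, registers a synthetic cancellable task under the root query task: each per-cluster or per-data-node branch fans out as children of its group task, so a failure can cancel just that group while the root query and the other branches keep running, and cancelling the root still cascades into every group. The lifecycle, condensed from how ClusterComputeHandler uses it above (illustrative, not verbatim):

    CancellableTask groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
    Runnable onGroupFailure = computeService.cancelQueryOnFailure(groupTask);  // cancels the group, not the root task
    listener = ActionListener.runAfter(
        listener,
        () -> transportService.getTaskManager().unregister(groupTask)         // always unregister, on success or failure
    );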
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java
deleted file mode 100644
index 3c6c13993520b..0000000000000
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.plugin;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.CountDown;
-import org.elasticsearch.compute.operator.DriverProfile;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.tasks.TaskManager;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
-import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
-/**
- * Create group task for this cluster. This group task ensures that two branches of the computation:
- * the exchange sink and the cluster request, belong to the same group and each of them can cancel the other.
- * runAfter listeners below ensure that the group is finalized when both branches are done.
- * The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too.
- */
-class RemoteListenerGroup {
-    private final CancellableTask groupTask;
-    private final ActionListener<Void> exchangeRequestListener;
-    private final ActionListener<List<DriverProfile>> clusterRequestListener;
-    private final TaskManager taskManager;
-    private final String clusterAlias;
-    private final EsqlExecutionInfo executionInfo;
-    private final TransportService transportService;
-
-    RemoteListenerGroup(
-        TransportService transportService,
-        Task rootTask,
-        ComputeListener computeListener,
-        String clusterAlias,
-        EsqlExecutionInfo executionInfo,
-        ActionListener<Void> delegate
-    ) {
-        this.transportService = transportService;
-        this.taskManager = transportService.getTaskManager();
-        this.clusterAlias = clusterAlias;
-        this.executionInfo = executionInfo;
-        groupTask = createGroupTask(transportService, rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]");
-        CountDown countDown = new CountDown(2);
-        // The group is done when both the sink and the cluster request are done
-        Runnable finishGroup = () -> {
-            if (countDown.countDown()) {
-                taskManager.unregister(groupTask);
-                delegate.onResponse(null);
-            }
-        };
-        // Cancel the group on sink failure
-        exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup);
-
-        // Cancel the group on cluster request failure
-        clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup);
-    }
-
-    /**
-     * Create a listener that:
-     * 1. Cancels the group task on failure
-     * 2. Marks the cluster as partial if the error is ignorable, otherwise propagates the error
-     */
-    private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
-        return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
-            taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
-                EsqlCCSUtils.skipUnavailableListener(inner, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL)
-                    .onFailure(e);
-            }));
-        }), finishGroup);
-    }
-
-    public CancellableTask getGroupTask() {
-        return groupTask;
-    }
-
-    public ActionListener<Void> getExchangeRequestListener() {
-        return exchangeRequestListener;
-    }
-
-    public ActionListener<List<DriverProfile>> getClusterRequestListener() {
-        return clusterRequestListener;
-    }
-
-    public static CancellableTask createGroupTask(TransportService transportService, Task parentTask, Supplier<String> description) {
-        final TaskManager taskManager = transportService.getTaskManager();
-        return (CancellableTask) taskManager.register(
-            "transport",
-            "esql_compute_group",
-            new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description)
-        );
-    }
-
-    private static class ComputeGroupTaskRequest extends TransportRequest {
-        private final Supplier<String> parentDescription;
-
-        ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
-            this.parentDescription = description;
-            setParentTask(parentTask);
-        }
-
-        @Override
-        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
-            assert parentTaskId.isSet();
-            return new CancellableTask(id, type, action, "", parentTaskId, headers);
-        }
-
-        @Override
-        public String getDescription() {
-            return "group [" + parentDescription.get() + "]";
-        }
-    }
-}
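The deleted class's manual bookkeeping boiled down to a CountDown(2) over the exchange-sink branch and the cluster-request branch. That accounting is now owned by ComputeListener's reference counting in ClusterComputeHandler: each branch holds one acquired sub-listener, and the delegate fires once when the try-with-resources scope and all acquired listeners have completed. Schematically (startSinkBranch/startClusterRequestBranch are hypothetical placeholders for the two branches wired up in the earlier hunk):

    try (var computeListener = new ComputeListener(threadPool, onGroupFailure, delegate)) {
        ActionListener<Void> sinkBranch = computeListener.acquireAvoid();                      // exchange sink
        ActionListener<List<DriverProfile>> requestBranch = computeListener.acquireCompute();  // cluster request
        startSinkBranch(sinkBranch);
        startClusterRequestBranch(requestBranch);
    } // delegate completes only after both branches (and this scope) are done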
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java
index 64e5c6647e9ca..89cd4b3d4d7cd 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java
@@ -369,23 +369,4 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) {
 
         return ExceptionsHelper.isRemoteUnavailableException(e);
     }
-
-    /**
-     * Wrap a listener so that it will skip errors that are ignorable
-     */
-    public static <T> ActionListener<T> skipUnavailableListener(
-        ActionListener<T> delegate,
-        EsqlExecutionInfo executionInfo,
-        String clusterAlias,
-        EsqlExecutionInfo.Cluster.Status status
-    ) {
-        return delegate.delegateResponse((l, e) -> {
-            if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
-                markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e);
-                l.onResponse(null);
-            } else {
-                l.onFailure(e);
-            }
-        });
-    }
 }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java
index 49cfbba5c7610..7df3851224498 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java
@@ -10,7 +10,6 @@
 import org.apache.lucene.index.CorruptIndexException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
 import org.elasticsearch.action.search.ShardSearchFailure;
@@ -52,7 +51,6 @@
 import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
 import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense;
 import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError;
-import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.skipUnavailableListener;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -797,35 +795,6 @@ public void testShouldIgnoreRuntimeError() {
         assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false));
     }
 
-    public void testSkipUnavailableListener() {
-        Predicate<String> skipUnPredicate = s -> s.equals(REMOTE1_ALIAS);
-
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true);
-        executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
-        executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
-        executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));
-
-        ActionListener<Void> expectResult = ActionListener.wrap(unused -> {}, (e) -> fail("Listener should not have failed"));
-        ActionListener<Void> expectFailure = ActionListener.wrap(unused -> fail("Listener should have failed"), (e) -> {});
-
-        // snip_unavailable=true but not connect exception, so should fail
-        skipUnavailableListener(expectFailure, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
-            new ElasticsearchException("something is wrong")
-        );
-        assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
-
-        // snip_unavailable=true, so should not fail
-        skipUnavailableListener(expectResult, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
-            new IllegalStateException("Unable to open any connections")
-        );
-        assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
-        // snip_unavailable=false, so should fail
-        skipUnavailableListener(expectFailure, executionInfo, REMOTE2_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
-            new IllegalStateException("Unable to open any connections")
-        );
-
-    }
-
     private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) {
         return new XPackLicenseStatus(operationMode, true, null);
     }