From 58d3cfd4ed7c745208e7b3c60f14ef31073f356a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 23 Apr 2025 09:09:42 -0600 Subject: [PATCH] Improve filter handling for ESQL CCS (#126807) * Test for CCS with filters * Partial fix for CCS/filters problems (cherry picked from commit 12451b680d464b1d030b31b35b1143be71fb9edc) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java --- .../CrossClusterQueryWithFiltersIT.java | 534 ++++++++++++++++++ .../xpack/esql/action/ColumnInfoImpl.java | 4 + .../xpack/esql/plugin/ComputeService.java | 4 + .../xpack/esql/session/EsqlCCSUtils.java | 31 +- .../xpack/esql/session/EsqlSession.java | 20 +- 5 files changed, 581 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java new file mode 100644 index 0000000000000..2fe25eae7bc94 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -0,0 +1,534 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster.Status; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.core.TimeValue.timeValueSeconds; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase { + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, false); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + protected void assertClusterMetadata(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression, Status status) { + assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + 
assertThat(clusterMetatata.getStatus(), equalTo(status)); + assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + + protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); + assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + } + + protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); + assertThat(clusterMetatata.getTotalShards(), equalTo(0)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + } + + protected void assertClusterMetadataSkippedShards( + EsqlExecutionInfo.Cluster clusterMetatata, + int shards, + long took, + String indexExpression + ) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); + assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(shards)); + } + + protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SKIPPED); + assertThat(clusterMetatata.getTotalShards(), equalTo(0)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + } + + protected EsqlQueryResponse runQuery(String 
query, Boolean ccsMetadataInResponse, QueryBuilder filter) { + EsqlQueryRequest request = randomBoolean() ? EsqlQueryRequest.asyncEsqlQueryRequest() : EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + if (filter != null) { + request.filter(filter); + } + request.waitForCompletionTimeout(timeValueSeconds(30)); + return runQuery(request); + } + + public void testTimestampFilterFromQuery() { + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + // Both indices are included + var filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1 + docsTest2)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = 
executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + + // Only local is included + filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + } + + // Only remote is included + filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("2024-01-01"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest2)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + 
+ assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-1"); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + } + + // Only local is included - wildcards + filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1)); + // FIXME: this is currently inconsistent with the non-wildcard case, since empty wildcard is not an error, + // the second field-caps does not happen and the remote fields are not added to the response. + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataNoShards(remoteCluster, overallTookMillis, "logs-*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*"); + } + + // Both indices are filtered out + filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,c*:logs-2", 
randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // Remote has no shards due to filter + assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // Local cluster can not be filtered out for now + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-1"); + } + + // Both indices are filtered out - wildcards + filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // Remote has no shards due to filter + 
assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // Local cluster can not be filtered out for now + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-*"); + } + + } + + public void testFilterWithMissingIndex() { + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + int count = 0; + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + count++; + // Local index missing + VerificationException e = expectThrows( + VerificationException.class, + () -> runQuery("from missing", randomBoolean(), filter).close() + ); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + wildcards + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + existing index + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + existing remote + e = expectThrows(VerificationException.class, () -> runQuery("from missing,cluster-a:logs-2", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Wildcard 
index missing + e = expectThrows(VerificationException.class, () -> runQuery("from missing*", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing*]")); + // Wildcard index missing + existing index + try (EsqlQueryResponse resp = runQuery("from missing*,logs-1", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 0 : docsTest1)); + } + } + } + + public void testFilterWithMissingRemoteIndex() { + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + int count = 0; + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + count++; + // Remote index missing + VerificationException e = expectThrows( + VerificationException.class, + () -> runQuery("from cluster-a:missing", randomBoolean(), filter).close() + ); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Remote index missing + wildcards + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs*", randomBoolean(), + // filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Remote index missing + existing remote index + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs-2", randomBoolean(), + // filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index 
[cluster-a:missing]")); + // Local index + missing remote + e = expectThrows(VerificationException.class, () -> runQuery("from logs-1,cluster-a:missing", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Wildcard index missing + e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing*", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing*]")); + // Wildcard index missing + existing remote index + try (EsqlQueryResponse resp = runQuery("from cluster-a:missing*,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 0 : docsTest2)); + } + // Wildcard index missing + existing local index + try (EsqlQueryResponse resp = runQuery("from cluster-a:missing*,logs-1", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 2 ? 
0 : docsTest1)); + } + } + } + + private void checkRemoteFailures() { + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + // One index + var e = expectThrows(ElasticsearchException.class, () -> runQuery("from cluster-a:log-2", randomBoolean(), filter).close()); + // Two indices + e = expectThrows(ElasticsearchException.class, () -> runQuery("from logs-1,cluster-a:log-2", randomBoolean(), filter).close()); + // Wildcard + e = expectThrows(ElasticsearchException.class, () -> runQuery("from logs-1,cluster-a:log*", randomBoolean(), filter).close()); + } + } + + private void checkRemoteWithSkipUnavailable() { + int count = 0; + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + count++; + // One index + try (EsqlQueryResponse resp = runQuery("from cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs-2"); + } + // Two indices + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 
0 : docsTest1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + if (count > 1) { + assertClusterMetadataNoShards(localCluster, overallTookMillis, "logs-1"); + } else { + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + } + // Wildcard + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 
0 : docsTest1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + if (count > 1) { + assertClusterMetadataNoShards(localCluster, overallTookMillis, "logs-1"); + } else { + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + } + } + } + + public void testFilterWithUnavailableRemote() throws IOException { + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + cluster(REMOTE_CLUSTER_1).close(); + checkRemoteFailures(); + } + + private void makeRemoteFailFieldCaps() { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + EsqlResolveFieldsAction.NAME, + (handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + sendResponse(new NoSuchRemoteClusterException("cluster [cluster-a] not found, skipping")); + } + + @Override + public void sendResponse(Exception exception) { + channel.sendResponse(exception); + } + }, task) + ); + } + } + + private void clearRemoteRules() { + for (TransportService 
transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + + // Test when the disconnect happens on the field-caps call itself + public void testFilterWithUnavailableOnFieldcaps() throws IOException { + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + makeRemoteFailFieldCaps(); + try { + checkRemoteFailures(); + } finally { + clearRemoteRules(); + } + } + + public void testFilterWithUnavailableRemoteAndSkipUnavailable() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + cluster(REMOTE_CLUSTER_1).close(); + checkRemoteWithSkipUnavailable(); + } + + public void testFilterWithUnavailableFieldCapsAndSkipUnavailable() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + makeRemoteFailFieldCaps(); + try { + checkRemoteWithSkipUnavailable(); + } finally { + clearRemoteRules(); + } + } + + protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { + Client client = client(clusterAlias); + String tag = Strings.isEmpty(clusterAlias) ? 
"local" : clusterAlias; + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping( + "id", + "type=keyword", + "tag-" + tag, + "type=keyword", + "v", + "type=long", + "const", + "type=long", + "@timestamp", + "type=date" + ) + ); + Set ids = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(i); + client.prepareIndex(indexName).setSource("id", id, "tag-" + tag, tag, "v", i, "@timestamp", date).get(); + } + client.admin().indices().prepareRefresh(indexName).get(); + } + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java index 94da383b40957..fd8f7a91a1e05 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java @@ -102,4 +102,8 @@ public String outputType() { public DataType type() { return type; } + + public String toString() { + return "ColumnInfoImpl{" + "name='" + name + '\'' + ", type=" + type + '}'; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 80b5f6c460ca5..7a7526302a587 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -292,6 +292,10 @@ public void execute( // starts computes on remote clusters final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices); for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) { + if (execInfo.getCluster(cluster.clusterAlias()).getStatus() 
!= EsqlExecutionInfo.Cluster.Status.RUNNING) { + // if the cluster is already in the terminal state from the planning stage, no need to call it + continue; + } clusterComputeHandler.startComputeOnRemoteCluster( sessionId, rootTask, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 3c50a75bfafb5..09a565b721ec8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; @@ -145,7 +146,8 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu StringBuilder sb = new StringBuilder(); for (String clusterAlias : executionInfo.clusterAliases()) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + // Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out. 
+ if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) { if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); } else { @@ -180,7 +182,11 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf } } - static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + static void updateExecutionInfoWithClustersWithNoMatchingIndices( + EsqlExecutionInfo executionInfo, + IndexResolution indexResolution, + QueryBuilder filter + ) { Set clustersWithResolvedIndices = new HashSet<>(); // determine missing clusters for (String indexName : indexResolution.resolvedIndices()) { @@ -202,8 +208,8 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { - if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - // if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite + if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) { + // if cluster was already in a terminal state, we don't need to check it again continue; } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); @@ -217,9 +223,17 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } else { fatalErrorMessage += "; " + error; } + if (filter == null) { + // Not very useful since we don't send metadata on errors now, but may be useful in the future + // We check for filter since the filter may be the reason why the index is missing, and then it's ok + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FAILED, new VerificationException(error)); + } } else { - // no matching 
indices and no concrete index requested - just mark it as done, no error - markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + if (indexResolution.isValid()) { + // no matching indices and no concrete index requested - just mark it as done, no error + // We check for the valid resolution because if we have empty resolution it's still an error. + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + } } } if (fatalErrorMessage != null) { @@ -227,6 +241,11 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } + // Filter-less version, mainly for testing where we don't need filter support + static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, null); + } + // visible for testing static boolean concreteIndexRequested(String indexExpression) { if (Strings.isNullOrBlank(indexExpression)) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 5c0754690fa01..ae7af4ff029cb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -340,15 +340,13 @@ public void analyzedPlan( }).andThen((l, result) -> { // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for // invalid index resolution to updateExecutionInfo - if (result.indices.isValid()) { - // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; - } + // If we run out of clusters to search due to unavailability we can stop the 
analysis right here + if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step l.onResponse(result); }).andThen((l, result) -> { // first attempt (maybe the only one) at analyzing the plan - analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l); + analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l); }).andThen((l, result) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; @@ -359,6 +357,10 @@ public void analyzedPlan( LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { + // the order here is tricky - if the cluster has been filtered and later became unavailable, + // do we want to declare it successful or skipped? For now, unavailability takes precedence. 
+ EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -461,11 +463,11 @@ private boolean allCCSClustersSkipped( ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception // to let the LogicalPlanActionListener decide how to proceed + LOGGER.debug("No more clusters to search, ending analysis stage"); logicalPlanListener.onFailure(new NoClustersToSearchException()); return true; } @@ -477,6 +479,7 @@ private static void analyzeAndMaybeRetry( Function analyzeAction, QueryBuilder requestFilter, PreAnalysisResult result, + EsqlExecutionInfo executionInfo, ActionListener logicalPlanListener, ActionListener l ) { @@ -486,6 +489,11 @@ private static void analyzeAndMaybeRetry( LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { + if (result.indices.isValid() || requestFilter != null) { + // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report + // when the resolution result is not valid for a different reason. + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter); + } plan = analyzeAction.apply(result); } catch (Exception e) { if (e instanceof VerificationException ve) {