Skip to content

[8.x] Refactor remote cluster handling in Analyzer (#126426) #126491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public final class EnrichResolution {

private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();

public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
return resolvedPolicies.get(new Key(policyName, mode));
Expand Down Expand Up @@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
errors.putIfAbsent(new Key(policyName, mode), reason);
}

public void addUnavailableCluster(String clusterAlias, Exception e) {
unavailableClusters.put(clusterAlias, e);
}

public Map<String, Exception> getUnavailableClusters() {
return unavailableClusters;
}

private record Key(String policyName, Enrich.Mode mode) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.RefCountingListener;
Expand All @@ -37,6 +36,7 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
Expand All @@ -49,7 +49,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -59,6 +58,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;

/**
* Resolves enrich policies across clusters in several steps:
* 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
Expand Down Expand Up @@ -98,21 +99,22 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
/**
* Resolves a set of enrich policies
*
* @param targetClusters the target clusters
* @param unresolvedPolicies the unresolved policies
* @param executionInfo the execution info
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(
Collection<String> targetClusters,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
ActionListener<EnrichResolution> listener
) {
if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
if (unresolvedPolicies.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}
final Set<String> remoteClusters = new HashSet<>(targetClusters);
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();

Expand All @@ -121,7 +123,14 @@ public void resolvePolicies(
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
String clusterAlias = entry.getKey();
if (entry.getValue().connectionError != null) {
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
: "Should never have a connection error for the local cluster";
markClusterWithFinalStateAndNoShards(
executionInfo,
clusterAlias,
EsqlExecutionInfo.Cluster.Status.SKIPPED,
entry.getValue().connectionError
);
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
remoteClusters.remove(clusterAlias);
} else {
Expand Down Expand Up @@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
listener
);
}

public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
Expand All @@ -19,7 +18,6 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
Expand Down Expand Up @@ -78,7 +76,6 @@
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -319,16 +316,10 @@ public void analyzedPlan(
final List<TableInfo> indices = preAnalysis.indices;

EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());

final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
configuredClusters,
indices.stream()
.flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern())))
.toArray(String[]::new)
).keySet();
initializeClusterData(indices, executionInfo);

var listener = SubscribableListener.<EnrichResolution>newForked(
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
).<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l));
// first resolve the lookup indices, then the main indices
for (TableInfo lookupIndex : preAnalysis.lookupIndices) {
Expand All @@ -352,12 +343,6 @@ public void analyzedPlan(
}).<PreAnalysisResult>andThen((l, result) -> {
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";

// "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
// resolving one more time (the first attempt failed and the query had a filter)
for (String clusterAlias : executionInfo.clusterAliases()) {
executionInfo.swapCluster(clusterAlias, (k, v) -> null);
}

// here the requestFilter is set to null, performing the pre-analysis after the first step failed
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l);
}).<LogicalPlan>andThen((l, result) -> {
Expand Down Expand Up @@ -388,6 +373,26 @@ private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result
// TODO: Verify that the resolved index actually has indexMode: "lookup"
}

private void initializeClusterData(List<TableInfo> indices, EsqlExecutionInfo executionInfo) {
if (indices.isEmpty()) {
return;
}
assert indices.size() == 1 : "Only single index pattern is supported";
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
configuredClusters,
IndicesOptions.DEFAULT,
indices.get(0).id().indexPattern()
);
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
final String clusterAlias = entry.getKey();
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
});
}
}

private void preAnalyzeIndices(
List<TableInfo> indices,
EsqlExecutionInfo executionInfo,
Expand All @@ -400,39 +405,9 @@ private void preAnalyzeIndices(
// Note: JOINs are not supported but we detect them when
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
} else if (indices.size() == 1) {
// known to be unavailable from the enrich policy API call
Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
TableInfo tableInfo = indices.get(0);
IndexPattern table = tableInfo.id();

Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
configuredClusters,
IndicesOptions.DEFAULT,
table.indexPattern()
);
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
final String clusterAlias = entry.getKey();
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
if (unavailableClusters.containsKey(k)) {
return new EsqlExecutionInfo.Cluster(
clusterAlias,
indexExpr,
executionInfo.isSkipUnavailable(clusterAlias),
EsqlExecutionInfo.Cluster.Status.SKIPPED,
0,
0,
0,
0,
List.of(new ShardSearchFailure(unavailableClusters.get(k))),
new TimeValue(0)
);
} else {
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
}
});
}
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.session.IndexResolver;
Expand Down Expand Up @@ -428,6 +429,10 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {

EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
for (String cluster : clusters) {
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
}
if (randomBoolean()) {
unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
for (Enrich.Mode mode : Enrich.Mode.values()) {
Expand All @@ -441,7 +446,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
}
}
super.resolvePolicies(clusters, unresolvedPolicies, future);
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
return future.actionGet(30, TimeUnit.SECONDS);
}

Expand Down