Commit 17ac046

Refactor remote cluster handling in Analyzer (#126426) (#126491)
* Refactor remote cluster handling in Analyzer

  - Initialize clusters earlier
  - Simplify cluster set calculation
  - No need to keep separate skipped list for enrich resolution

(cherry picked from commit b21e325)

# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
1 parent c00ec1e commit 17ac046
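
At a glance, the central API change is the signature of EnrichPolicyResolver.resolvePolicies. The sketch below uses simplified stand-in types (plain Java interfaces, not the real Elasticsearch classes) to show the before/after shape; the actual signatures are in the diffs further down.

import java.util.Collection;
import java.util.function.Consumer;

// Simplified stand-ins: Consumer<Object> replaces ActionListener<EnrichResolution>, a plain Object
// stands in for EsqlExecutionInfo, and String stands in for UnresolvedPolicy. Illustration only.
interface ResolvePoliciesBefore {
    // Before: callers pre-computed the target clusters and passed them in.
    void resolvePolicies(Collection<String> targetClusters, Collection<String> unresolvedPolicies, Consumer<Object> listener);
}

interface ResolvePoliciesAfter {
    // After: callers hand over the execution info; the resolver derives the cluster set itself.
    void resolvePolicies(Collection<String> unresolvedPolicies, Object executionInfo, Consumer<Object> listener);
}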

File tree

4 files changed (+45 -72 lines changed)

  - x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java
  - x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java
  - x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  - x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java


x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

+0 -9

@@ -23,7 +23,6 @@ public final class EnrichResolution {

     private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
     private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
-    private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();

     public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
         return resolvedPolicies.get(new Key(policyName, mode));
@@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
         errors.putIfAbsent(new Key(policyName, mode), reason);
     }

-    public void addUnavailableCluster(String clusterAlias, Exception e) {
-        unavailableClusters.put(clusterAlias, e);
-    }
-
-    public Map<String, Exception> getUnavailableClusters() {
-        return unavailableClusters;
-    }
-
     private record Key(String policyName, Enrich.Mode mode) {

     }
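
With addUnavailableCluster and getUnavailableClusters removed, EnrichResolution no longer keeps its own skipped-cluster list; connection failures are recorded straight on the per-query execution info via EsqlCCSUtils.markClusterWithFinalStateAndNoShards (see the EnrichPolicyResolver diff below). A minimal self-contained sketch of that pattern, using hypothetical stand-in types rather than the Elasticsearch classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the execution-info object: skipped clusters are tracked in one place,
// shared by the whole query, instead of on the enrich resolution result.
class ExecutionInfoSketch {
    enum Status { RUNNING, SKIPPED }
    record Cluster(String alias, Status status, Exception failure) {}

    private final Map<String, Cluster> clusters = new ConcurrentHashMap<>();

    // Register every cluster up front (what "initialize clusters earlier" refers to).
    void register(String alias) {
        clusters.put(alias, new Cluster(alias, Status.RUNNING, null));
    }

    // Rough analogue of EsqlCCSUtils.markClusterWithFinalStateAndNoShards: flip the cluster to
    // SKIPPED and keep the failure, without any enrich-specific bookkeeping.
    void markSkipped(String alias, Exception failure) {
        clusters.computeIfPresent(alias, (k, v) -> new Cluster(alias, Status.SKIPPED, failure));
    }

    Map<String, Cluster> clusters() {
        return Map.copyOf(clusters);
    }
}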

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

+17 -15

@@ -10,7 +10,6 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.RefCountingListener;
@@ -37,6 +36,7 @@
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -49,7 +49,6 @@

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,6 +58,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
+
 /**
  * Resolves enrich policies across clusters in several steps:
  * 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
@@ -98,21 +99,22 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
     /**
      * Resolves a set of enrich policies
      *
-     * @param targetClusters the target clusters
      * @param unresolvedPolicies the unresolved policies
+     * @param executionInfo the execution info
     * @param listener notified with the enrich resolution
      */
     public void resolvePolicies(
-        Collection<String> targetClusters,
         Collection<UnresolvedPolicy> unresolvedPolicies,
+        EsqlExecutionInfo executionInfo,
         ActionListener<EnrichResolution> listener
     ) {
-        if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
+        if (unresolvedPolicies.isEmpty()) {
             listener.onResponse(new EnrichResolution());
             return;
         }
-        final Set<String> remoteClusters = new HashSet<>(targetClusters);
-        final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+
+        final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
+        final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
         lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
             final EnrichResolution enrichResolution = new EnrichResolution();

@@ -121,7 +123,14 @@ public void resolvePolicies(
             for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
                 String clusterAlias = entry.getKey();
                 if (entry.getValue().connectionError != null) {
-                    enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
+                    assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
+                        : "Should never have a connection error for the local cluster";
+                    markClusterWithFinalStateAndNoShards(
+                        executionInfo,
+                        clusterAlias,
+                        EsqlExecutionInfo.Cluster.Status.SKIPPED,
+                        entry.getValue().connectionError
+                    );
                     // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
                     remoteClusters.remove(clusterAlias);
                 } else {
@@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
             listener
         );
     }
-
-    public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
-        return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
-    }
 }
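
The cluster set is now derived from the clusters already registered on the execution info rather than from a caller-supplied list. A minimal runnable sketch of that calculation in plain Java collections; LOCAL stands in for RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, the alias Elasticsearch uses for the local cluster:

import java.util.HashSet;
import java.util.Set;

public class ClusterSetSketch {
    // Stand-in for RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY (the local cluster's alias).
    static final String LOCAL = "";

    record ClusterSet(Set<String> remotes, boolean includeLocal) {}

    // Mirrors the new logic in resolvePolicies: start from the execution-info keys, treat an empty
    // set as a purely local query, and strip the local alias out of the remote set.
    static ClusterSet fromExecutionInfo(Set<String> registeredClusters) {
        Set<String> remotes = new HashSet<>(registeredClusters);
        boolean includeLocal = remotes.isEmpty() || remotes.remove(LOCAL);
        return new ClusterSet(remotes, includeLocal);
    }

    public static void main(String[] args) {
        // Prints: ClusterSet[remotes=[cluster_a], includeLocal=true]
        System.out.println(fromExecutionInfo(Set.of(LOCAL, "cluster_a")));
    }
}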

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

+22 -47

@@ -9,7 +9,6 @@

 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.Strings;
@@ -19,7 +18,6 @@
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.indices.IndicesExpressionGrouper;
@@ -78,7 +76,6 @@
 import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;

 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -319,16 +316,10 @@ public void analyzedPlan(
         final List<TableInfo> indices = preAnalysis.indices;

         EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
-
-        final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
-            configuredClusters,
-            indices.stream()
-                .flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern())))
-                .toArray(String[]::new)
-        ).keySet();
+        initializeClusterData(indices, executionInfo);

         var listener = SubscribableListener.<EnrichResolution>newForked(
-            l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
+            l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
         ).<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l));
         // first resolve the lookup indices, then the main indices
         for (TableInfo lookupIndex : preAnalysis.lookupIndices) {
@@ -352,12 +343,6 @@ public void analyzedPlan(
         }).<PreAnalysisResult>andThen((l, result) -> {
             assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";

-            // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
-            // resolving one more time (the first attempt failed and the query had a filter)
-            for (String clusterAlias : executionInfo.clusterAliases()) {
-                executionInfo.swapCluster(clusterAlias, (k, v) -> null);
-            }
-
             // here the requestFilter is set to null, performing the pre-analysis after the first step failed
             preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l);
         }).<LogicalPlan>andThen((l, result) -> {
@@ -388,6 +373,26 @@ private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result
         // TODO: Verify that the resolved index actually has indexMode: "lookup"
     }

+    private void initializeClusterData(List<TableInfo> indices, EsqlExecutionInfo executionInfo) {
+        if (indices.isEmpty()) {
+            return;
+        }
+        assert indices.size() == 1 : "Only single index pattern is supported";
+        Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
+            configuredClusters,
+            IndicesOptions.DEFAULT,
+            indices.get(0).id().indexPattern()
+        );
+        for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
+            final String clusterAlias = entry.getKey();
+            String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
+            executionInfo.swapCluster(clusterAlias, (k, v) -> {
+                assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
+                return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
+            });
+        }
+    }
+
     private void preAnalyzeIndices(
         List<TableInfo> indices,
         EsqlExecutionInfo executionInfo,
@@ -400,39 +405,9 @@ private void preAnalyzeIndices(
             // Note: JOINs are not supported but we detect them when
             listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
         } else if (indices.size() == 1) {
-            // known to be unavailable from the enrich policy API call
-            Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
             TableInfo tableInfo = indices.get(0);
             IndexPattern table = tableInfo.id();

-            Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
-                configuredClusters,
-                IndicesOptions.DEFAULT,
-                table.indexPattern()
-            );
-            for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
-                final String clusterAlias = entry.getKey();
-                String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
-                executionInfo.swapCluster(clusterAlias, (k, v) -> {
-                    assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
-                    if (unavailableClusters.containsKey(k)) {
-                        return new EsqlExecutionInfo.Cluster(
-                            clusterAlias,
-                            indexExpr,
-                            executionInfo.isSkipUnavailable(clusterAlias),
-                            EsqlExecutionInfo.Cluster.Status.SKIPPED,
-                            0,
-                            0,
-                            0,
-                            0,
-                            List.of(new ShardSearchFailure(unavailableClusters.get(k))),
-                            new TimeValue(0)
-                        );
-                    } else {
-                        return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
-                    }
-                });
-            }
             // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
             // based only on available clusters (which could now be an empty list)
             String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
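
initializeClusterData now does the per-cluster grouping once, up front: the single index pattern is split by cluster and each cluster is registered on the execution info before any enrich-policy or index resolution runs. A small self-contained sketch of the grouping idea (a hypothetical helper for illustration, not indicesExpressionGrouper.groupIndices):

import java.util.LinkedHashMap;
import java.util.Map;

public class ClusterGroupingSketch {
    // Split a comma-separated index expression by cluster prefix ("cluster:index"); the empty key
    // represents the local cluster. Hypothetical helper, kept deliberately simple.
    static Map<String, String> groupByCluster(String indexPattern) {
        Map<String, String> perCluster = new LinkedHashMap<>();
        for (String expr : indexPattern.split(",")) {
            int sep = expr.indexOf(':');
            String cluster = sep < 0 ? "" : expr.substring(0, sep);
            String index = sep < 0 ? expr : expr.substring(sep + 1);
            perCluster.merge(cluster, index, (a, b) -> a + "," + b);
        }
        return perCluster;
    }

    public static void main(String[] args) {
        // Prints: {=logs-local, cluster_a=logs-*,metrics-*}
        System.out.println(groupByCluster("logs-local,cluster_a:logs-*,cluster_a:metrics-*"));
    }
}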

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

+6 -1

@@ -38,6 +38,7 @@
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -428,6 +429,10 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {

         EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
             PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
+            EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
+            for (String cluster : clusters) {
+                esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
+            }
             if (randomBoolean()) {
                 unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
                 for (Enrich.Mode mode : Enrich.Mode.values()) {
@@ -441,7 +446,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
                     unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
                 }
             }
-            super.resolvePolicies(clusters, unresolvedPolicies, future);
+            super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
             return future.actionGet(30, TimeUnit.SECONDS);
         }

