Skip to content

Run coordinating can_match in field-caps #127734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/127734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 127734
summary: Run coordinating `can_match` in field-caps
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.fieldcaps;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

/**
 * Integration test for field-caps requests that carry an index filter on {@code @timestamp}.
 * <p>
 * It checks that an index whose shards have no available copy is reported as a failure when the
 * filter may still match it, and is silently skipped when the filter cannot match it.
 */
public class FieldCapsWithFilterIT extends ESIntegTestCase {

    /**
     * Returns {@code false}: this test supplies its own engine factory through
     * {@link ExposingTimestampEnginePlugin} instead of the mock internal engine.
     */
    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    /**
     * Engine for write-blocked indices that derives the raw range of a point field from the
     * minimum and maximum packed values found in the current directory reader.
     */
    private static class EngineWithExposingTimestamp extends InternalEngine {
        EngineWithExposingTimestamp(EngineConfig engineConfig) {
            super(engineConfig);
            assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index";
        }

        /**
         * @param field name of the point field to inspect
         * @return the [min, max] range decoded from the first dimension of the packed values, or
         *         {@link ShardLongFieldRange#EMPTY} when the reader holds no values for the field
         * @throws UncheckedIOException if reading the point values fails
         */
        @Override
        public ShardLongFieldRange getRawFieldRange(String field) {
            try (Searcher searcher = acquireSearcher("test")) {
                final DirectoryReader reader = searcher.getDirectoryReader();
                final byte[] lowest = PointValues.getMinPackedValue(reader, field);
                final byte[] highest = PointValues.getMaxPackedValue(reader, field);

                if (lowest != null && highest != null) {
                    final long min = LongPoint.decodeDimension(lowest, 0);
                    final long max = LongPoint.decodeDimension(highest, 0);
                    return ShardLongFieldRange.of(min, max);
                }

                // either both bounds are present or both are absent; anything else is unexpected
                assert lowest == null && highest == null : Arrays.toString(lowest) + "-" + Arrays.toString(highest);
                return ShardLongFieldRange.EMPTY;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /**
     * Engine plugin that hands out {@link EngineWithExposingTimestamp} for indices with the write
     * block set and a plain {@link InternalEngineFactory} for every other index.
     */
    public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin {
        @Override
        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
            if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings()) == false) {
                return Optional.of(new InternalEngineFactory());
            }
            return Optional.of(EngineWithExposingTimestamp::new);
        }
    }

    /** Adds {@link ExposingTimestampEnginePlugin} to the plugins installed by the parent class. */
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        final Collection<Class<? extends Plugin>> inherited = super.nodePlugins();
        return CollectionUtils.appendToCopy(inherited, ExposingTimestampEnginePlugin.class);
    }

    /** Builds a field-caps request for {@code index_*} and all fields, with result merging disabled. */
    private static FieldCapabilitiesRequest newUnmergedRequest() {
        final FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
        request.indices("index_*");
        request.fields("*");
        request.setMergeResults(false);
        return request;
    }

    /**
     * Creates {@code index} with a date {@code @timestamp} and a long {@code position} field and
     * indexes between 100 and 500 documents whose timestamps start at {@code timestamp}.
     *
     * @param index           name of the index to create
     * @param indexSettings   settings applied at index creation
     * @param timestamp       timestamp of the first document; document {@code i} gets {@code timestamp + i}
     * @param exposeTimestamp when {@code true}, the index is closed, write-blocked and reopened, and the
     *                        method waits until the index metadata's timestamp range covers all shards;
     *                        when {@code false}, the index is only refreshed
     */
    void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception {
        final Client client = client();
        assertAcked(
            client.admin()
                .indices()
                .prepareCreate(index)
                .setSettings(indexSettings)
                .setMapping("@timestamp", "type=date", "position", "type=long")
        );

        final int docCount = between(100, 500);
        for (int doc = 0; doc < docCount; doc++) {
            client.prepareIndex(index).setSource("position", doc, "@timestamp", timestamp + doc).get();
        }

        if (exposeTimestamp == false) {
            client.admin().indices().prepareRefresh(index).get();
            return;
        }

        // close -> add write block -> reopen, so the reopened shards run on the write-blocked engine
        final Settings writeBlock = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build();
        client.admin().indices().prepareClose(index).get();
        client.admin().indices().prepareUpdateSettings(index).setSettings(writeBlock).get();
        client.admin().indices().prepareOpen(index).get();

        assertBusy(() -> {
            IndexLongFieldRange range = clusterService().state().metadata().getProject().index(index).getTimestampRange();
            assertTrue(Strings.toString(range), range.containsAllShardRanges());
        });
    }

    /**
     * Puts {@code index_old} on a node that is then stopped and {@code index_new} on a live node,
     * then verifies how field-caps treats the unavailable index for three different filters.
     */
    public void testSkipUnmatchedShards() throws Exception {
        final long oldTimestamp = randomLongBetween(10_000_000, 20_000_000);
        final long newTimestamp = randomLongBetween(30_000_000, 50_000_000);
        final String redNode = internalCluster().startDataOnlyNode();
        final String blueNode = internalCluster().startDataOnlyNode();

        final Settings.Builder oldIndexSettings = indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode);
        createIndexAndIndexDocs("index_old", oldIndexSettings, oldTimestamp, true);
        internalCluster().stopNode(redNode);

        final Settings.Builder newIndexSettings = indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode);
        createIndexAndIndexDocs("index_new", newIndexSettings, newTimestamp, false);

        // no filter, or a filter starting at oldTimestamp: index_old is reported as a failure
        {
            final FieldCapabilitiesRequest request = newUnmergedRequest();
            if (randomBoolean()) {
                request.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp));
            }
            var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
            assertThat(result.getIndexResponses(), hasSize(1));
            assertThat(result.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
            assertThat(result.getFailures(), hasSize(1));
            var failure = result.getFailures().get(0);
            assertThat(failure.getIndices(), equalTo(new String[] { "index_old" }));
            assertThat(failure.getException(), instanceOf(NoShardAvailableActionException.class));
        }

        // filter starting at newTimestamp: the unavailable index_old is skipped instead of failing
        {
            final FieldCapabilitiesRequest request = newUnmergedRequest();
            request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp));
            var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
            assertThat(result.getIndexResponses(), hasSize(1));
            assertThat(result.getIndexResponses().get(0).getIndexName(), equalTo("index_new"));
            assertThat(result.getFailures(), empty());
        }

        // filter beyond every document: both indices are skipped
        // (presumably index_old on the coordinator and index_new on its data node)
        {
            final FieldCapabilitiesRequest request = newUnmergedRequest();
            request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L));
            var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
            assertThat(result.getIndexResponses(), empty());
            assertThat(result.getFailures(), empty());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,10 @@ static void unblockOnRewrite() {

@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
// skip rewriting on the coordinator
if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) {
return this;
}
try {
blockingLatch.await();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ static TransportVersion def(int id) {
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_0_00);
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

Expand All @@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
public static final String NAME = "field_caps_request";
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();

private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;

private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
private String[] fields = Strings.EMPTY_ARRAY;
Expand Down Expand Up @@ -67,6 +70,11 @@ public FieldCapabilitiesRequest(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
includeEmptyFields = in.readBoolean();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
clusterAlias = in.readOptionalString();
} else {
clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
}
}

/** Creates a request whose fields all keep their declared initial values. */
public FieldCapabilitiesRequest() {}
Expand All @@ -90,6 +98,14 @@ public void setMergeResults(boolean mergeResults) {
this.mergeResults = mergeResults;
}

/**
 * Sets the cluster alias carried by this request.
 *
 * @param clusterAlias the alias to store; no validation is performed here
 */
void clusterAlias(String clusterAlias) {
this.clusterAlias = clusterAlias;
}

/**
 * Returns the cluster alias currently stored on this request.
 *
 * @return the stored alias, exactly as last assigned
 */
String clusterAlias() {
return clusterAlias;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -108,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
out.writeBoolean(includeEmptyFields);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
out.writeOptionalString(clusterAlias);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -72,6 +78,7 @@ final class RequestDispatcher {
ClusterService clusterService,
TransportService transportService,
ProjectResolver projectResolver,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
Task parentTask,
FieldCapabilitiesRequest fieldCapsRequest,
OriginalIndices originalIndices,
Expand Down Expand Up @@ -105,8 +112,14 @@ final class RequestDispatcher {
onIndexFailure.accept(index, e);
continue;
}
final IndexSelector indexResult = new IndexSelector(shardIts);
if (indexResult.nodeToShards.isEmpty()) {
final IndexSelector indexResult = new IndexSelector(
fieldCapsRequest.clusterAlias(),
shardIts,
fieldCapsRequest.indexFilter(),
nowInMillis,
coordinatorRewriteContextProvider
);
if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) {
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
} else {
this.indexSelectors.put(index, indexResult);
Expand Down Expand Up @@ -255,10 +268,34 @@ private static class IndexSelector {
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
private final Map<ShardId, Exception> failures = new HashMap<>();

IndexSelector(List<ShardIterator> shardIts) {
IndexSelector(
String clusterAlias,
List<ShardIterator> shardIts,
QueryBuilder indexFilter,
long nowInMillis,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
) {
for (ShardIterator shardIt : shardIts) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
boolean canMatch = true;
final ShardId shardId = shardIt.shardId();
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could reuse CanMatchPreFilterSearchPhase to avoid duplicating the can-match logic, but this approach is simpler.

var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
if (coordinatorRewriteContext != null) {
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
try {
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
} catch (Exception e) {
// treat as if shard is still a potential match
}
}
}
if (canMatch) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
}
} else {
unmatchedShardIds.add(shardId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ private void doExecuteForked(
clusterService,
transportService,
projectResolver,
indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis),
task,
request,
localIndices,
Expand All @@ -273,7 +274,7 @@ private void doExecuteForked(
singleThreadedExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
Expand Down Expand Up @@ -383,11 +384,13 @@ private static void mergeIndexResponses(
}

private static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
long nowInMillis
) {
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.clusterAlias(clusterAlias);
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
Expand Down
Loading
Loading