Skip to content

Term vector API on stateless search nodes #129902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ routing:
settings:
index:
number_of_shards: 5
number_of_replicas: 0

- do:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this removed intentionally?

cluster.health:
wait_for_status: green

- do:
index:
Expand Down Expand Up @@ -52,7 +47,6 @@ requires routing:
settings:
index:
number_of_shards: 5
number_of_replicas: 0
mappings:
_routing:
required: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@
index:
translog.flush_threshold_size: "512MB"
number_of_shards: 1
number_of_replicas: 0
refresh_interval: -1
mappings:
properties:
text:
type : "text"
term_vector : "with_positions_offsets"

- do:
cluster.health:
wait_for_status: green

- do:
index:
index: testidx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
settings:
index:
refresh_interval: -1
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green

- do:
index:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.elasticsearch.action.synonyms.TransportPutSynonymsAction;
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
import org.elasticsearch.action.termvectors.TermVectorsAction;
import org.elasticsearch.action.termvectors.TransportEnsureDocsSearchableAction;
import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction;
import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction;
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
Expand Down Expand Up @@ -720,6 +721,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

actions.register(TransportIndexAction.TYPE, TransportIndexAction.class);
actions.register(TransportGetAction.TYPE, TransportGetAction.class);
actions.register(TransportEnsureDocsSearchableAction.TYPE, TransportEnsureDocsSearchableAction.class);
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class);
actions.register(TransportShardMultiTermsVectorAction.TYPE, TransportShardMultiTermsVectorAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*
* This file was contributed to by generative AI
*/

package org.elasticsearch.action.termvectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* This action is used in serverless to ensure that documents are searchable on the search tier before processing
* term vector requests. It is an intermediate action that is executed on the indexing node and responds
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh
* to ensure the search shards are up to date before returning the no-op.
*/
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep only the ActionType and the request in server, with the rest living in the serverless code base, similar to StatelessPrimaryRelocationAction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I would prefer that too.

TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest,
ActionResponse.Empty> {

// Logger for the debug traces emitted while handling a request.
private static final Logger logger = LogManager.getLogger(TransportEnsureDocsSearchableAction.class);
// Client used to execute the shard refresh action locally.
private final NodeClient client;
// Used to look up the IndexService / IndexShard of the shard being checked.
private final IndicesService indicesService;

// Transport action name: the multi term vectors action name with an "/eds" suffix.
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "/eds";
// Action type for this transport action; its response is the empty action response.
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(ACTION_NAME);

/**
 * Creates the action. The action name, the request reader and the {@code GET} thread pool executor are handed to the
 * {@link TransportSingleShardAction} constructor; the client and the indices service are kept for use when a
 * request is processed.
 */
@Inject
public TransportEnsureDocsSearchableAction(
    ClusterService clusterService,
    NodeClient client,
    TransportService transportService,
    IndicesService indicesService,
    ThreadPool threadPool,
    ActionFilters actionFilters,
    ProjectResolver projectResolver,
    IndexNameExpressionResolver indexNameExpressionResolver
) {
    super(
        ACTION_NAME,
        threadPool,
        clusterService,
        transportService,
        actionFilters,
        projectResolver,
        indexNameExpressionResolver,
        EnsureDocsSearchableRequest::new,
        threadPool.executor(ThreadPool.Names.GET)
    );
    this.indicesService = indicesService;
    this.client = client;
}

/**
 * Always returns {@code true}: this action reports itself as a sub-action.
 */
@Override
protected boolean isSubAction() {
return true;
}

/**
 * Returns the reader for this action's response. The reader ignores its input stream and always yields the shared
 * {@code ActionResponse.Empty.INSTANCE}.
 */
@Override
protected Writeable.Reader<ActionResponse.Empty> getResponseReader() {
return in -> ActionResponse.Empty.INSTANCE;
}

/**
 * Always returns {@code false} for this action.
 * NOTE(review): presumably this tells the base class to use the request's index name as-is instead of resolving it;
 * confirm against TransportSingleShardAction.
 */
@Override
protected boolean resolveIndex(TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request) {
return false;
}

/**
 * Routes the request to the primary copy of the requested shard.
 *
 * @param state   project state whose routing table is consulted
 * @param request internal request carrying the concrete index and the target shard number
 * @return an iterator containing only the primary shard routing
 * @throws NoShardAvailableActionException if the primary shard is not active
 */
@Override
protected ShardIterator shards(ProjectState state, InternalRequest request) {
    assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
    final var shardTable = state.routingTable().shardRoutingTable(request.concreteIndex(), request.request().shardId());
    final var primary = shardTable.primaryShard();
    if (primary.active()) {
        // An active primary is expected to be assigned to a node known to the cluster state.
        final DiscoveryNode primaryNode = state.cluster().nodes().get(primary.currentNodeId());
        assert primaryNode != null;
        return new ShardIterator(primary.shardId(), List.of(primary));
    }
    throw new NoShardAvailableActionException(primary.shardId(), "primary shard is not active");
}

@Override
protected void asyncShardOperation(
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request,
ShardId shardId,
ActionListener<ActionResponse.Empty> listener
) throws IOException {
assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE)
: ACTION_NAME + " should only be executed on a stateless indexing node";
logger.debug("received request with {} docs", request.docIds.length);
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we ok failing the request if the primary moved in the meantime?

final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
boolean docsFoundInLiveVersionMap = false;
for (String docId : request.docIds()) {
final var docUid = Uid.encodeId(docId);
// There are a couple of limited cases where we may unnecessarily trigger an additional external refresh:
// 1. Asking whether a document is in the live version map may incur a stateless refresh in itself.
// 2. The document may be in the live version map archive, even though it has been refreshed to the search shards. The
// document will be removed from the archive in a subsequent stateless refresh.
// We prefer simplicity to complexity (trying to avoid the unnecessary stateless refresh) for the above limited cases.
boolean docInLiveVersionMap = indexShard.withEngine(engine -> engine.isDocumentInLiveVersionMap(docUid));
if (docInLiveVersionMap) {
logger.debug("doc id [{}] (uid [{}]) found in live version map of index shard [{}]", docId, docUid, shardId);
docsFoundInLiveVersionMap = true;
break;
}
}

if (docsFoundInLiveVersionMap) {
logger.debug("refreshing index shard [{}] due to mtv_eds", shardId);
BasicReplicationRequest refreshRequest = new BasicReplicationRequest(shardId);
refreshRequest.waitForActiveShards(ActiveShardCount.NONE);
client.executeLocally(TransportShardRefreshAction.TYPE, refreshRequest, l.delegateFailureAndWrap((ll, r) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here pointing that we have to go through the transport action instead of just calling refresh in the indexShard instance to ensure that the unpromotable shards converge towards the refreshed generation?

// TransportShardRefreshAction.UnpromotableReplicasRefreshProxy.onPrimaryOperationComplete() returns a
// single shard failure if unpromotable(s) failed, with a combined list of (supressed) exceptions.
if (r.getShardInfo().getFailed() > 0) {
assert r.getShardInfo().getFailed() == 1
: "expected a single shard failure, got " + r.getShardInfo().getFailed() + " failures";
throw new ElasticsearchException("failed to refresh [{}]", r.getShardInfo().getFailures()[0].getCause(), shardId);
}
logger.debug("refreshed index shard [{}] due to mtv_eds", shardId);
ll.onResponse(ActionResponse.Empty.INSTANCE);
}));
} else {
// Notice that there cannot be a race between the document(s) being evicted from the live version map due to an
// ongoing refresh and before the search shards being updated with the new commit, because the documents are
// guaranteed to be the in the live version map archive until search shards are updated with the new commit.
// Thus, we can safely respond immediately as a no-op.
logger.debug("mts_eds does not require refresh of index shard [{}]", shardId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you meant mtv_eds?

l.onResponse(ActionResponse.Empty.INSTANCE);
}
}));
}

/**
 * Not supported: always throws. This class overrides {@code asyncShardOperation} to do the per-shard work.
 *
 * @throws UnsupportedOperationException always
 */
@Override
protected ActionResponse.Empty shardOperation(
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request,
ShardId shardId
) {
throw new UnsupportedOperationException();
}

/**
 * Single-shard request listing the ids of the documents to check on one shard of an index.
 * On the wire the request consists of the parent {@link SingleShardRequest} state followed by the document ids.
 */
public static final class EnsureDocsSearchableRequest extends SingleShardRequest<EnsureDocsSearchableRequest> {

    /** Target shard number. This is not transmitted over the wire, so it is unset on a deserialized request. */
    private int shardId;
    /** Ids of the documents to check. */
    private String[] docIds;

    /** Creates an empty request; neither the shard number nor the document ids are set. */
    public EnsureDocsSearchableRequest() {}

    /**
     * @param index   name of the index the shard belongs to
     * @param shardId number of the shard within the index
     * @param docIds  ids of the documents to check
     */
    public EnsureDocsSearchableRequest(String index, int shardId, String[] docIds) {
        super(index);
        this.docIds = docIds;
        this.shardId = shardId;
    }

    /** Reads a request from the stream: parent state first, then the document ids. */
    EnsureDocsSearchableRequest(StreamInput in) throws IOException {
        super(in);
        this.docIds = in.readStringArray();
    }

    /** Writes the parent state followed by the document ids; the shard number is not written. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeStringArray(this.docIds);
    }

    /** Delegates to the parent's non-null index validation; the document ids are not validated. */
    @Override
    public ActionRequestValidationException validate() {
        return super.validateNonNullIndex();
    }

    /** @return the target shard number */
    public int shardId() {
        return shardId;
    }

    /** @return the ids of the documents to check (the internal array, not a copy) */
    public String[] docIds() {
        return this.docIds;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*
* This file was contributed to by generative AI
*/

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,12 +33,15 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

import static org.elasticsearch.core.Strings.format;

public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
MultiTermVectorsShardRequest,
MultiTermVectorsShardResponse> {

private final NodeClient client;
private final IndicesService indicesService;

private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
Expand All @@ -42,6 +50,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
@Inject
public TransportShardMultiTermsVectorAction(
ClusterService clusterService,
NodeClient client,
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool,
Expand All @@ -60,6 +69,7 @@ public TransportShardMultiTermsVectorAction(
MultiTermVectorsShardRequest::new,
threadPool.executor(ThreadPool.Names.GET)
);
this.client = client;
this.indicesService = indicesService;
}

Expand All @@ -80,9 +90,43 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {

@Override
protected ShardIterator shards(ProjectState project, InternalRequest request) {
ShardIterator shards = clusterService.operationRouting()
ShardIterator iterator = clusterService.operationRouting()
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
if (iterator == null) {
return null;
}
return ShardIterator.allSearchableShards(iterator);
}

@Override
protected void asyncShardOperation(
MultiTermVectorsShardRequest request,
ShardId shardId,
ActionListener<MultiTermVectorsShardResponse> listener
) throws IOException {
if (DiscoveryNode.isStateless(clusterService.getSettings())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we can capture this only once instead of reevaluating for every request?

final String[] realTimeIds = request.requests.stream()
.filter(r -> r.realtime())
.map(TermVectorsRequest::id)
.toArray(String[]::new);
if (realTimeIds.length > 0) {
final var ensureDocsSearchableRequest = new TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest(
request.index(),
shardId.id(),
realTimeIds
);
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
client.executeLocally(
TransportEnsureDocsSearchableAction.TYPE,
ensureDocsSearchableRequest,
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
);
} else {
super.asyncShardOperation(request, shardId, listener);
}
} else {
super.asyncShardOperation(request, shardId, listener);
}
}

@Override
Expand Down
Loading