-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Term vector API on stateless search nodes #129902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
59876ce
to
dd19b4c
Compare
dd19b4c
to
ab4e1ed
Compare
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node. The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them. Relates ES-12112
82434e3
to
e3b6a4b
Compare
Pinging @elastic/es-search (Team:Search) |
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh | ||
* to ensure the search shards are up to date before returning the no-op. | ||
*/ | ||
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we have only the ActionType and the request in server
, and the rest lies into the serverless code base? Similarly to StatelessPrimaryRelocationAction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I would prefer that too.
@@ -902,6 +902,10 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boo | |||
} | |||
} | |||
|
|||
public boolean isDocumentInLiveVersionMap(BytesRef uid) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if the method name is meaningful, could we add a bit of documentation?
); | ||
); | ||
if (iterator == null) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it will execute on the indexing shard in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it'll execute the request in the receiving node. I think that we should return an empty iterator instead.
ShardId shardId, | ||
ActionListener<MultiTermVectorsShardResponse> listener | ||
) throws IOException { | ||
if (DiscoveryNode.isStateless(clusterService.getSettings())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we can capture this only once instead of reevaluating for every request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I left a few comments
@@ -7,11 +7,6 @@ routing: | |||
settings: | |||
index: | |||
number_of_shards: 5 | |||
number_of_replicas: 0 | |||
|
|||
- do: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this removed intentionally?
logger.debug("refreshing index shard [{}] due to mtv_eds", shardId); | ||
BasicReplicationRequest refreshRequest = new BasicReplicationRequest(shardId); | ||
refreshRequest.waitForActiveShards(ActiveShardCount.NONE); | ||
client.executeLocally(TransportShardRefreshAction.TYPE, refreshRequest, l.delegateFailureAndWrap((ll, r) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a comment here pointing that we have to go through the transport action instead of just calling refresh in the indexShard
instance to ensure that the unpromotable shards converge towards the refreshed generation?
// ongoing refresh and before the search shards being updated with the new commit, because the documents are | ||
// guaranteed to be the in the live version map archive until search shards are updated with the new commit. | ||
// Thus, we can safely respond immediately as a no-op. | ||
logger.debug("mts_eds does not require refresh of index shard [{}]", shardId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you meant mtv_eds
?
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE) | ||
: ACTION_NAME + " should only be executed on a stateless indexing node"; | ||
logger.debug("received request with {} docs", request.docIds.length); | ||
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we ok failing the request if the primary moved in the meantime?
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh | ||
* to ensure the search shards are up to date before returning the no-op. | ||
*/ | ||
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I would prefer that too.
); | ||
); | ||
if (iterator == null) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it'll execute the request in the receiving node. I think that we should return an empty iterator instead.
@@ -902,6 +902,10 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boo | |||
} | |||
} | |||
|
|||
public boolean isDocumentInLiveVersionMap(BytesRef uid) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe it's better if we name this isDocumentSearchable
? more generic and less tied to implementation details. And +1 to add more docs.
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless (see #94257). However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node.
The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them.
Relates ES-12112