Skip to content

Commit dd19b4c

Browse files
committed
Term vector API on stateless search nodes
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node. The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them. Relates ES-12112
1 parent 788e18f commit dd19b4c

File tree

8 files changed

+318
-29
lines changed

8 files changed

+318
-29
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@
206206
import org.elasticsearch.action.synonyms.TransportPutSynonymsAction;
207207
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
208208
import org.elasticsearch.action.termvectors.TermVectorsAction;
209+
import org.elasticsearch.action.termvectors.TransportEnsureDocsSearchableAction;
209210
import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction;
210211
import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction;
211212
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
@@ -717,6 +718,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
717718

718719
actions.register(TransportIndexAction.TYPE, TransportIndexAction.class);
719720
actions.register(TransportGetAction.TYPE, TransportGetAction.class);
721+
actions.register(TransportEnsureDocsSearchableAction.TYPE, TransportEnsureDocsSearchableAction.class);
720722
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
721723
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class);
722724
actions.register(TransportShardMultiTermsVectorAction.TYPE, TransportShardMultiTermsVectorAction.class);
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
10+
*/
11+
12+
package org.elasticsearch.action.termvectors;
13+
14+
import org.apache.logging.log4j.LogManager;
15+
import org.apache.logging.log4j.Logger;
16+
import org.elasticsearch.action.ActionListener;
17+
import org.elasticsearch.action.ActionResponse;
18+
import org.elasticsearch.action.ActionType;
19+
import org.elasticsearch.action.NoShardAvailableActionException;
20+
import org.elasticsearch.action.support.ActionFilters;
21+
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
22+
import org.elasticsearch.cluster.ProjectState;
23+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
26+
import org.elasticsearch.cluster.project.ProjectResolver;
27+
import org.elasticsearch.cluster.routing.ShardIterator;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.Strings;
30+
import org.elasticsearch.common.io.stream.StreamInput;
31+
import org.elasticsearch.common.io.stream.StreamOutput;
32+
import org.elasticsearch.common.io.stream.Writeable;
33+
import org.elasticsearch.index.IndexService;
34+
import org.elasticsearch.index.mapper.Uid;
35+
import org.elasticsearch.index.shard.IndexShard;
36+
import org.elasticsearch.index.shard.ShardId;
37+
import org.elasticsearch.indices.IndicesService;
38+
import org.elasticsearch.injection.guice.Inject;
39+
import org.elasticsearch.threadpool.ThreadPool;
40+
import org.elasticsearch.transport.TransportService;
41+
42+
import java.io.IOException;
43+
import java.util.List;
44+
import java.util.Objects;
45+
46+
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction<
47+
MultiTermVectorsShardRequest,
48+
TransportEnsureDocsSearchableAction.Response> {
49+
50+
private static final Logger logger = LogManager.getLogger(TransportEnsureDocsSearchableAction.class);
51+
private final IndicesService indicesService;
52+
53+
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "/eds";
54+
public static final ActionType<TransportEnsureDocsSearchableAction.Response> TYPE = new ActionType<>(ACTION_NAME);
55+
56+
@Inject
57+
public TransportEnsureDocsSearchableAction(
58+
ClusterService clusterService,
59+
TransportService transportService,
60+
IndicesService indicesService,
61+
ThreadPool threadPool,
62+
ActionFilters actionFilters,
63+
ProjectResolver projectResolver,
64+
IndexNameExpressionResolver indexNameExpressionResolver
65+
) {
66+
super(
67+
ACTION_NAME,
68+
threadPool,
69+
clusterService,
70+
transportService,
71+
actionFilters,
72+
projectResolver,
73+
indexNameExpressionResolver,
74+
MultiTermVectorsShardRequest::new,
75+
threadPool.executor(ThreadPool.Names.GET)
76+
);
77+
this.indicesService = indicesService;
78+
}
79+
80+
@Override
81+
protected boolean isSubAction() {
82+
return true;
83+
}
84+
85+
@Override
86+
protected Writeable.Reader<TransportEnsureDocsSearchableAction.Response> getResponseReader() {
87+
return TransportEnsureDocsSearchableAction.Response::new;
88+
}
89+
90+
@Override
91+
protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
92+
return false;
93+
}
94+
95+
@Override
96+
protected ShardIterator shards(ProjectState state, InternalRequest request) {
97+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
98+
final var primaryShard = state.routingTable()
99+
.shardRoutingTable(request.concreteIndex(), request.request().shardId())
100+
.primaryShard();
101+
if (primaryShard.active() == false) {
102+
throw new NoShardAvailableActionException(primaryShard.shardId(), "primary shard is not active");
103+
}
104+
DiscoveryNode node = state.cluster().nodes().get(primaryShard.currentNodeId());
105+
assert node != null;
106+
return new ShardIterator(primaryShard.shardId(), List.of(primaryShard));
107+
}
108+
109+
@Override
110+
protected void asyncShardOperation(
111+
MultiTermVectorsShardRequest request,
112+
ShardId shardId,
113+
ActionListener<TransportEnsureDocsSearchableAction.Response> listener
114+
) throws IOException {
115+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
116+
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE)
117+
: ACTION_NAME + " should only be executed on a stateless indexing node";
118+
logger.debug("received locally {} with {} sub requests", request, request.locations.size());
119+
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> {
120+
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
121+
final IndexShard indexShard = indexService.getShard(shardId.id());
122+
boolean refreshBeforeReturning = false;
123+
for (int i = 0; i < request.locations.size(); i++) {
124+
TermVectorsRequest termVectorsRequest = request.requests.get(i);
125+
String docId = termVectorsRequest.id();
126+
if (termVectorsRequest.realtime() && docId != null && docId.isEmpty() == false) {
127+
final var docUid = Uid.encodeId(docId);
128+
boolean docInLiveVersionMap = indexShard.withEngine(engine -> engine.isDocumentInLiveVersionMap(docUid));
129+
if (docInLiveVersionMap) {
130+
logger.debug("doc id [{}] (uid [{}]) requires refresh of index shard [{}]", docId, docUid, shardId);
131+
refreshBeforeReturning = true;
132+
break;
133+
}
134+
}
135+
}
136+
long primaryTerm = indexShard.getOperationPrimaryTerm();
137+
if (refreshBeforeReturning) {
138+
logger.debug("refreshing index shard [{}] due to mtv_eds", shardId);
139+
indexShard.externalRefresh(
140+
"refresh_mtv_eds",
141+
l.map(r -> new TransportEnsureDocsSearchableAction.Response(primaryTerm, -1))
142+
);
143+
} else {
144+
// We respond with the current segment generation, because there is a race between the document(s) being evicted from the
145+
// live version map due to an ongoing refresh and before the search shards being updated with the new commit. Thus,
146+
// the search shard should ensure to wait for the segment generation before serving the term vector request locally.
147+
long segmentGeneration = indexShard.withEngine(engine -> engine.getLastCommittedSegmentInfos().getGeneration());
148+
final var response = new TransportEnsureDocsSearchableAction.Response(primaryTerm, segmentGeneration);
149+
logger.debug("mts_eds does not require refresh of index shard [{}], responding [{}]", shardId, response);
150+
l.onResponse(response);
151+
}
152+
}));
153+
}
154+
155+
@Override
156+
protected TransportEnsureDocsSearchableAction.Response shardOperation(MultiTermVectorsShardRequest request, ShardId shardId) {
157+
throw new UnsupportedOperationException();
158+
}
159+
160+
public static class Response extends ActionResponse {
161+
private final long primaryTerm;
162+
private final long segmentGeneration;
163+
164+
public Response(long primaryTerm, long segmentGeneration) {
165+
this.primaryTerm = primaryTerm;
166+
this.segmentGeneration = segmentGeneration;
167+
}
168+
169+
public Response(StreamInput in) throws IOException {
170+
primaryTerm = in.readVLong();
171+
segmentGeneration = in.readZLong();
172+
}
173+
174+
@Override
175+
public void writeTo(StreamOutput out) throws IOException {
176+
out.writeVLong(primaryTerm);
177+
out.writeZLong(segmentGeneration);
178+
}
179+
180+
public long primaryTerm() {
181+
return primaryTerm;
182+
}
183+
184+
/**
185+
* @return The segment generation that the search shard should wait for before handling the term vector API request locally.
186+
* Returns -1 if the search shard does not need to wait for any segment generation.
187+
*/
188+
public long segmentGeneration() {
189+
return segmentGeneration;
190+
}
191+
192+
@Override
193+
public boolean equals(Object o) {
194+
if (this == o) return true;
195+
if (o instanceof Response == false) return false;
196+
Response response = (Response) o;
197+
return primaryTerm == response.primaryTerm && segmentGeneration == response.segmentGeneration;
198+
}
199+
200+
@Override
201+
public int hashCode() {
202+
return Objects.hash(primaryTerm, segmentGeneration);
203+
}
204+
205+
@Override
206+
public String toString() {
207+
return Strings.format("Response{primaryTerm=%d, segmentGeneration=%d}", primaryTerm, segmentGeneration);
208+
}
209+
}
210+
}

server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

14+
import org.elasticsearch.action.ActionListener;
1215
import org.elasticsearch.action.ActionType;
1316
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.ContextPreservingActionListener;
1418
import org.elasticsearch.action.support.TransportActions;
1519
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
20+
import org.elasticsearch.client.internal.node.NodeClient;
1621
import org.elasticsearch.cluster.ProjectState;
1722
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
1824
import org.elasticsearch.cluster.project.ProjectResolver;
1925
import org.elasticsearch.cluster.routing.ShardIterator;
2026
import org.elasticsearch.cluster.service.ClusterService;
2127
import org.elasticsearch.common.io.stream.Writeable;
2228
import org.elasticsearch.index.IndexService;
29+
import org.elasticsearch.index.engine.Engine;
2330
import org.elasticsearch.index.shard.IndexShard;
2431
import org.elasticsearch.index.shard.ShardId;
2532
import org.elasticsearch.index.termvectors.TermVectorsService;
@@ -28,12 +35,15 @@
2835
import org.elasticsearch.threadpool.ThreadPool;
2936
import org.elasticsearch.transport.TransportService;
3037

38+
import java.io.IOException;
39+
3140
import static org.elasticsearch.core.Strings.format;
3241

3342
public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
3443
MultiTermVectorsShardRequest,
3544
MultiTermVectorsShardResponse> {
3645

46+
private final NodeClient client;
3747
private final IndicesService indicesService;
3848

3949
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
@@ -42,6 +52,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
4252
@Inject
4353
public TransportShardMultiTermsVectorAction(
4454
ClusterService clusterService,
55+
NodeClient client,
4556
TransportService transportService,
4657
IndicesService indicesService,
4758
ThreadPool threadPool,
@@ -60,6 +71,7 @@ public TransportShardMultiTermsVectorAction(
6071
MultiTermVectorsShardRequest::new,
6172
threadPool.executor(ThreadPool.Names.GET)
6273
);
74+
this.client = client;
6375
this.indicesService = indicesService;
6476
}
6577

@@ -80,9 +92,42 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
8092

8193
@Override
8294
protected ShardIterator shards(ProjectState project, InternalRequest request) {
83-
ShardIterator shards = clusterService.operationRouting()
95+
ShardIterator iterator = clusterService.operationRouting()
8496
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
85-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
97+
if (iterator == null) {
98+
return null;
99+
}
100+
return ShardIterator.allSearchableShards(iterator);
101+
}
102+
103+
@Override
104+
protected void asyncShardOperation(
105+
MultiTermVectorsShardRequest request,
106+
ShardId shardId,
107+
ActionListener<MultiTermVectorsShardResponse> listener
108+
) throws IOException {
109+
boolean ensureDocsSearchable = DiscoveryNode.isStateless(clusterService.getSettings())
110+
&& request.requests.stream().anyMatch(r -> r.realtime() && r.id() != null && r.id().isEmpty() == false);
111+
if (ensureDocsSearchable) {
112+
client.executeLocally(TransportEnsureDocsSearchableAction.TYPE, request, listener.delegateFailureAndWrap((l, r) -> {
113+
if (r.segmentGeneration() == -1) {
114+
// Nothing to wait for, just handle the term vector request locally.
115+
super.asyncShardOperation(request, shardId, l);
116+
} else {
117+
assert r.segmentGeneration() > -1L : r.segmentGeneration();
118+
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM : r.primaryTerm();
119+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
120+
IndexShard indexShard = indexService.getShard(shardId.id());
121+
final ActionListener<Long> termAndGenerationListener = ContextPreservingActionListener.wrapPreservingContext(
122+
l.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll)),
123+
threadPool.getThreadContext()
124+
);
125+
indexShard.waitForPrimaryTermAndGeneration(r.primaryTerm(), r.segmentGeneration(), termAndGenerationListener);
126+
}
127+
}));
128+
} else {
129+
super.asyncShardOperation(request, shardId, listener);
130+
}
86131
}
87132

88133
@Override

0 commit comments

Comments
 (0)