Skip to content

Commit 1762733

Browse files
authored
Stateless real-time GET (elastic#93976)
For real-time get on Stateless, we'd need to first check the indexing shard whether it has the document in its Translog, if not we might have to wait on the search shard and then handle the GET locally. Relates ES-5537
1 parent 6208df0 commit 1762733

File tree

3 files changed

+97
-5
lines changed

3 files changed

+97
-5
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,20 @@
99
package org.elasticsearch.action.get;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.ActionListenerResponseHandler;
1213
import org.elasticsearch.action.ActionRunnable;
14+
import org.elasticsearch.action.NoShardAvailableActionException;
15+
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
1316
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
1418
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
19+
import org.elasticsearch.client.internal.node.NodeClient;
1520
import org.elasticsearch.cluster.ClusterState;
1621
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.cluster.routing.PlainShardIterator;
1724
import org.elasticsearch.cluster.routing.ShardIterator;
25+
import org.elasticsearch.cluster.routing.ShardRouting;
1826
import org.elasticsearch.cluster.service.ClusterService;
1927
import org.elasticsearch.common.inject.Inject;
2028
import org.elasticsearch.common.io.stream.Writeable;
@@ -24,6 +32,8 @@
2432
import org.elasticsearch.index.shard.ShardId;
2533
import org.elasticsearch.indices.ExecutorSelector;
2634
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.logging.LogManager;
36+
import org.elasticsearch.logging.Logger;
2737
import org.elasticsearch.threadpool.ThreadPool;
2838
import org.elasticsearch.transport.TransportService;
2939

@@ -34,8 +44,11 @@
3444
*/
3545
public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {
3646

47+
private static final Logger logger = LogManager.getLogger(TransportGetAction.class);
48+
3749
private final IndicesService indicesService;
3850
private final ExecutorSelector executorSelector;
51+
private final NodeClient client;
3952

4053
@Inject
4154
public TransportGetAction(
@@ -45,7 +58,8 @@ public TransportGetAction(
4558
ThreadPool threadPool,
4659
ActionFilters actionFilters,
4760
IndexNameExpressionResolver indexNameExpressionResolver,
48-
ExecutorSelector executorSelector
61+
ExecutorSelector executorSelector,
62+
NodeClient client
4963
) {
5064
super(
5165
GetAction.NAME,
@@ -59,6 +73,7 @@ public TransportGetAction(
5973
);
6074
this.indicesService = indicesService;
6175
this.executorSelector = executorSelector;
76+
this.client = client;
6277
// register the internal TransportGetFromTranslogAction
6378
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
6479
}
@@ -78,7 +93,10 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
7893
request.request().routing(),
7994
request.request().preference()
8095
);
81-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
96+
if (iterator == null) {
97+
return null;
98+
}
99+
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
82100
}
83101

84102
@Override
@@ -91,6 +109,12 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
91109
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
92110
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
93111
IndexShard indexShard = indexService.getShard(shardId.id());
112+
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
113+
handleGetOnUnpromotableShard(request, indexShard, listener);
114+
return;
115+
}
116+
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
117+
: "A TransportGetAction should always be handled by a search shard in Stateless";
94118
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
95119
asyncGet(request, shardId, listener);
96120
} else {
@@ -148,6 +172,66 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
148172
}
149173
}
150174

175+
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
176+
throws IOException {
177+
ShardId shardId = indexShard.shardId();
178+
DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
179+
if (request.refresh()) {
180+
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
181+
var refreshRequest = new BasicReplicationRequest(shardId);
182+
refreshRequest.setParentTask(request.getParentTask());
183+
client.executeLocally(
184+
TransportShardRefreshAction.TYPE,
185+
refreshRequest,
186+
ActionListener.wrap(replicationResponse -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
187+
);
188+
} else if (request.realtime()) {
189+
TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
190+
getFromTranslogRequest.setParentTask(request.getParentTask());
191+
transportService.sendRequest(
192+
node,
193+
TransportGetFromTranslogAction.NAME,
194+
getFromTranslogRequest,
195+
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
196+
if (r.getResult() != null) {
197+
logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
198+
l.onResponse(new GetResponse(r.getResult()));
199+
} else {
200+
logger.debug(
201+
"no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
202+
request.id(),
203+
r.segmentGeneration()
204+
);
205+
if (r.segmentGeneration() == -1) {
206+
// Nothing to wait for (no previous unsafe generation), just handle the Get locally.
207+
ActionRunnable.supply(listener, () -> shardOperation(request, shardId)).run();
208+
} else {
209+
assert r.segmentGeneration() > -1L;
210+
indexShard.waitForSegmentGeneration(
211+
r.segmentGeneration(),
212+
ActionListener.wrap(aLong -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
213+
);
214+
}
215+
}
216+
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
217+
);
218+
} else {
219+
// A non-real-time get with no explicit refresh requested.
220+
super.asyncShardOperation(request, shardId, listener);
221+
}
222+
}
223+
224+
private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
225+
var clusterState = clusterService.state();
226+
var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
227+
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
228+
throw new NoShardAvailableActionException(shardId, "primary shard is not active");
229+
}
230+
DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId());
231+
assert node != null;
232+
return node;
233+
}
234+
151235
private IndexShard getIndexShard(ShardId shardId) {
152236
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
153237
return indexService.getShard(shardId.id());

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
5454

5555
/**
5656
* Shards to use for a {@code GET} operation.
57+
* @return A shard iterator that can be used for GETs, or null if e.g. due to preferences no match is found.
5758
*/
5859
public ShardIterator getShards(
5960
ClusterState clusterState,

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ public class InternalEngine extends Engine {
214214

215215
private final ByteSizeValue totalDiskSpace;
216216

217+
protected static final String REAL_TIME_GET_REFRESH_SOURCE = "realtime_get";
218+
protected static final String UNSAFE_VERSION_MAP_REFRESH_SOURCE = "unsafe_version_map";
219+
217220
public InternalEngine(EngineConfig engineConfig) {
218221
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
219222
}
@@ -848,7 +851,7 @@ protected GetResult realtimeGetUnderLock(
848851
}
849852
}
850853
assert versionValue.seqNo >= 0 : versionValue;
851-
refreshIfNeeded("realtime_get", versionValue.seqNo);
854+
refreshIfNeeded(REAL_TIME_GET_REFRESH_SOURCE, versionValue.seqNo);
852855
}
853856
if (getFromSearcherIfNotInTranslog) {
854857
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
@@ -960,7 +963,7 @@ private VersionValue getVersionFromMap(BytesRef id) {
960963
// map so once we pass this point we can safely lookup from the version map.
961964
if (versionMap.isUnsafe()) {
962965
lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1);
963-
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
966+
refreshInternalSearcher(UNSAFE_VERSION_MAP_REFRESH_SOURCE, true);
964967
}
965968
versionMap.enforceSafeAccess();
966969
}
@@ -1929,6 +1932,10 @@ public RefreshResult maybeRefresh(String source) throws EngineException {
19291932
return refresh(source, SearcherScope.EXTERNAL, false);
19301933
}
19311934

1935+
protected RefreshResult refreshInternalSearcher(String source, boolean block) throws EngineException {
1936+
return refresh(source, SearcherScope.INTERNAL, block);
1937+
}
1938+
19321939
final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
19331940
// both refresh types will result in an internal refresh but only the external will also
19341941
// pass the new reader reference to the external reader manager.
@@ -3052,7 +3059,7 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {
30523059
if (lastRefreshedCheckpoint() < requestingSeqNo) {
30533060
synchronized (refreshIfNeededMutex) {
30543061
if (lastRefreshedCheckpoint() < requestingSeqNo) {
3055-
refresh(source, SearcherScope.INTERNAL, true);
3062+
refreshInternalSearcher(source, true);
30563063
}
30573064
}
30583065
}

0 commit comments

Comments
 (0)