Skip to content

Make can_match code a little easier to reuse #126588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
Expand Down Expand Up @@ -76,7 +77,7 @@ final class CanMatchPreFilterSearchPhase {
private int numPossibleMatches;
private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;

CanMatchPreFilterSearchPhase(
private CanMatchPreFilterSearchPhase(
Logger logger,
SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Expand Down Expand Up @@ -123,6 +124,57 @@ final class CanMatchPreFilterSearchPhase {
this.shardItIndexMap = shardItIndexMap;
}

public static SubscribableListener<List<SearchShardIterator>> execute(
Logger logger,
SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
Executor executor,
SearchRequest request,
List<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
) {
if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
}
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
listener.onFailure(new SearchPhaseExecutionException("can_match", "start", e, ShardSearchFailure.EMPTY_ARRAY));
}

@Override
protected void doRun() {
assert assertSearchCoordinationThread();
new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
nodeIdToConnection,
aliasFilter,
concreteIndexBoosts,
executor,
request,
shardsIts,
timeProvider,
task,
requireAtLeastOneMatch,
coordinatorRewriteContextProvider,
listener
).runCoordinatorRewritePhase();
}
});
return listener;
}

private static boolean assertSearchCoordinationThread() {
return ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
}
Expand Down Expand Up @@ -165,7 +217,7 @@ private void runCoordinatorRewritePhase() {
}
}
if (matchedShardLevelRequests.isEmpty()) {
finishPhase();
listener.onResponse(getIterator(shardsIts));
} else {
// verify missing shards only for the shards that we hit for the query
checkNoMissingShards(matchedShardLevelRequests);
Expand All @@ -176,18 +228,14 @@ private void runCoordinatorRewritePhase() {
private void consumeResult(boolean canMatch, ShardSearchRequest request) {
CanMatchShardResponse result = new CanMatchShardResponse(canMatch, null);
result.setShardIndex(request.shardRequestIndex());
consumeResult(result, () -> {});
consumeResult(result);
}

private void consumeResult(CanMatchShardResponse result, Runnable next) {
try {
final boolean canMatch = result.canMatch();
final MinAndMax<?> minAndMax = result.estimatedMinAndMax();
if (canMatch || minAndMax != null) {
consumeResult(result.getShardIndex(), canMatch, minAndMax);
}
} finally {
next.run();
private void consumeResult(CanMatchShardResponse result) {
final boolean canMatch = result.canMatch();
final MinAndMax<?> minAndMax = result.estimatedMinAndMax();
if (canMatch || minAndMax != null) {
consumeResult(result.getShardIndex(), canMatch, minAndMax);
}
}

Expand Down Expand Up @@ -226,7 +274,7 @@ private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchSha
* If there are failures during a round, there will be a follow-up round
* to retry on other available shard copies.
*/
class Round extends AbstractRunnable {
private class Round extends AbstractRunnable {
private final List<SearchShardIterator> shards;
private final CountDown countDown;
private final AtomicReferenceArray<Exception> failedResponses;
Expand Down Expand Up @@ -296,11 +344,10 @@ public void onFailure(Exception e) {

private void onOperation(int idx, CanMatchShardResponse response) {
failedResponses.set(idx, null);
consumeResult(response, () -> {
if (countDown.countDown()) {
finishRound();
}
});
consumeResult(response);
if (countDown.countDown()) {
finishRound();
}
}

private void onOperationFailed(int idx, Exception e) {
Expand All @@ -322,7 +369,7 @@ private void finishRound() {
}
}
if (remainingShards.isEmpty()) {
finishPhase();
listener.onResponse(getIterator(shardsIts));
} else {
// trigger another round, forcing execution
executor.execute(new Round(remainingShards) {
Expand All @@ -339,7 +386,7 @@ public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
onPhaseFailure("round", e);
listener.onFailure(new SearchPhaseExecutionException("can_match", "round", e, ShardSearchFailure.EMPTY_ARRAY));
}
}

Expand All @@ -363,13 +410,9 @@ private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<
);
}

private void finishPhase() {
listener.onResponse(getIterator(shardsIts));
}

private static final float DEFAULT_INDEX_BOOST = 1.0f;

public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shardIt) {
private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shardIt) {
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
Expand All @@ -386,33 +429,6 @@ public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shar
);
}

public void start() {
if (shardsIts.isEmpty()) {
finishPhase();
return;
}
// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [can_match] phase", request), e);
}
onPhaseFailure("start", e);
}

@Override
protected void doRun() {
assert assertSearchCoordinationThread();
runCoordinatorRewritePhase();
}
});
}

private void onPhaseFailure(String msg, Exception cause) {
listener.onFailure(new SearchPhaseExecutionException("can_match", msg, cause, ShardSearchFailure.EMPTY_ARRAY));
}

private synchronized List<SearchShardIterator> getIterator(List<SearchShardIterator> shardsIts) {
// TODO: pick the local shard when possible
if (requireAtLeastOneMatch && numPossibleMatches == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void runNewSearchPhase(
// that is signaled to the local can match through the SearchShardIterator#prefiltered flag. Local shards do need to go
// through the local can match phase.
if (SearchService.canRewriteToMatchNone(searchRequest.source())) {
new CanMatchPreFilterSearchPhase(
CanMatchPreFilterSearchPhase.execute(
logger,
searchTransportService,
connectionLookup,
Expand All @@ -176,22 +176,24 @@ public void runNewSearchPhase(
timeProvider,
task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
listener.delegateFailureAndWrap(
(searchResponseActionListener, searchShardIterators) -> runOpenPointInTimePhase(
task,
searchRequest,
executor,
searchShardIterators,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
)
.addListener(
listener.delegateFailureAndWrap(
(searchResponseActionListener, searchShardIterators) -> runOpenPointInTimePhase(
task,
searchRequest,
executor,
searchShardIterators,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
clusters
)
)
)
).start();
);
} else {
runOpenPointInTimePhase(
task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ public void runNewSearchPhase(
if (preFilter) {
// only for aggs we need to contact shards even if there are no matches
boolean requireAtLeastOneMatch = searchRequest.source() != null && searchRequest.source().aggregations() != null;
new CanMatchPreFilterSearchPhase(
CanMatchPreFilterSearchPhase.execute(
logger,
searchTransportService,
connectionLookup,
Expand All @@ -1496,24 +1496,26 @@ public void runNewSearchPhase(
timeProvider,
task,
requireAtLeastOneMatch,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
listener.delegateFailureAndWrap((l, iters) -> {
runNewSearchPhase(
task,
searchRequest,
executor,
iters,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
false,
threadPool,
clusters
);
})
).start();
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
)
.addListener(
listener.delegateFailureAndWrap(
(l, iters) -> runNewSearchPhase(
task,
searchRequest,
executor,
iters,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
false,
threadPool,
clusters
)
)
);
return;
}
// for synchronous CCS minimize_roundtrips=false, use the CCSSingleCoordinatorSearchProgressListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
new SearchShardsResponse(toGroups(shardIts), project.cluster().nodes().getAllNodes(), aliasFilters)
);
} else {
new CanMatchPreFilterSearchPhase(logger, searchTransportService, (clusterAlias, node) -> {
CanMatchPreFilterSearchPhase.execute(logger, searchTransportService, (clusterAlias, node) -> {
assert Objects.equals(clusterAlias, searchShardsRequest.clusterAlias());
return transportService.getConnection(project.cluster().nodes().get(node));
},
Expand All @@ -168,9 +168,13 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
timeProvider,
(SearchTask) task,
false,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
delegate.map(its -> new SearchShardsResponse(toGroups(its), project.cluster().nodes().getAllNodes(), aliasFilters))
).start();
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis)
)
.addListener(
delegate.map(
its -> new SearchShardsResponse(toGroups(its), project.cluster().nodes().getAllNodes(), aliasFilters)
)
);
}
})
);
Expand Down
Loading