Skip to content

Commit 7c19504

Browse files
authored
[8.18] Cancel expired async search task when a remote returns its results (#126583) (#126916)
* Cancel expired async search task when a remote returns its results (#126583) A while ago we enabled using ccs_minimize_roundtrips in async search. This makes it possible for users of async search to send a single search request per remote cluster, and minimize the impact of network latency. With non minimized roundtrips, we have pretty recurring cancellation checks: as part of the execution, we detect that a task expired whenever each shard comes back with its results. In a scenario where the coord node does not hold data, or only remote data is targeted by an async search, we have much less chance of detecting cancellation if roundtrips are minimized. The local coordinator would do nothing other than waiting for the minimized results from each remote cluster. One scenario where we can check for cancellation is when each cluster comes back with its full set of results. This commit adds such check, plus some testing for async search cancellation with minimized roundtrips. * compile error
1 parent 604df82 commit 7c19504

File tree

4 files changed

+291
-62
lines changed

4 files changed

+291
-62
lines changed

docs/changelog/126583.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126583
2+
summary: Cancel expired async search task when a remote returns its results
3+
area: CCS
4+
type: bug
5+
issues: []

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void resetSearchListenerPlugin() {
8787
}
8888

8989
private SubmitAsyncSearchRequest makeSearchRequest(String... indices) {
90-
CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase();
90+
CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase();
9191

9292
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indices);
9393
request.setCcsMinimizeRoundtrips(randomBoolean());
@@ -220,7 +220,8 @@ public void testCancelledSearch() throws Exception {
220220
String remoteIndex = (String) testClusterInfo.get("remote.index");
221221

222222
SubmitAsyncSearchRequest searchRequest = makeSearchRequest(localIndex, REMOTE1 + ":" + remoteIndex);
223-
CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase();
223+
CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase();
224+
CrossClusterAsyncSearchIT.SearchListenerPlugin.blockRemoteQueryPhase();
224225

225226
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
226227
final AsyncSearchResponse response = cluster(LOCAL_CLUSTER).client(nodeName)
@@ -232,7 +233,8 @@ public void testCancelledSearch() throws Exception {
232233
response.decRef();
233234
assertTrue(response.isRunning());
234235
}
235-
CrossClusterAsyncSearchIT.SearchListenerPlugin.waitSearchStarted();
236+
CrossClusterAsyncSearchIT.SearchListenerPlugin.waitLocalSearchStarted();
237+
CrossClusterAsyncSearchIT.SearchListenerPlugin.waitRemoteSearchStarted();
236238

237239
ActionFuture<ListTasksResponse> cancelFuture;
238240
try {
@@ -290,7 +292,8 @@ public void testCancelledSearch() throws Exception {
290292
assertTrue(taskInfo.description(), taskInfo.cancelled());
291293
}
292294
} finally {
293-
CrossClusterAsyncSearchIT.SearchListenerPlugin.allowQueryPhase();
295+
CrossClusterAsyncSearchIT.SearchListenerPlugin.allowLocalQueryPhase();
296+
CrossClusterAsyncSearchIT.SearchListenerPlugin.allowRemoteQueryPhase();
294297
}
295298

296299
assertBusy(() -> assertTrue(cancelFuture.isDone()));
@@ -314,7 +317,7 @@ private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
314317
}
315318

316319
private Map<String, Object> setupClusters() {
317-
String localIndex = "demo";
320+
String localIndex = "local";
318321
int numShardsLocal = randomIntBetween(2, 10);
319322
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
320323
assertAcked(
@@ -326,7 +329,7 @@ private Map<String, Object> setupClusters() {
326329
);
327330
indexDocs(client(LOCAL_CLUSTER), localIndex);
328331

329-
String remoteIndex = "prod";
332+
String remoteIndex = "remote";
330333
int numShardsRemote = randomIntBetween(2, 10);
331334
for (String clusterAlias : remoteClusterAlias()) {
332335
final InternalTestCluster remoteCluster = cluster(clusterAlias);

0 commit comments

Comments
 (0)