Skip to content

Commit 512e868

Browse files
Test for cancelled task in TransportSnapshotsStatusAction.buildResponse() (#126740)
Testing for cancellation in buildResponse() avoids a lot of unnecessary processing in scenarios with many shards. Closes ES-10981.
1 parent 16070a3 commit 512e868

File tree

7 files changed

+238
-1
lines changed

7 files changed

+238
-1
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

1212
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.io.stream.StreamInput;
1415
import org.elasticsearch.common.io.stream.StreamOutput;
1516
import org.elasticsearch.index.shard.ShardId;
@@ -161,4 +162,9 @@ public int hashCode() {
161162
result = 31 * result + (failure != null ? failure.hashCode() : 0);
162163
return result;
163164
}
165+
166+
@Override
167+
public String toString() {
168+
return Strings.toString(this, true, true);
169+
}
164170
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

12+
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.xcontent.ToXContentFragment;
1314
import org.elasticsearch.xcontent.XContentBuilder;
1415

@@ -133,4 +134,9 @@ public int hashCode() {
133134
result = 31 * result + (stats != null ? stats.hashCode() : 0);
134135
return result;
135136
}
137+
138+
@Override
139+
public String toString() {
140+
return Strings.toString(this, true, true);
141+
}
136142
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

12+
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.xcontent.ToXContent;
1314
import org.elasticsearch.xcontent.ToXContentObject;
1415
import org.elasticsearch.xcontent.XContentBuilder;
@@ -150,4 +151,9 @@ public int hashCode() {
150151
result = 31 * result + totalShards;
151152
return result;
152153
}
154+
155+
@Override
156+
public String toString() {
157+
return Strings.toString(this, true, true);
158+
}
153159
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

+5
Original file line numberDiff line numberDiff line change
@@ -355,4 +355,9 @@ public int hashCode() {
355355
result = 31 * result + (int) (processedSize ^ (processedSize >>> 32));
356356
return result;
357357
}
358+
359+
@Override
360+
public String toString() {
361+
return Strings.toString(this, true, true);
362+
}
358363
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java

+7
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
package org.elasticsearch.action.admin.cluster.snapshots.status;
1111

1212
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.collect.Iterators;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.common.xcontent.ChunkedToXContent;
1618
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
1719
import org.elasticsearch.xcontent.ToXContent;
1820

@@ -71,4 +73,9 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
7173
Iterators.single((b, p) -> b.endArray().endObject())
7274
);
7375
}
76+
77+
@Override
78+
public String toString() {
79+
return Strings.toString(ChunkedToXContent.wrapAsToXContent(this), true, true);
80+
}
7481
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ protected void masterOperation(
165165

166166
}
167167

168-
private void buildResponse(
168+
// Package access for testing.
169+
void buildResponse(
169170
SnapshotsInProgress snapshotsInProgress,
170171
SnapshotsStatusRequest request,
171172
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
@@ -190,6 +191,9 @@ private void buildResponse(
190191
for (Map.Entry<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry
191192
.shardSnapshotStatusByRepoShardId()
192193
.entrySet()) {
194+
if (task.notifyIfCancelled(listener)) {
195+
return;
196+
}
193197
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
194198
if (status.nodeId() != null) {
195199
// We should have information about this shard from the shard:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.snapshots.status;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.client.internal.node.NodeClient;
15+
import org.elasticsearch.cluster.SnapshotsInProgress;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.index.IndexVersion;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.repositories.IndexId;
21+
import org.elasticsearch.repositories.RepositoriesService;
22+
import org.elasticsearch.repositories.ShardGeneration;
23+
import org.elasticsearch.snapshots.Snapshot;
24+
import org.elasticsearch.snapshots.SnapshotId;
25+
import org.elasticsearch.tasks.CancellableTask;
26+
import org.elasticsearch.tasks.TaskCancelHelper;
27+
import org.elasticsearch.tasks.TaskCancelledException;
28+
import org.elasticsearch.test.ClusterServiceUtils;
29+
import org.elasticsearch.test.ESTestCase;
30+
import org.elasticsearch.test.transport.CapturingTransport;
31+
import org.elasticsearch.threadpool.TestThreadPool;
32+
import org.elasticsearch.threadpool.ThreadPool;
33+
import org.elasticsearch.transport.TransportService;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Set;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.function.Consumer;
42+
43+
public class TransportSnapshotsStatusActionTests extends ESTestCase {
44+
45+
private ThreadPool threadPool;
46+
private ClusterService clusterService;
47+
private TransportService transportService;
48+
private RepositoriesService repositoriesService;
49+
private TransportSnapshotsStatusAction action;
50+
51+
@Before
52+
public void initializeComponents() throws Exception {
53+
threadPool = new TestThreadPool(TransportSnapshotsStatusActionTests.class.getName());
54+
clusterService = ClusterServiceUtils.createClusterService(threadPool);
55+
transportService = new CapturingTransport().createTransportService(
56+
clusterService.getSettings(),
57+
threadPool,
58+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
59+
address -> clusterService.localNode(),
60+
clusterService.getClusterSettings(),
61+
Set.of()
62+
);
63+
final var nodeClient = new NodeClient(clusterService.getSettings(), threadPool);
64+
repositoriesService = new RepositoriesService(
65+
clusterService.getSettings(),
66+
clusterService,
67+
Map.of(),
68+
Map.of(),
69+
threadPool,
70+
nodeClient,
71+
List.of()
72+
);
73+
action = new TransportSnapshotsStatusAction(
74+
transportService,
75+
clusterService,
76+
threadPool,
77+
repositoriesService,
78+
nodeClient,
79+
new ActionFilters(Set.of())
80+
);
81+
}
82+
83+
@After
84+
public void shutdownComponents() throws Exception {
85+
threadPool.shutdown();
86+
repositoriesService.close();
87+
transportService.close();
88+
clusterService.close();
89+
}
90+
91+
public void testBuildResponseDetectsTaskIsCancelledWhileProcessingCurrentSnapshotEntries() throws Exception {
92+
runBasicBuildResponseTest(true);
93+
}
94+
95+
public void testBuildResponseInvokesListenerWithResponseWhenTaskIsNotCancelled() throws Exception {
96+
runBasicBuildResponseTest(false);
97+
}
98+
99+
private void runBasicBuildResponseTest(boolean shouldCancelTask) {
100+
final var expectedSnapshot = new Snapshot(ProjectId.DEFAULT, "test-repo", new SnapshotId("snapshot", "uuid"));
101+
final var expectedState = SnapshotsInProgress.State.STARTED;
102+
final var indexName = "test-index-name";
103+
final var indexUuid = "test-index-uuid";
104+
final var currentSnapshotEntries = List.of(
105+
SnapshotsInProgress.Entry.snapshot(
106+
expectedSnapshot,
107+
randomBoolean(),
108+
randomBoolean(),
109+
SnapshotsInProgress.State.STARTED,
110+
Map.of(indexName, new IndexId(indexName, indexUuid)),
111+
List.of(),
112+
List.of(),
113+
randomNonNegativeLong(),
114+
randomNonNegativeLong(),
115+
Map.of(
116+
new ShardId(indexName, indexUuid, 0),
117+
new SnapshotsInProgress.ShardSnapshotStatus("node", new ShardGeneration("gen"))
118+
),
119+
null,
120+
Map.of(),
121+
IndexVersion.current()
122+
)
123+
);
124+
final var nodeSnapshotStatuses = new TransportNodesSnapshotsStatus.NodesSnapshotStatus(
125+
clusterService.getClusterName(),
126+
List.of(),
127+
List.of()
128+
);
129+
130+
// Run some sanity checks for when the task is not cancelled and we get back a response object.
131+
// Note that thorough verification of the SnapshotsStatusResponse is done in the higher level SnapshotStatus API integration tests.
132+
final Consumer<SnapshotsStatusResponse> verifyResponse = rsp -> {
133+
assertNotNull(rsp);
134+
final var snapshotStatuses = rsp.getSnapshots();
135+
assertNotNull(snapshotStatuses);
136+
assertEquals(
137+
"expected 1 snapshot status, got " + snapshotStatuses.size() + ": " + snapshotStatuses,
138+
1,
139+
snapshotStatuses.size()
140+
);
141+
final var snapshotStatus = snapshotStatuses.getFirst();
142+
assertNotNull(snapshotStatus.getSnapshot());
143+
assertEquals(expectedSnapshot, snapshotStatus.getSnapshot());
144+
assertEquals(expectedState, snapshotStatus.getState());
145+
final var snapshotStatusShards = snapshotStatus.getShards();
146+
assertNotNull(snapshotStatusShards);
147+
assertEquals(
148+
"expected 1 index shard status, got " + snapshotStatusShards.size() + ": " + snapshotStatusShards,
149+
1,
150+
snapshotStatusShards.size()
151+
);
152+
final var snapshotStatusIndices = snapshotStatus.getIndices();
153+
assertNotNull(snapshotStatusIndices);
154+
assertEquals(
155+
"expected 1 entry in snapshotStatusIndices, got " + snapshotStatusIndices.size() + ": " + snapshotStatusIndices,
156+
1,
157+
snapshotStatusIndices.size()
158+
);
159+
assertTrue(
160+
"no entry for indexName [" + indexName + "] found in snapshotStatusIndices keyset " + snapshotStatusIndices.keySet(),
161+
snapshotStatusIndices.containsKey(indexName)
162+
);
163+
assertNotNull(snapshotStatus.getShardsStats());
164+
};
165+
166+
final var listener = new ActionListener<SnapshotsStatusResponse>() {
167+
@Override
168+
public void onResponse(SnapshotsStatusResponse rsp) {
169+
if (shouldCancelTask) {
170+
fail("expected detection of task cancellation and onFailure() instead of onResponse(" + rsp + ")");
171+
} else {
172+
verifyResponse.accept(rsp);
173+
}
174+
}
175+
176+
@Override
177+
public void onFailure(Exception e) {
178+
if (shouldCancelTask) {
179+
assertTrue(e instanceof TaskCancelledException);
180+
} else {
181+
fail("expected onResponse() instead of onFailure(" + e + ")");
182+
}
183+
}
184+
};
185+
186+
final var listenerInvoked = new AtomicBoolean(false);
187+
final var cancellableTask = new CancellableTask(randomLong(), "type", "action", "desc", null, Map.of());
188+
189+
if (shouldCancelTask) {
190+
TaskCancelHelper.cancel(cancellableTask, "simulated cancellation");
191+
}
192+
193+
action.buildResponse(
194+
SnapshotsInProgress.EMPTY,
195+
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
196+
currentSnapshotEntries,
197+
nodeSnapshotStatuses,
198+
cancellableTask,
199+
ActionListener.runAfter(listener, () -> listenerInvoked.set(true))
200+
);
201+
assertTrue("Expected listener to be invoked", listenerInvoked.get());
202+
}
203+
}

0 commit comments

Comments
 (0)