 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;
 
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
-    )
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() throws Exception {
         internalCluster().startMasterOnlyNode();
-        internalCluster().startDataOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().startDataOnlyNode();
         ensureStableCluster(3);
 
         assertAcked(
@@ -185,26 +179,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards
                 .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );
 
-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));
 
         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);
 
         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setIndices(indexName)
             .setWaitForCompletion(true)
             .get();
         final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,21 +197,82 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards
 
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);
 
-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
+
+        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
+        // other data node
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
         getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();
 
+        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
+        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
+        // trigger the whole chain by starting the first restore.
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kicks off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setWaitForCompletion(true)
             .get();
         final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));
 
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }
 
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
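The rewritten test leans on ClusterServiceUtils.addTemporaryStateListener, which registers a cluster-state listener, completes the listener it returns the first time its predicate matches a published cluster state, and then unregisters itself; safeAwait (from ESTestCase) blocks on that listener with a bounded timeout. The following is a minimal sketch of that pattern, assuming an ESIntegTestCase subclass; the test class name, index name, and predicate are illustrative only and not part of this change.

import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;

public class TemporaryStateListenerExampleIT extends ESIntegTestCase {
    public void testWaitsUntilAllShardsAreActive() {
        createIndex("example-index"); // illustrative index name

        // Completes once a published cluster state shows every shard of the index as active,
        // after which the underlying cluster-state listener removes itself.
        final var allShardsActive = ClusterServiceUtils.addTemporaryStateListener(
            state -> state.routingTable().hasIndex("example-index")
                && state.routingTable().index("example-index").allShardsActive()
        );

        // Fails the test if the condition is not observed within the default safe-await timeout.
        safeAwait(allShardsActive);
    }
}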