diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java index ad92861f472c4..359d3ef8469c7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -73,6 +73,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -568,50 +569,61 @@ public void testNonThrottleStats() throws Exception { public void testThrottleStats() throws Exception { assertAcked( - prepareCreate("test").setSettings( + prepareCreate("test_throttle_stats_index").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1") + .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true") .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()) ) ); - ensureGreen(); - long termUpto = 0; - IndicesStatsResponse stats; + ensureGreen("test_throttle_stats_index"); // make sure we see throttling kicking in: - boolean done = false; - long start = System.currentTimeMillis(); - while (done == false) { - for (int i = 0; i < 100; i++) { - // Provoke slowish merging by making many unique terms: - StringBuilder sb = new StringBuilder(); - for (int j = 0; j < 100; j++) { - sb.append(' '); - sb.append(termUpto++); - } - client().prepareIndex("test", "type", "" + termUpto).setSource("field" + (i % 10), sb.toString()).get(); - if (i % 2 == 0) { - refresh(); + AtomicBoolean done = new AtomicBoolean(); + AtomicLong termUpTo = new AtomicLong(); + Thread[] indexingThreads = new Thread[5]; + for (int threadIdx = 0; threadIdx < indexingThreads.length; threadIdx++) { + indexingThreads[threadIdx] = new Thread(() -> { + while (done.get() == false) { + for (int i = 0; i < 100; i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for (int j = 0; j < 100; j++) { + sb.append(' '); + sb.append(termUpTo.incrementAndGet()); + } + client().prepareIndex("test_throttle_stats_index", "type", "" + termUpTo.get()) + .setSource("field" + (i % 10), sb.toString()) + .get(); + if (i % 2 == 0) { + refresh("test_throttle_stats_index"); + } + } + refresh("test_throttle_stats_index"); } - } - refresh(); - stats = client().admin().indices().prepareStats().execute().actionGet(); - // nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); - done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0; - if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in - fail("index throttling didn't kick in after 5 minutes of intense merging"); - } + }); + indexingThreads[threadIdx].start(); + } + + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats("test_throttle_stats_index").get(); + assertTrue(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0); + done.set(true); + }, 5L, TimeUnit.MINUTES); + + for (Thread indexingThread : indexingThreads) { + indexingThread.join(); } // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked" // when ESIntegTestCase.after tries to remove indices created by the test: - logger.info("test: now optimize"); - client().admin().indices().prepareForceMerge("test").get(); - flush(); - logger.info("test: test done"); + logger.info("test throttle stats: now optimize"); + client().admin().indices().prepareForceMerge("test_throttle_stats_index").get(); + flush("test_throttle_stats_index"); + logger.info("test throttle stats: test done"); } public void testSimpleStats() throws Exception {