|
73 | 73 | import java.util.concurrent.TimeUnit;
|
74 | 74 | import java.util.concurrent.atomic.AtomicBoolean;
|
75 | 75 | import java.util.concurrent.atomic.AtomicInteger;
|
| 76 | +import java.util.concurrent.atomic.AtomicLong; |
76 | 77 | import java.util.concurrent.atomic.AtomicReference;
|
77 | 78 |
|
78 | 79 | import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
@@ -568,50 +569,61 @@ public void testNonThrottleStats() throws Exception {
|
568 | 569 |
|
569 | 570 | public void testThrottleStats() throws Exception {
|
570 | 571 | assertAcked(
|
571 |
| - prepareCreate("test").setSettings( |
| 572 | + prepareCreate("test_throttle_stats_index").setSettings( |
572 | 573 | settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
|
573 | 574 | .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
|
574 | 575 | .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
|
575 | 576 | .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
|
576 | 577 | .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
|
577 | 578 | .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1")
|
| 579 | + .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true") |
578 | 580 | .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name())
|
579 | 581 | )
|
580 | 582 | );
|
581 |
| - ensureGreen(); |
582 |
| - long termUpto = 0; |
583 |
| - IndicesStatsResponse stats; |
| 583 | + ensureGreen("test_throttle_stats_index"); |
584 | 584 | // make sure we see throttling kicking in:
|
585 |
| - boolean done = false; |
586 |
| - long start = System.currentTimeMillis(); |
587 |
| - while (done == false) { |
588 |
| - for (int i = 0; i < 100; i++) { |
589 |
| - // Provoke slowish merging by making many unique terms: |
590 |
| - StringBuilder sb = new StringBuilder(); |
591 |
| - for (int j = 0; j < 100; j++) { |
592 |
| - sb.append(' '); |
593 |
| - sb.append(termUpto++); |
594 |
| - } |
595 |
| - client().prepareIndex("test", "type", "" + termUpto).setSource("field" + (i % 10), sb.toString()).get(); |
596 |
| - if (i % 2 == 0) { |
597 |
| - refresh(); |
| 585 | + AtomicBoolean done = new AtomicBoolean(); |
| 586 | + AtomicLong termUpTo = new AtomicLong(); |
| 587 | + Thread[] indexingThreads = new Thread[5]; |
| 588 | + for (int threadIdx = 0; threadIdx < indexingThreads.length; threadIdx++) { |
| 589 | + indexingThreads[threadIdx] = new Thread(() -> { |
| 590 | + while (done.get() == false) { |
| 591 | + for (int i = 0; i < 100; i++) { |
| 592 | + // Provoke slowish merging by making many unique terms: |
| 593 | + StringBuilder sb = new StringBuilder(); |
| 594 | + for (int j = 0; j < 100; j++) { |
| 595 | + sb.append(' '); |
| 596 | + sb.append(termUpTo.incrementAndGet()); |
| 597 | + } |
| 598 | + client().prepareIndex("test_throttle_stats_index", "type", "" + termUpTo.get()) |
| 599 | + .setSource("field" + (i % 10), sb.toString()) |
| 600 | + .get(); |
| 601 | + if (i % 2 == 0) { |
| 602 | + refresh("test_throttle_stats_index"); |
| 603 | + } |
| 604 | + } |
| 605 | + refresh("test_throttle_stats_index"); |
598 | 606 | }
|
599 |
| - } |
600 |
| - refresh(); |
601 |
| - stats = client().admin().indices().prepareStats().execute().actionGet(); |
602 |
| - // nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); |
603 |
| - done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0; |
604 |
| - if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in |
605 |
| - fail("index throttling didn't kick in after 5 minutes of intense merging"); |
606 |
| - } |
| 607 | + }); |
| 608 | + indexingThreads[threadIdx].start(); |
| 609 | + } |
| 610 | + |
| 611 | + assertBusy(() -> { |
| 612 | + IndicesStatsResponse stats = client().admin().indices().prepareStats("test_throttle_stats_index").get(); |
| 613 | + assertTrue(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0); |
| 614 | + done.set(true); |
| 615 | + }, 5L, TimeUnit.MINUTES); |
| 616 | + |
| 617 | + for (Thread indexingThread : indexingThreads) { |
| 618 | + indexingThread.join(); |
607 | 619 | }
|
608 | 620 |
|
609 | 621 | // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
|
610 | 622 | // when ESIntegTestCase.after tries to remove indices created by the test:
|
611 |
| - logger.info("test: now optimize"); |
612 |
| - client().admin().indices().prepareForceMerge("test").get(); |
613 |
| - flush(); |
614 |
| - logger.info("test: test done"); |
| 623 | + logger.info("test throttle stats: now optimize"); |
| 624 | + client().admin().indices().prepareForceMerge("test_throttle_stats_index").get(); |
| 625 | + flush("test_throttle_stats_index"); |
| 626 | + logger.info("test throttle stats: test done"); |
615 | 627 | }
|
616 | 628 |
|
617 | 629 | public void testSimpleStats() throws Exception {
|
|
0 commit comments