-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Account for time taken to write index buffers in IndexingMemoryController #126786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
020ee1d
4d57998
78f521f
331595c
7f29b65
d28b2af
7636b32
64216a6
6541205
0ca6e2d
2f4f8ad
018a86c
6dc1af4
6847ead
f063913
c24412a
c1c2525
c96bfa4
06d5aa5
0286b82
85685ba
9061dde
1d3ba53
5df4b76
e9c944d
dddc773
519d907
9541ee5
f00918a
5b81d57
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 126786 | ||
summary: Account for time taken to write index buffers in `IndexingMemoryController` | ||
area: Distributed | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.util.Optional; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.lessThanOrEqualTo; | ||
|
||
public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { | ||
|
@@ -37,7 +38,9 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { | |
protected Settings nodeSettings() { | ||
return Settings.builder() | ||
.put(super.nodeSettings()) | ||
// small indexing buffer so that we can trigger refresh after buffering 100 deletes | ||
// small indexing buffer so that | ||
// 1. We can trigger refresh after buffering 100 deletes | ||
// 2. Indexing memory Controller writes indexing buffers in sync with indexing on the indexing thread | ||
.put("indices.memory.index_buffer_size", "1kb") | ||
.build(); | ||
} | ||
|
@@ -111,4 +114,21 @@ public void testDeletesAloneCanTriggerRefresh() throws Exception { | |
} | ||
assertThat(shard.getEngineOrNull().getIndexBufferRAMBytesUsed(), lessThanOrEqualTo(ByteSizeUnit.KB.toBytes(1))); | ||
} | ||
|
||
/* When there is memory pressure, we write indexing buffers to disk on the same thread as the indexing thread, | ||
* @see org.elasticsearch.indices.IndexingMemoryController. | ||
* This test verifies that we update the stats that capture the combined time for indexing + writing the | ||
* indexing buffers. | ||
*/ | ||
public void testIndexingUpdatesRelevantStats() throws Exception { | ||
IndexService indexService = createIndex("index", indexSettings(1, 0).put("index.refresh_interval", -1).build()); | ||
IndexShard shard = indexService.getShard(0); | ||
for (int i = 0; i < 100; i++) { | ||
prepareIndex("index").setId(Integer.toString(i)).setSource("field", "value").get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we don't need to set the id explicitly or at least we should add ids randomly, since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I see. Thanks!
ankikuma marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
assertThat( | ||
shard.indexingStats().getTotal().getTotalIndexingExecutionTimeInMillis(), | ||
greaterThan(shard.indexingStats().getTotal().getIndexTime().getMillis()) | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -308,6 +308,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl | |
private final LongSupplier relativeTimeInNanosSupplier; | ||
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value | ||
private volatile long indexingTimeBeforeShardStartedInNanos; | ||
private volatile long indexingTaskExecutionTimeBeforeShardStartedInNanos; | ||
private volatile double recentIndexingLoadAtShardStarted; | ||
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>(); | ||
|
||
|
@@ -569,6 +570,7 @@ public void updateShardState( | |
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value. | ||
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L; | ||
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos(); | ||
indexingTaskExecutionTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingExecutionTimeInNanos(); | ||
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos); | ||
} else if (currentRouting.primary() | ||
&& currentRouting.relocating() | ||
|
@@ -1401,6 +1403,7 @@ public IndexingStats indexingStats() { | |
throttled, | ||
throttleTimeInMillis, | ||
indexingTimeBeforeShardStartedInNanos, | ||
indexingTaskExecutionTimeBeforeShardStartedInNanos, | ||
timeSinceShardStartedInNanos, | ||
currentTimeInNanos, | ||
recentIndexingLoadAtShardStarted | ||
|
@@ -3235,6 +3238,16 @@ public void noopUpdate() { | |
internalIndexingStats.noopUpdate(); | ||
} | ||
|
||
/** | ||
* Increment relevant stats when indexing buffers are written to disk using indexing threads, | ||
* in order to apply back-pressure on indexing. | ||
* @param took time it took to write the index buffers for this shard | ||
* @see org.elasticsearch.indices.IndexingMemoryController | ||
*/ | ||
public void writeIndexBuffersOnIndexThreads(long took) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we rename this method to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
internalIndexingStats.writeIndexingBuffersTime(took); | ||
} | ||
|
||
public void maybeCheckIndex() { | ||
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); | ||
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe we can do this in a single bulk request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we don't even need to index so many documents. The stat goes up in every request.