Skip to content

Account for time taken to write index buffers in IndexingMemoryController #126786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
020ee1d
commit
ankikuma Apr 14, 2025
4d57998
Merge remote-tracking branch 'upstream/main' into 04142025/IWLES11356
ankikuma Apr 14, 2025
78f521f
Commit
ankikuma Apr 15, 2025
331595c
Add comments
ankikuma Apr 16, 2025
7f29b65
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 17, 2025
d28b2af
commit
ankikuma Apr 17, 2025
7636b32
Add transport version
ankikuma Apr 17, 2025
64216a6
Merge remote-tracking branch 'upstream/main' into 04142025/IWLES11356
ankikuma Apr 17, 2025
6541205
Merge branch '04142025/IWLES11356' of github.com:ankikuma/elasticsear…
ankikuma Apr 18, 2025
0ca6e2d
Update docs/changelog/126786.yaml
ankikuma Apr 18, 2025
2f4f8ad
refresh
ankikuma Apr 22, 2025
018a86c
Merge branch '04142025/IWLES11356' of github.com:ankikuma/elasticsear…
ankikuma Apr 22, 2025
6dc1af4
review comments
ankikuma Apr 25, 2025
6847ead
review comments
ankikuma Apr 25, 2025
f063913
Add test
ankikuma Apr 28, 2025
c24412a
refresh branch
ankikuma Apr 28, 2025
c1c2525
name changes
ankikuma Apr 29, 2025
c96bfa4
refresh
ankikuma Apr 30, 2025
06d5aa5
restore idea files
ankikuma Apr 30, 2025
0286b82
address comments
ankikuma May 1, 2025
85685ba
simplify test
ankikuma May 1, 2025
9061dde
add comments
ankikuma May 1, 2025
1d3ba53
[CI] Auto commit changes from spotless
elasticsearchmachine May 1, 2025
5df4b76
change API name
ankikuma May 1, 2025
e9c944d
change API name
ankikuma May 1, 2025
dddc773
refresh
ankikuma May 1, 2025
519d907
more comments
ankikuma May 1, 2025
9541ee5
pull
ankikuma May 1, 2025
f00918a
pull
ankikuma May 1, 2025
5b81d57
conflict
ankikuma May 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions .idea/eclipseCodeFormatter.xml

This file was deleted.

7 changes: 0 additions & 7 deletions .idea/externalDependencies.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .idea/inspectionProfiles/Project_Default.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .idea/runConfigurations/Debug_Elasticsearch.xml

This file was deleted.

11 changes: 0 additions & 11 deletions .idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

This file was deleted.

11 changes: 0 additions & 11 deletions .idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

This file was deleted.

3 changes: 0 additions & 3 deletions .idea/scopes/Production_minus_fixtures.xml

This file was deleted.

3 changes: 0 additions & 3 deletions .idea/scopes/llrc.xml

This file was deleted.

3 changes: 0 additions & 3 deletions .idea/scopes/x_pack.xml

This file was deleted.

5 changes: 5 additions & 0 deletions docs/changelog/126786.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126786
summary: Account for time taken to write index buffers in `IndexingMemoryController`
area: Distributed
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
CommonStats stats = new CommonStats();
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
stats.store = new StoreStats();
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
stats.indexing = new IndexingStats(
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, targetWriteLoad, 1, 0.123, 0.234)
);
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
Expand All @@ -37,7 +38,9 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
// small indexing buffer so that we can trigger refresh after buffering 100 deletes
// small indexing buffer so that
// 1. We can trigger refresh after buffering 100 deletes
// 2. IndexingMemoryController writes indexing buffers synchronously on the indexing thread
.put("indices.memory.index_buffer_size", "1kb")
.build();
}
Expand Down Expand Up @@ -111,4 +114,21 @@ public void testDeletesAloneCanTriggerRefresh() throws Exception {
}
assertThat(shard.getEngineOrNull().getIndexBufferRAMBytesUsed(), lessThanOrEqualTo(ByteSizeUnit.KB.toBytes(1)));
}

/* When there is memory pressure, we write indexing buffers to disk on the indexing thread itself
* (see org.elasticsearch.indices.IndexingMemoryController).
* This test verifies that we update the stats that capture the combined time for indexing + writing the
* indexing buffers.
*/
public void testIndexingUpdatesRelevantStats() throws Exception {
IndexService indexService = createIndex("index", indexSettings(1, 0).put("index.refresh_interval", -1).build());
IndexShard shard = indexService.getShard(0);
for (int i = 0; i < 100; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we can do this in a single bulk request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we don't even need to index so many documents. The stat goes up in every request.

prepareIndex("index").setId(Integer.toString(i)).setSource("field", "value").get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need to set the id explicitly or at least we should add ids randomly, since org.elasticsearch.index.engine.InternalEngine#writeIndexingBuffer takes into account the version map size too (which is populated when explicit ids are used).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Thanks!

}
assertThat(
shard.indexingStats().getTotal().getTotalIndexingExecutionTimeInMillis(),
greaterThan(shard.indexingStats().getTotal().getIndexTime().getMillis())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00);
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_065_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final LongSupplier relativeTimeInNanosSupplier;
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
private volatile long indexingTimeBeforeShardStartedInNanos;
private volatile long indexingTaskExecutionTimeBeforeShardStartedInNanos;
private volatile double recentIndexingLoadAtShardStarted;
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();

Expand Down Expand Up @@ -569,6 +570,7 @@ public void updateShardState(
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
indexingTaskExecutionTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingExecutionTimeInNanos();
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
} else if (currentRouting.primary()
&& currentRouting.relocating()
Expand Down Expand Up @@ -1401,6 +1403,7 @@ public IndexingStats indexingStats() {
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
indexingTaskExecutionTimeBeforeShardStartedInNanos,
timeSinceShardStartedInNanos,
currentTimeInNanos,
recentIndexingLoadAtShardStarted
Expand Down Expand Up @@ -3235,6 +3238,16 @@ public void noopUpdate() {
internalIndexingStats.noopUpdate();
}

/**
* Increment relevant stats when indexing buffers are written to disk using indexing threads,
* in order to apply back-pressure on indexing.
* @param took time it took to write the index buffers for this shard
* @see org.elasticsearch.indices.IndexingMemoryController
*/
public void writeIndexBuffersOnIndexThreads(long took) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename this method to addWriteIndexBuffersOnIndexThreadsTime? its current naming is a bit confusing as it implies that it would do the write. Also, can we include the unit in the parameter method? i.e. tookInNanos?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

internalIndexingStats.writeIndexingBuffersTime(took);
}

public void maybeCheckIndex() {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
import static org.elasticsearch.TransportVersions.WRITE_LOAD_INCLUDES_BUFFER_WRITES;

public class IndexingStats implements Writeable, ToXContentFragment {

Expand All @@ -45,6 +46,7 @@ public static class Stats implements Writeable, ToXContentFragment {
private long throttleTimeInMillis;
private boolean isThrottled;
private long totalIndexingTimeSinceShardStartedInNanos;
private long totalIndexingExecutionTimeSinceShardStartedInNanos;
private long totalActiveTimeInNanos;
private double recentIndexingLoad;
private double peakIndexingLoad;
Expand Down Expand Up @@ -87,6 +89,15 @@ public Stats(StreamInput in) throws IOException {
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
: 0;
}
if (in.getTransportVersion().onOrAfter(WRITE_LOAD_INCLUDES_BUFFER_WRITES)) {
totalIndexingExecutionTimeSinceShardStartedInNanos = in.readLong();
} else {
// When getting stats from an older version which doesn't have the more accurate indexing execution time,
// better to fall back to the indexing time, rather than assuming zero load:
totalIndexingExecutionTimeSinceShardStartedInNanos = totalActiveTimeInNanos > 0
? totalIndexingTimeSinceShardStartedInNanos
: 0;
}
}

public Stats(
Expand All @@ -102,6 +113,7 @@ public Stats(
boolean isThrottled,
long throttleTimeInMillis,
long totalIndexingTimeSinceShardStartedInNanos,
long totalIndexingExecutionTimeSinceShardStartedInNanos,
long totalActiveTimeInNanos,
double recentIndexingLoad,
double peakIndexingLoad
Expand All @@ -119,6 +131,7 @@ public Stats(
this.throttleTimeInMillis = throttleTimeInMillis;
// We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats
this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos;
this.totalIndexingExecutionTimeSinceShardStartedInNanos = totalIndexingExecutionTimeSinceShardStartedInNanos;
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
// We store the weighted write load as a double because the calculation is inherently floating point
this.recentIndexingLoad = recentIndexingLoad;
Expand All @@ -141,8 +154,9 @@ public void add(Stats stats) {
if (isThrottled != stats.isThrottled) {
isThrottled = true; // When combining if one is throttled set result to throttled.
}
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
totalIndexingExecutionTimeSinceShardStartedInNanos += stats.totalIndexingExecutionTimeSinceShardStartedInNanos;
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
// so we use the updating formula for a weighted mean:
Expand Down Expand Up @@ -237,7 +251,7 @@ public long getNoopUpdateCount() {
* the elapsed time for each shard.
*/
public double getWriteLoad() {
return totalActiveTimeInNanos > 0 ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
return totalActiveTimeInNanos > 0 ? (double) totalIndexingExecutionTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
}

/**
Expand Down Expand Up @@ -271,6 +285,13 @@ public long getTotalActiveTimeInMillis() {
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
}

/**
 * Returns the total amount of time spent on indexing plus writing indexing buffers, in milliseconds.
 */
public long getTotalIndexingExecutionTimeInMillis() {
    final long executionTimeInNanos = totalIndexingExecutionTimeSinceShardStartedInNanos;
    return TimeUnit.NANOSECONDS.toMillis(executionTimeInNanos);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -296,6 +317,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
out.writeDouble(peakIndexingLoad);
}
if (out.getTransportVersion().onOrAfter(WRITE_LOAD_INCLUDES_BUFFER_WRITES)) {
out.writeLong(totalIndexingExecutionTimeSinceShardStartedInNanos);
}
}

@Override
Expand Down Expand Up @@ -338,6 +362,7 @@ public boolean equals(Object o) {
&& isThrottled == that.isThrottled
&& throttleTimeInMillis == that.throttleTimeInMillis
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
&& totalIndexingExecutionTimeSinceShardStartedInNanos == that.totalIndexingExecutionTimeSinceShardStartedInNanos
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
&& recentIndexingLoad == that.recentIndexingLoad
&& peakIndexingLoad == that.peakIndexingLoad;
Expand All @@ -358,6 +383,7 @@ public int hashCode() {
isThrottled,
throttleTimeInMillis,
totalIndexingTimeSinceShardStartedInNanos,
totalIndexingExecutionTimeSinceShardStartedInNanos,
totalActiveTimeInNanos
);
}
Expand Down
Loading
Loading