Skip to content

Add heap usage estimate to ClusterInfo #128723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2d60021
Add nodeHeapUsage field to ClusterInfo
nicktindall Jun 2, 2025
542c256
Populate nodesHeapUsage, make HeapUsageSupplier pluggable
nicktindall Jun 2, 2025
356beb5
Fix tests
nicktindall Jun 2, 2025
81fd063
Allow deferred creation of HeapUsageSupplier
nicktindall Jun 3, 2025
bc0682c
Default HeapUsageSupplier
nicktindall Jun 3, 2025
747b5a2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 3, 2025
13d1de8
Clarify that heap usage is a minimum
nicktindall Jun 3, 2025
f4d9db5
Test that InternalClusterInfoService polls for heap usage
nicktindall Jun 3, 2025
bf51e85
Test that getNodesHeapUsage returns heap usage
nicktindall Jun 3, 2025
c47c0ca
More caveats for #getNodesHeapUsage()
nicktindall Jun 3, 2025
23eb8e6
Remove HeapUsageSupplier from ClusterPlugin interface
nicktindall Jun 4, 2025
887bcaf
Swap free for used in HeapUsage
nicktindall Jun 4, 2025
7275acb
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
85fd019
Don't report heap usage in ClusterInfo serialization
nicktindall Jun 4, 2025
f112a3b
Fix tests
nicktindall Jun 4, 2025
3a1ada2
Only skip disk usage fetches when disk usage is disabled
nicktindall Jun 4, 2025
8fa587f
HeapUsage -> ShardHeapUsage
nicktindall Jun 4, 2025
6d4b204
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
2c42a82
icis -> internalClusterInfoService
nicktindall Jun 4, 2025
58402bd
diskUsage -> shardHeapUsage
nicktindall Jun 4, 2025
63bbea8
Note about not serializing shardHeapUsages
nicktindall Jun 4, 2025
0cacdc7
Remove unused serialization interface/methods
nicktindall Jun 4, 2025
dd73d37
Additional assertions
nicktindall Jun 4, 2025
765ade8
Clear shardHeapUsages on failure to fetch
nicktindall Jun 4, 2025
e26b62f
Fix naming
nicktindall Jun 4, 2025
55637b6
Restore + test percentage methods
nicktindall Jun 5, 2025
f4b90b5
Load ShardHeapUsageSupplier via SPI
nicktindall Jun 5, 2025
0789fef
Move SPI config to internalClusterTest
nicktindall Jun 5, 2025
2d475c8
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 5, 2025
f56f00e
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 6, 2025
7b5bf95
*Supplier -> *Collector
nicktindall Jun 7, 2025
08a5ca3
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 7, 2025
09ca9dc
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 8, 2025
cd6b7e9
Don't assert estimate <= max heap
nicktindall Jun 10, 2025
5e2cb9f
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 10, 2025
c529194
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 10, 2025
d831194
Use node stats to retrieve max heap size
nicktindall Jun 11, 2025
b8387bb
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 11, 2025
26dba4d
Fix build
nicktindall Jun 11, 2025
f15fca2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -19,6 +20,8 @@
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.ShardHeapUsage;
import org.elasticsearch.cluster.ShardHeapUsageCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
Expand Down Expand Up @@ -62,6 +65,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock;
Expand All @@ -82,6 +86,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -90,6 +95,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
Expand All @@ -111,12 +117,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class);
return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class);
}

public void testLockTryingToDelete() throws Exception {
Expand Down Expand Up @@ -254,6 +261,20 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertThat(dataSetSize.get(), greaterThan(0L));
}

public void testHeapUsageEstimateIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertNotNull(shardHeapUsages);
assertEquals(state.nodes().size(), shardHeapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(shardHeapUsages.containsKey(node.getId()));
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
}
}

public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
Expand Down Expand Up @@ -797,4 +818,40 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
}
}

public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector {

private final BogusShardHeapUsagePlugin plugin;

public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) {
this.plugin = plugin;
}

@Override
public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
ActionListener.completeWith(
listener,
() -> plugin.getClusterService()
.state()
.nodes()
.stream()
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong()))
);
}
}

public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin {

private final SetOnce<ClusterService> clusterService = new SetOnce<>();

@Override
public Collection<?> createComponents(PluginServices services) {
clusterService.set(services.clusterService());
return List.of();
}

public ClusterService getClusterService() {
return clusterService.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".
#

org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageCollector
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00);
public static final TransportVersion ML_INFERENCE_ELASTIC_RERANK = def(9_094_0_00);
public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_095_0_00);
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_096_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
29 changes: 27 additions & 2 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, ShardHeapUsage> shardHeapUsages;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -71,6 +72,7 @@ protected ClusterInfo() {
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param shardHeapUsages shard heap usage broken down by node
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -79,14 +81,16 @@ public ClusterInfo(
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, ShardHeapUsage> shardHeapUsages
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
this.shardSizes = Map.copyOf(shardSizes);
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.shardHeapUsages = Map.copyOf(shardHeapUsages);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -98,6 +102,11 @@ public ClusterInfo(StreamInput in) throws IOException {
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.shardHeapUsages = in.readImmutableMap(ShardHeapUsage::new);
} else {
this.shardHeapUsages = Map.of();
}
}

@Override
Expand All @@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
}
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.shardHeapUsages, StreamOutput::writeWriteable);
}
}

/**
Expand Down Expand Up @@ -192,9 +204,22 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
// NOTE: We don't serialize shardHeapUsages at this stage, to avoid
// committing to API payloads until the feature is settled
);
}

/**
* Returns a node id to estimated heap usage mapping for all nodes that we have such data for.
* Note that these estimates should be considered minimums. They may be used to determine whether
* there IS NOT capacity to do something, but not to determine that there IS capacity to do something.
* Also note that the map may not be complete, it may contain none, or a subset of the nodes in
* the cluster at any time. It may also contain entries for nodes that have since left the cluster.
*/
public Map<String, ShardHeapUsage> getShardHeapUsages() {
return shardHeapUsages;
}

/**
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public ClusterInfo getClusterInfo() {
shardSizes.toImmutableMap(),
shardDataSetSizes,
dataPath,
Map.of(),
Map.of()
);
}
Expand Down
Loading