Skip to content

Commit d2e5c43

Browse files
authored
Export current node allocation stats as APM metrics (#116585)
At the end of each reconciliation round, also export the current allocation stats for each node. This is intended to show the gradual progress (or divergence!) towards the desired values exported in #115854, and relies on the existing `AllocationStatsService`. Relates ES-9873
1 parent 778ab8f commit d2e5c43

File tree

14 files changed

+380
-109
lines changed

14 files changed

+380
-109
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java

+69-13
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() thro
5353
}
5454
}
5555

56-
public void testDesiredBalanceNodeWeightMetrics() {
56+
public void testDesiredBalanceMetrics() {
5757
internalCluster().startNodes(2);
5858
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
5959
indexRandom(randomBoolean(), "test", between(50, 100));
@@ -68,38 +68,83 @@ public void testDesiredBalanceNodeWeightMetrics() {
6868
var nodeIds = internalCluster().clusterService().state().nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
6969
var nodeNames = internalCluster().clusterService().state().nodes().stream().map(DiscoveryNode::getName).collect(Collectors.toSet());
7070

71-
final var nodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
71+
final var desiredBalanceNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
7272
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WEIGHT_METRIC_NAME
7373
);
74-
assertThat(nodeWeightsMetrics.size(), equalTo(2));
75-
for (var nodeStat : nodeWeightsMetrics) {
74+
assertThat(desiredBalanceNodeWeightsMetrics.size(), equalTo(2));
75+
for (var nodeStat : desiredBalanceNodeWeightsMetrics) {
7676
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
7777
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
7878
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
7979
}
80-
final var nodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
80+
final var desiredBalanceNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
8181
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME
8282
);
83-
assertThat(nodeShardCountMetrics.size(), equalTo(2));
84-
for (var nodeStat : nodeShardCountMetrics) {
83+
assertThat(desiredBalanceNodeShardCountMetrics.size(), equalTo(2));
84+
for (var nodeStat : desiredBalanceNodeShardCountMetrics) {
8585
assertThat(nodeStat.value().longValue(), equalTo(2L));
8686
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
8787
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
8888
}
89-
final var nodeWriteLoadMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
89+
final var desiredBalanceNodeWriteLoadMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
9090
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WRITE_LOAD_METRIC_NAME
9191
);
92-
assertThat(nodeWriteLoadMetrics.size(), equalTo(2));
93-
for (var nodeStat : nodeWriteLoadMetrics) {
92+
assertThat(desiredBalanceNodeWriteLoadMetrics.size(), equalTo(2));
93+
for (var nodeStat : desiredBalanceNodeWriteLoadMetrics) {
9494
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
9595
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
9696
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
9797
}
98-
final var nodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
98+
final var desiredBalanceNodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
9999
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME
100100
);
101-
assertThat(nodeDiskUsageMetrics.size(), equalTo(2));
102-
for (var nodeStat : nodeDiskUsageMetrics) {
101+
assertThat(desiredBalanceNodeDiskUsageMetrics.size(), equalTo(2));
102+
for (var nodeStat : desiredBalanceNodeDiskUsageMetrics) {
103+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
104+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
105+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
106+
}
107+
final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
108+
DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME
109+
);
110+
assertThat(currentNodeShardCountMetrics.size(), equalTo(2));
111+
for (var nodeStat : currentNodeShardCountMetrics) {
112+
assertThat(nodeStat.value().longValue(), equalTo(2L));
113+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
114+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
115+
}
116+
final var currentNodeWriteLoadMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
117+
DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME
118+
);
119+
assertThat(currentNodeWriteLoadMetrics.size(), equalTo(2));
120+
for (var nodeStat : currentNodeWriteLoadMetrics) {
121+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
122+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
123+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
124+
}
125+
final var currentNodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
126+
DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME
127+
);
128+
assertThat(currentNodeDiskUsageMetrics.size(), equalTo(2));
129+
for (var nodeStat : currentNodeDiskUsageMetrics) {
130+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
131+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
132+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
133+
}
134+
final var currentNodeUndesiredShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
135+
DesiredBalanceMetrics.CURRENT_NODE_UNDESIRED_SHARD_COUNT_METRIC_NAME
136+
);
137+
assertThat(currentNodeUndesiredShardCountMetrics.size(), equalTo(2));
138+
for (var nodeStat : currentNodeUndesiredShardCountMetrics) {
139+
assertThat(nodeStat.value().longValue(), greaterThanOrEqualTo(0L));
140+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
141+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
142+
}
143+
final var currentNodeForecastedDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
144+
DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME
145+
);
146+
assertThat(currentNodeForecastedDiskUsageMetrics.size(), equalTo(2));
147+
for (var nodeStat : currentNodeForecastedDiskUsageMetrics) {
103148
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
104149
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
105150
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
@@ -136,6 +181,17 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
136181
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME),
137182
matcher
138183
);
184+
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
185+
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
186+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);
187+
assertThat(
188+
testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME),
189+
matcher
190+
);
191+
assertThat(
192+
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_UNDESIRED_SHARD_COUNT_METRIC_NAME),
193+
matcher
194+
);
139195
}
140196

141197
private static TestTelemetryPlugin getTelemetryPlugin(String nodeName) {

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy;
3434
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
3535
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
36+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider;
3637
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
3738
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
3839
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
@@ -138,6 +139,7 @@ public ClusterModule(
138139
this.clusterPlugins = clusterPlugins;
139140
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
140141
this.allocationDeciders = new AllocationDeciders(deciderList);
142+
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster);
141143
this.shardsAllocator = createShardsAllocator(
142144
settings,
143145
clusterService.getClusterSettings(),
@@ -146,7 +148,8 @@ public ClusterModule(
146148
clusterService,
147149
this::reconcile,
148150
writeLoadForecaster,
149-
telemetryProvider
151+
telemetryProvider,
152+
nodeAllocationStatsProvider
150153
);
151154
this.clusterService = clusterService;
152155
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices);
@@ -160,7 +163,12 @@ public ClusterModule(
160163
);
161164
this.allocationService.addAllocFailuresResetListenerTo(clusterService);
162165
this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
163-
this.allocationStatsService = new AllocationStatsService(clusterService, clusterInfoService, shardsAllocator, writeLoadForecaster);
166+
this.allocationStatsService = new AllocationStatsService(
167+
clusterService,
168+
clusterInfoService,
169+
shardsAllocator,
170+
nodeAllocationStatsProvider
171+
);
164172
this.telemetryProvider = telemetryProvider;
165173
}
166174

@@ -400,7 +408,8 @@ private static ShardsAllocator createShardsAllocator(
400408
ClusterService clusterService,
401409
DesiredBalanceReconcilerAction reconciler,
402410
WriteLoadForecaster writeLoadForecaster,
403-
TelemetryProvider telemetryProvider
411+
TelemetryProvider telemetryProvider,
412+
NodeAllocationStatsProvider nodeAllocationStatsProvider
404413
) {
405414
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
406415
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(clusterSettings, writeLoadForecaster));
@@ -412,7 +421,8 @@ private static ShardsAllocator createShardsAllocator(
412421
threadPool,
413422
clusterService,
414423
reconciler,
415-
telemetryProvider
424+
telemetryProvider,
425+
nodeAllocationStatsProvider
416426
)
417427
);
418428

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

+9-60
Original file line numberDiff line numberDiff line change
@@ -10,86 +10,35 @@
1010
package org.elasticsearch.cluster.routing.allocation;
1111

1212
import org.elasticsearch.cluster.ClusterInfoService;
13-
import org.elasticsearch.cluster.metadata.IndexMetadata;
14-
import org.elasticsearch.cluster.routing.RoutingNode;
15-
import org.elasticsearch.cluster.routing.ShardRouting;
1613
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
1714
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
1815
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
1916
import org.elasticsearch.cluster.service.ClusterService;
20-
import org.elasticsearch.common.util.Maps;
2117

2218
import java.util.Map;
19+
import java.util.function.Supplier;
2320

2421
public class AllocationStatsService {
25-
2622
private final ClusterService clusterService;
2723
private final ClusterInfoService clusterInfoService;
28-
private final DesiredBalanceShardsAllocator desiredBalanceShardsAllocator;
29-
private final WriteLoadForecaster writeLoadForecaster;
24+
private final Supplier<DesiredBalance> desiredBalanceSupplier;
25+
private final NodeAllocationStatsProvider nodeAllocationStatsProvider;
3026

3127
public AllocationStatsService(
3228
ClusterService clusterService,
3329
ClusterInfoService clusterInfoService,
3430
ShardsAllocator shardsAllocator,
35-
WriteLoadForecaster writeLoadForecaster
31+
NodeAllocationStatsProvider nodeAllocationStatsProvider
3632
) {
3733
this.clusterService = clusterService;
3834
this.clusterInfoService = clusterInfoService;
39-
this.desiredBalanceShardsAllocator = shardsAllocator instanceof DesiredBalanceShardsAllocator allocator ? allocator : null;
40-
this.writeLoadForecaster = writeLoadForecaster;
35+
this.nodeAllocationStatsProvider = nodeAllocationStatsProvider;
36+
this.desiredBalanceSupplier = shardsAllocator instanceof DesiredBalanceShardsAllocator allocator
37+
? allocator::getDesiredBalance
38+
: () -> null;
4139
}
4240

4341
public Map<String, NodeAllocationStats> stats() {
44-
var state = clusterService.state();
45-
var info = clusterInfoService.getClusterInfo();
46-
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;
47-
48-
var stats = Maps.<String, NodeAllocationStats>newMapWithExpectedSize(state.getRoutingNodes().size());
49-
for (RoutingNode node : state.getRoutingNodes()) {
50-
int shards = 0;
51-
int undesiredShards = 0;
52-
double forecastedWriteLoad = 0.0;
53-
long forecastedDiskUsage = 0;
54-
long currentDiskUsage = 0;
55-
for (ShardRouting shardRouting : node) {
56-
if (shardRouting.relocating()) {
57-
continue;
58-
}
59-
shards++;
60-
IndexMetadata indexMetadata = state.metadata().getIndexSafe(shardRouting.index());
61-
if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
62-
undesiredShards++;
63-
}
64-
long shardSize = info.getShardSize(shardRouting.shardId(), shardRouting.primary(), 0);
65-
forecastedWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
66-
forecastedDiskUsage += Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0), shardSize);
67-
currentDiskUsage += shardSize;
68-
69-
}
70-
stats.put(
71-
node.nodeId(),
72-
new NodeAllocationStats(
73-
shards,
74-
desiredBalanceShardsAllocator != null ? undesiredShards : -1,
75-
forecastedWriteLoad,
76-
forecastedDiskUsage,
77-
currentDiskUsage
78-
)
79-
);
80-
}
81-
82-
return stats;
83-
}
84-
85-
private static boolean isDesiredAllocation(DesiredBalance desiredBalance, ShardRouting shardRouting) {
86-
if (desiredBalance == null) {
87-
return true;
88-
}
89-
var assignment = desiredBalance.getAssignment(shardRouting.shardId());
90-
if (assignment == null) {
91-
return false;
92-
}
93-
return assignment.nodeIds().contains(shardRouting.currentNodeId());
42+
return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get());
9443
}
9544
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation;
11+
12+
import org.elasticsearch.cluster.ClusterInfo;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.routing.RoutingNode;
16+
import org.elasticsearch.cluster.routing.ShardRouting;
17+
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
18+
import org.elasticsearch.common.util.Maps;
19+
import org.elasticsearch.core.Nullable;
20+
21+
import java.util.Map;
22+
23+
public class NodeAllocationStatsProvider {
24+
private final WriteLoadForecaster writeLoadForecaster;
25+
26+
public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) {
27+
this.writeLoadForecaster = writeLoadForecaster;
28+
}
29+
30+
public Map<String, NodeAllocationStats> stats(
31+
ClusterState clusterState,
32+
ClusterInfo clusterInfo,
33+
@Nullable DesiredBalance desiredBalance
34+
) {
35+
var stats = Maps.<String, NodeAllocationStats>newMapWithExpectedSize(clusterState.getRoutingNodes().size());
36+
for (RoutingNode node : clusterState.getRoutingNodes()) {
37+
int shards = 0;
38+
int undesiredShards = 0;
39+
double forecastedWriteLoad = 0.0;
40+
long forecastedDiskUsage = 0;
41+
long currentDiskUsage = 0;
42+
for (ShardRouting shardRouting : node) {
43+
if (shardRouting.relocating()) {
44+
continue;
45+
}
46+
shards++;
47+
IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index());
48+
if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
49+
undesiredShards++;
50+
}
51+
long shardSize = clusterInfo.getShardSize(shardRouting.shardId(), shardRouting.primary(), 0);
52+
forecastedWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
53+
forecastedDiskUsage += Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0), shardSize);
54+
currentDiskUsage += shardSize;
55+
56+
}
57+
stats.put(
58+
node.nodeId(),
59+
new NodeAllocationStats(
60+
shards,
61+
desiredBalance != null ? undesiredShards : -1,
62+
forecastedWriteLoad,
63+
forecastedDiskUsage,
64+
currentDiskUsage
65+
)
66+
);
67+
}
68+
69+
return stats;
70+
}
71+
72+
private static boolean isDesiredAllocation(DesiredBalance desiredBalance, ShardRouting shardRouting) {
73+
if (desiredBalance == null) {
74+
return true;
75+
}
76+
var assignment = desiredBalance.getAssignment(shardRouting.shardId());
77+
if (assignment == null) {
78+
return false;
79+
}
80+
return assignment.nodeIds().contains(shardRouting.currentNodeId());
81+
}
82+
}

0 commit comments

Comments
 (0)