Skip to content

Propagate file settings health info to the health node #127397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b0d718e
Initial testHealthIndicator that fails
prdoyle Apr 23, 2025
32a7f7d
Refactor: FileSettingsHealthInfo record
prdoyle Apr 23, 2025
458a9b3
Propagate file settings health indicator to health node
prdoyle Apr 25, 2025
9339c3a
ensureStableCluster
prdoyle Apr 25, 2025
936813b
Try to induce a failure from returning node-local info
prdoyle Apr 28, 2025
4bee8d8
Remove redundant node from client() call
prdoyle Apr 29, 2025
e8c2a42
Use local node ID in UpdateHealthInfoCacheAction.Request
prdoyle Apr 29, 2025
411489f
Move logger to top
prdoyle Apr 29, 2025
434933d
Test node-local health on master and health nodes
prdoyle Apr 29, 2025
209902d
Fix calculate to use the given info
prdoyle Apr 29, 2025
e2c05f7
mutateFileSettingsHealthInfo
prdoyle Apr 29, 2025
bd5fed8
Test status from local current info
prdoyle Apr 29, 2025
4f6102b
FileSettingsHealthTracker
prdoyle Apr 30, 2025
12d7244
Spruce up HealthInfoTests
prdoyle Apr 30, 2025
095f377
spotless
prdoyle Apr 30, 2025
484d85f
randomNonNegativeLong
prdoyle Apr 30, 2025
203aad0
Merge remote-tracking branch 'upstream/main' into health
prdoyle Apr 30, 2025
a91b807
Rename variable
prdoyle Apr 30, 2025
696235b
Address Niels' comments
prdoyle Apr 30, 2025
6598e1f
Test one- and two-node clusters
prdoyle Apr 30, 2025
23fc675
Merge remote-tracking branch 'origin/health' into health
prdoyle Apr 30, 2025
07b2d8b
[CI] Auto commit changes from spotless
elasticsearchmachine Apr 30, 2025
ce015e7
Ensure there's a master node
prdoyle Apr 30, 2025
4e0978e
Merge branch 'main' into health
prdoyle Apr 30, 2025
debbf90
setBootstrapMasterNodeIndex
prdoyle Apr 30, 2025
98b7506
Merge branch 'main' into health
prdoyle May 5, 2025
e2cfbd7
Merge branch 'main' into health
prdoyle May 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Propagate file settings health indicator to health node
  • Loading branch information
prdoyle committed Apr 29, 2025
commit 458a9b34ac52103f34b82d8ee91c54e3dea1173f
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.health.node.ProjectIndexName;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -167,6 +168,6 @@ public void testMultiProject() {
}

private HealthInfo constructHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
return new HealthInfo(Map.of(), dslHealthInfo, Map.of());
return new HealthInfo(Map.of(), dslHealthInfo, Map.of(), FileSettingsHealthInfo.INDETERMINATE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -144,7 +145,16 @@ public void clusterChanged(ClusterChangedEvent event) {
states.add(
new RoutingNodesAndHealth(
event.state().getRoutingNodes(),
service.calculate(false, 1, new HealthInfo(Map.of(), DataStreamLifecycleHealthInfo.NO_DSL_ERRORS, Map.of()))
service.calculate(
false,
1,
new HealthInfo(
Map.of(),
DataStreamLifecycleHealthInfo.NO_DSL_ERRORS,
Map.of(),
FileSettingsHealthInfo.INDETERMINATE
)
)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00);
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_065_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
26 changes: 21 additions & 5 deletions server/src/main/java/org/elasticsearch/health/node/HealthInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,46 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;

import java.io.IOException;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static org.elasticsearch.health.node.DataStreamLifecycleHealthInfo.NO_DSL_ERRORS;
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;

/**
* This class wraps all the data returned by the health node.
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
* @param dslHealthInfo The data stream lifecycle health information
*
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
* @param dslHealthInfo The data stream lifecycle health information
* @param repositoriesInfoByNode A Map of node id to RepositoriesHealthInfo for that node
* @param fileSettingsHealthInfo The file-based settings health information
*/
public record HealthInfo(
Map<String, DiskHealthInfo> diskInfoByNode,
@Nullable DataStreamLifecycleHealthInfo dslHealthInfo,
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode,
FileSettingsHealthInfo fileSettingsHealthInfo
) implements Writeable {

public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of());
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of(), INDETERMINATE);

public HealthInfo {
requireNonNull(fileSettingsHealthInfo);
}

public HealthInfo(StreamInput input) throws IOException {
this(
input.readMap(DiskHealthInfo::new),
input.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
? input.readOptionalWriteable(DataStreamLifecycleHealthInfo::new)
: null,
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of()
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of(),
input.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
? input.readOptionalWriteable(FileSettingsHealthInfo::new)
: INDETERMINATE
);
}

Expand All @@ -53,5 +66,8 @@ public void writeTo(StreamOutput output) throws IOException {
if (output.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
output.writeMap(repositoriesInfoByNode, StreamOutput::writeWriteable);
}
if (output.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
output.writeOptionalWriteable(fileSettingsHealthInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.reservedstate.service.FileSettingsService;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;

/**
* Keeps track of several health statuses per node that can be used in health.
*/
Expand All @@ -31,6 +34,7 @@ public class HealthInfoCache implements ClusterStateListener {
@Nullable
private volatile DataStreamLifecycleHealthInfo dslHealthInfo = null;
private volatile ConcurrentHashMap<String, RepositoriesHealthInfo> repositoriesInfoByNode = new ConcurrentHashMap<>();
private volatile FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo = INDETERMINATE;

private HealthInfoCache() {}

Expand All @@ -44,7 +48,8 @@ public void updateNodeHealth(
String nodeId,
@Nullable DiskHealthInfo diskHealthInfo,
@Nullable DataStreamLifecycleHealthInfo latestDslHealthInfo,
@Nullable RepositoriesHealthInfo repositoriesHealthInfo
@Nullable RepositoriesHealthInfo repositoriesHealthInfo,
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
) {
if (diskHealthInfo != null) {
diskInfoByNode.put(nodeId, diskHealthInfo);
Expand All @@ -55,6 +60,9 @@ public void updateNodeHealth(
if (repositoriesHealthInfo != null) {
repositoriesInfoByNode.put(nodeId, repositoriesHealthInfo);
}
if (fileSettingsHealthInfo != null) {
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
}
}

@Override
Expand All @@ -77,6 +85,7 @@ public void clusterChanged(ClusterChangedEvent event) {
diskInfoByNode = new ConcurrentHashMap<>();
dslHealthInfo = null;
repositoriesInfoByNode = new ConcurrentHashMap<>();
fileSettingsHealthInfo = INDETERMINATE;
}
}

Expand All @@ -86,6 +95,6 @@ public void clusterChanged(ClusterChangedEvent event) {
*/
public HealthInfo getHealthInfo() {
// A shallow copy is enough because the inner data is immutable.
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode));
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode), fileSettingsHealthInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.elasticsearch.health.node.action.HealthNodeRequest;
import org.elasticsearch.health.node.action.TransportHealthNodeAction;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.reservedstate.service.FileSettingsService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -46,24 +49,37 @@ public static class Request extends HealthNodeRequest {
private final DataStreamLifecycleHealthInfo dslHealthInfo;
@Nullable
private final RepositoriesHealthInfo repositoriesHealthInfo;
@Nullable
private final FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo;

public Request(
String nodeId,
DiskHealthInfo diskHealthInfo,
DataStreamLifecycleHealthInfo dslHealthInfo,
RepositoriesHealthInfo repositoriesHealthInfo
RepositoriesHealthInfo repositoriesHealthInfo,
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
) {
this.nodeId = nodeId;
this.diskHealthInfo = diskHealthInfo;
this.dslHealthInfo = dslHealthInfo;
this.repositoriesHealthInfo = repositoriesHealthInfo;
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
}

public Request(String nodeId, DataStreamLifecycleHealthInfo dslHealthInfo) {
this.nodeId = nodeId;
this.diskHealthInfo = null;
this.repositoriesHealthInfo = null;
this.dslHealthInfo = dslHealthInfo;
this.fileSettingsHealthInfo = null;
}

public Request(String nodeId, FileSettingsService.FileSettingsHealthInfo info) {
this.nodeId = nodeId;
this.diskHealthInfo = null;
this.repositoriesHealthInfo = null;
this.dslHealthInfo = null;
this.fileSettingsHealthInfo = info;
}

public Request(StreamInput in) throws IOException {
Expand All @@ -75,6 +91,9 @@ public Request(StreamInput in) throws IOException {
this.repositoriesHealthInfo = in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)
? in.readOptionalWriteable(RepositoriesHealthInfo::new)
: null;
this.fileSettingsHealthInfo = in.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
? in.readOptionalWriteable(FileSettingsService.FileSettingsHealthInfo::new)
: null;
} else {
// BWC for pre-8.12 the disk health info was mandatory. Evolving this request has proven tricky however we've made use of
// waiting for all nodes to be on the {@link TransportVersions.HEALTH_INFO_ENRICHED_WITH_DSL_STATUS} transport version
Expand All @@ -83,6 +102,7 @@ public Request(StreamInput in) throws IOException {
this.diskHealthInfo = new DiskHealthInfo(in);
this.dslHealthInfo = null;
this.repositoriesHealthInfo = null;
this.fileSettingsHealthInfo = null;
}
}

Expand All @@ -102,6 +122,11 @@ public RepositoriesHealthInfo getRepositoriesHealthInfo() {
return repositoriesHealthInfo;
}

@Nullable
public FileSettingsService.FileSettingsHealthInfo getFileSettingsHealthInfo() {
return fileSettingsHealthInfo;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -117,6 +142,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
out.writeOptionalWriteable(repositoriesHealthInfo);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
out.writeOptionalWriteable(fileSettingsHealthInfo);
}
} else {
// BWC for pre-8.12 the disk health info was mandatory. Evolving this request has proven tricky however we've made use of
// waiting for all nodes to be on the {@link TransportVersions.V_8_12_0} transport version
Expand Down Expand Up @@ -185,7 +213,7 @@ public Builder dslHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
}

public Request build() {
return new Request(nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo);
return new Request(nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo, null);
}
}
}
Expand Down Expand Up @@ -228,13 +256,22 @@ protected void healthOperation(
ClusterState clusterState,
ActionListener<AcknowledgedResponse> listener
) {
logger.debug(
"Updating health info cache on node [{}][{}] from node [{}]",
clusterService.getNodeName(),
clusterService.localNode().getId(),
request.getNodeId()
);
nodeHealthOverview.updateNodeHealth(
request.getNodeId(),
request.getDiskHealthInfo(),
request.getDslHealthInfo(),
request.getRepositoriesHealthInfo()
request.getRepositoriesHealthInfo(),
request.getFileSettingsHealthInfo()
);
listener.onResponse(AcknowledgedResponse.of(true));
}
}

private static final Logger logger = LogManager.getLogger(UpdateHealthInfoCacheAction.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,8 @@ public Map<String, String> queryFields() {
actionModule.getReservedClusterStateService().installClusterStateHandler(new ReservedRepositoryAction(repositoriesService));
actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedPipelineAction());

FileSettingsHealthIndicatorService fileSettingsHealthIndicatorService = new FileSettingsHealthIndicatorService(settings);
var fileSettingsHealthIndicatorPublisher = new FileSettingsService.FileSettingsHealthIndicatorPublisherImpl(clusterService, client);
var fileSettingsHealthIndicatorService = new FileSettingsHealthIndicatorService(settings, fileSettingsHealthIndicatorPublisher);
FileSettingsService fileSettingsService = pluginsService.loadSingletonServiceProvider(
FileSettingsServiceProvider.class,
() -> FileSettingsService::new
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthIndicatorService;

/**
* Used by {@link FileSettingsHealthIndicatorService} to send health info to the health node.
*/
public interface FileSettingsHealthIndicatorPublisher {
void publish(FileSettingsService.FileSettingsHealthInfo info, ActionListener<AcknowledgedResponse> actionListener);
}
Loading