Skip to content

Make TransportMoveToStepAction project-aware #129252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ public void maybeRunAsyncAction(ClusterState clusterState, IndexMetadata indexMe
* Resolve the given phase, action, and name into a real {@link StepKey}. The phase is always
* required, but the action and name are optional. If a name is specified, an action is also required.
*/
public StepKey resolveStepKey(ClusterState state, Index index, String phase, @Nullable String action, @Nullable String name) {
public StepKey resolveStepKey(ProjectMetadata project, Index index, String phase, @Nullable String action, @Nullable String name) {
if (name == null) {
if (action == null) {
return this.policyRegistry.getFirstStepForPhase(state, index, phase);
return this.policyRegistry.getFirstStepForPhase(project, index, phase);
} else {
return this.policyRegistry.getFirstStepForPhaseAndAction(state, index, phase, action);
return this.policyRegistry.getFirstStepForPhaseAndAction(project, index, phase, action);
}
} else {
assert action != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -201,12 +200,11 @@ public void clear() {
* Return all ordered steps for the current policy for the index. Does not
* resolve steps using the phase caching, but only for the currently existing policy.
*/
private List<Step> getAllStepsForIndex(ClusterState state, Index index) {
final Metadata metadata = state.metadata();
if (metadata.getProject().hasIndex(index) == false) {
private List<Step> getAllStepsForIndex(ProjectMetadata project, Index index) {
if (project.hasIndex(index) == false) {
throw new IllegalArgumentException("index " + index + " does not exist in the current cluster state");
}
final IndexMetadata indexMetadata = metadata.getProject().index(index);
final IndexMetadata indexMetadata = project.index(index);
final String policyName = indexMetadata.getLifecyclePolicyName();
final LifecyclePolicyMetadata policyMetadata = lifecyclePolicyMap.get(policyName);
if (policyMetadata == null) {
Expand All @@ -225,8 +223,8 @@ private List<Step> getAllStepsForIndex(ClusterState state, Index index) {
* first step in that phase, if it exists, or null otherwise.
*/
@Nullable
public Step.StepKey getFirstStepForPhase(ClusterState state, Index index, String phase) {
return getAllStepsForIndex(state, index).stream()
public Step.StepKey getFirstStepForPhase(ProjectMetadata project, Index index, String phase) {
return getAllStepsForIndex(project, index).stream()
.map(Step::getKey)
.filter(stepKey -> phase.equals(stepKey.phase()))
.findFirst()
Expand All @@ -238,8 +236,8 @@ public Step.StepKey getFirstStepForPhase(ClusterState state, Index index, String
* for the first step in that phase, if it exists, or null otherwise.
*/
@Nullable
public Step.StepKey getFirstStepForPhaseAndAction(ClusterState state, Index index, String phase, String action) {
return getAllStepsForIndex(state, index).stream()
public Step.StepKey getFirstStepForPhaseAndAction(ProjectMetadata project, Index index, String phase, String action) {
return getAllStepsForIndex(project, index).stream()
.map(Step::getKey)
.filter(stepKey -> phase.equals(stepKey.phase()))
.filter(stepKey -> action.equals(stepKey.action()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -50,15 +51,17 @@
public class TransportMoveToStepAction extends TransportMasterNodeAction<TransportMoveToStepAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportMoveToStepAction.class);

IndexLifecycleService indexLifecycleService;
private final IndexLifecycleService indexLifecycleService;
private final ProjectResolver projectResolver;

@Inject
public TransportMoveToStepAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexLifecycleService indexLifecycleService
IndexLifecycleService indexLifecycleService,
ProjectResolver projectResolver
) {
super(
ILMActions.MOVE_TO_STEP.name(),
Expand All @@ -71,11 +74,13 @@ public TransportMoveToStepAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.indexLifecycleService = indexLifecycleService;
this.projectResolver = projectResolver;
}

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
IndexMetadata indexMetadata = state.metadata().getProject().index(request.getIndex());
final var project = projectResolver.getProjectMetadata(state);
IndexMetadata indexMetadata = project.index(request.getIndex());
if (indexMetadata == null) {
listener.onFailure(new IllegalArgumentException("index [" + request.getIndex() + "] does not exist"));
return;
Expand All @@ -95,7 +100,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
// Resolve the key that could have optional parts into one
// that is totally concrete given the existing policy and index
Step.StepKey concreteTargetStepKey = indexLifecycleService.resolveStepKey(
state,
project,
indexMetadata.getIndex(),
abstractTargetKey.getPhase(),
abstractTargetKey.getAction(),
Expand Down Expand Up @@ -125,8 +130,9 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
public ClusterState execute(ClusterState currentState) {
// Resolve the key that could have optional parts into one
// that is totally concrete given the existing policy and index
final var currentProject = currentState.metadata().getProject(project.id());
Step.StepKey concreteTargetStepKey = indexLifecycleService.resolveStepKey(
state,
currentProject,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was technically a bug because we were validating on the old state.

indexMetadata.getIndex(),
abstractTargetKey.getPhase(),
abstractTargetKey.getAction(),
Expand All @@ -148,7 +154,7 @@ public ClusterState execute(ClusterState currentState) {

concreteTargetKey.set(concreteTargetStepKey);
final var updatedProject = indexLifecycleService.moveIndexToStep(
currentState.metadata().getProject(),
currentProject,
indexMetadata.getIndex(),
request.getCurrentStepKey(),
concreteTargetKey.get()
Expand All @@ -158,7 +164,8 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
IndexMetadata newIndexMetadata = newState.metadata().getProject().index(indexMetadata.getIndex());
final var newProjectState = newState.projectState(project.id());
IndexMetadata newIndexMetadata = newProjectState.metadata().index(indexMetadata.getIndex());
if (newIndexMetadata == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug(
Expand Down