Skip to content

Add project-id to SnapshotDeletionsInProgress and RepositoryCleanupInProgress #129462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cl

@defaultMessage Do not construct this records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.String, java.util.List, long, long, org.elasticsearch.cluster.SnapshotDeletionsInProgress$State, java.lang.String)
org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(org.elasticsearch.cluster.metadata.ProjectId, java.lang.String, java.util.List, long, long, org.elasticsearch.cluster.SnapshotDeletionsInProgress$State, java.lang.String)

@defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
java.lang.Thread#<init>(java.lang.Runnable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00);
public static final TransportVersion SECURITY_CLOUD_API_KEY_REALM_AND_TYPE = def(9_099_0_00);
public static final TransportVersion STATE_PARAM_GET_SNAPSHOT = def(9_100_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP = def(9_101_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -198,11 +200,13 @@ public ClusterState execute(ClusterState currentState) {
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
);
}
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
return ClusterState.builder(currentState)
.putCustom(
RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
List.of(RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))
List.of(RepositoryCleanupInProgress.startedEntry(projectId, repositoryName, repositoryStateId))
)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -21,6 +22,9 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;

/**
* A repository cleanup request entry. Part of the cluster state.
Expand Down Expand Up @@ -49,8 +53,8 @@ public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws
return readDiffFrom(ClusterState.Custom.class, TYPE, in);
}

public static Entry startedEntry(String repository, long repositoryStateId) {
return new Entry(repository, repositoryStateId);
public static Entry startedEntry(ProjectId projectId, String repository, long repositoryStateId) {
return new Entry(projectId, repository, repositoryStateId);
}

public boolean hasCleanupInProgress() {
Expand Down Expand Up @@ -86,6 +90,18 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignore
);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
RepositoryCleanupInProgress that = (RepositoryCleanupInProgress) o;
return Objects.equals(entries, that.entries);
}

@Override
public int hashCode() {
return Objects.hashCode(entries);
}

@Override
public String toString() {
return Strings.toString(this);
Expand All @@ -96,24 +112,44 @@ public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.ZERO;
}

public record Entry(String repository, long repositoryStateId) implements Writeable, RepositoryOperation {
public record Entry(ProjectId projectId, String repository, long repositoryStateId) implements Writeable, RepositoryOperation {

public static Entry readFrom(StreamInput in) throws IOException {
return new Entry(in.readString(), in.readLong());
final ProjectId projectId = in.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)
? ProjectId.readFrom(in)
: ProjectId.DEFAULT;
return new Entry(projectId, in.readString(), in.readLong());
}

@Override
public long repositoryStateId() {
return repositoryStateId;
}

@Override
public ProjectId projectId() {
return projectId;
}

@Override
public String repository() {
return repository;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)) {
projectId.writeTo(out);
} else {
if (ProjectId.DEFAULT.equals(projectId) == false) {
final var message = "Cannot write repository cleanup entry with non-default project id "
+ projectId
+ " to version before "
+ PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
assert false : message;
throw new IllegalStateException(message);
}
}
out.writeString(repository);
out.writeLong(repositoryStateId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -32,6 +33,8 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;

/**
* Represents the in-progress snapshot deletions in the cluster state.
*/
Expand Down Expand Up @@ -174,6 +177,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignore
Iterators.map(entries.iterator(), entry -> (builder, params) -> {
builder.startObject();
{
builder.field("project_id", entry.projectId);
builder.field("repository", entry.repository());
builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) {
Expand Down Expand Up @@ -206,14 +210,26 @@ public String toString() {
/**
* A class representing a snapshot deletion request entry in the cluster state.
*/
public record Entry(String repoName, List<SnapshotId> snapshots, long startTime, long repositoryStateId, State state, String uuid)
implements
Writeable,
RepositoryOperation {
public record Entry(
ProjectId projectId,
String repoName,
List<SnapshotId> snapshots,
long startTime,
long repositoryStateId,
State state,
String uuid
) implements Writeable, RepositoryOperation {

@SuppressForbidden(reason = "using a private constructor within the same file")
public Entry(String repoName, List<SnapshotId> snapshots, long startTime, long repositoryStateId, State state) {
this(repoName, snapshots, startTime, repositoryStateId, state, UUIDs.randomBase64UUID());
public Entry(
ProjectId projectId,
String repoName,
List<SnapshotId> snapshots,
long startTime,
long repositoryStateId,
State state
) {
this(projectId, repoName, snapshots, startTime, repositoryStateId, state, UUIDs.randomBase64UUID());
}

public Entry {
Expand All @@ -222,7 +238,11 @@ public Entry(String repoName, List<SnapshotId> snapshots, long startTime, long r

@SuppressForbidden(reason = "using a private constructor within the same file")
public static Entry readFrom(StreamInput in) throws IOException {
final ProjectId projectId = in.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)
? ProjectId.readFrom(in)
: ProjectId.DEFAULT;
return new Entry(
projectId,
in.readString(),
in.readCollectionAsImmutableList(SnapshotId::new),
in.readVLong(),
Expand All @@ -235,7 +255,7 @@ public static Entry readFrom(StreamInput in) throws IOException {
@SuppressForbidden(reason = "using a private constructor within the same file")
public Entry started() {
assert state == State.WAITING;
return new Entry(repository(), snapshots, startTime, repositoryStateId, State.STARTED, uuid);
return new Entry(projectId(), repository(), snapshots, startTime, repositoryStateId, State.STARTED, uuid);
}

@SuppressForbidden(reason = "using a private constructor within the same file")
Expand All @@ -245,21 +265,33 @@ public Entry withAddedSnapshots(Collection<SnapshotId> newSnapshots) {
if (updatedSnapshots.addAll(newSnapshots) == false) {
return this;
}
return new Entry(repository(), List.copyOf(updatedSnapshots), startTime, repositoryStateId, State.WAITING, uuid);
return new Entry(projectId(), repository(), List.copyOf(updatedSnapshots), startTime, repositoryStateId, State.WAITING, uuid);
}

@SuppressForbidden(reason = "using a private constructor within the same file")
public Entry withSnapshots(Collection<SnapshotId> snapshots) {
return new Entry(repository(), List.copyOf(snapshots), startTime, repositoryStateId, state, uuid);
return new Entry(projectId(), repository(), List.copyOf(snapshots), startTime, repositoryStateId, state, uuid);
}

@SuppressForbidden(reason = "using a private constructor within the same file")
public Entry withRepoGen(long repoGen) {
return new Entry(repository(), snapshots, startTime, repoGen, state, uuid);
return new Entry(projectId(), repository(), snapshots, startTime, repoGen, state, uuid);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)) {
projectId.writeTo(out);
} else {
if (ProjectId.DEFAULT.equals(projectId) == false) {
final var message = "Cannot write snapshot deletion entry with non-default project id "
+ projectId
+ " to version before "
+ PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
assert false : message;
throw new IllegalStateException(message);
}
}
out.writeString(repoName);
out.writeCollection(snapshots);
out.writeVLong(startTime);
Expand All @@ -268,6 +300,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(uuid);
}

@Override
public ProjectId projectId() {
return projectId;
}

@Override
public String repository() {
return repoName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
package org.elasticsearch.repositories;

import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.FixForMultiProject;

import java.io.IOException;

Expand All @@ -26,10 +24,7 @@ public interface RepositoryOperation {
/**
* Project for which repository belongs to.
*/
@FixForMultiProject(description = "default implementation is temporary")
default ProjectId projectId() {
return Metadata.DEFAULT_PROJECT_ID;
}
ProjectId projectId();

/**
* Name of the repository affected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -2255,7 +2256,10 @@ public ClusterState execute(ClusterState currentState) {
reusedExistingDelete = true;
return currentState;
}
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
newDelete = new SnapshotDeletionsInProgress.Entry(
projectId,
repositoryName,
List.copyOf(snapshotIdsRequiringCleanup),
threadPool.absoluteTimeInMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -434,6 +435,7 @@ public void testComputation() {
SnapshotDeletionsInProgress.of(
List.of(
new SnapshotDeletionsInProgress.Entry(
ProjectId.DEFAULT,
"test-repo",
List.of(new SnapshotId("deleting", "uuid")),
startTimes[2],
Expand All @@ -446,7 +448,7 @@ public void testComputation() {
.putCustom(
RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
List.of(new RepositoryCleanupInProgress.Entry("test-repo", randomNonNegativeLong()))
List.of(new RepositoryCleanupInProgress.Entry(ProjectId.DEFAULT, "test-repo", randomNonNegativeLong()))
)
)
.build(),
Expand Down
Loading