diff --git a/docs/changelog/126409.yaml b/docs/changelog/126409.yaml new file mode 100644 index 0000000000000..7c5401faefa78 --- /dev/null +++ b/docs/changelog/126409.yaml @@ -0,0 +1,6 @@ +pr: 126409 +summary: System data streams are not being upgraded in the feature migration API +area: Infra/Core +type: bug +issues: + - 122949 diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java index 5d07337ebfdb9..b74b6dbe0d9b3 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java @@ -326,6 +326,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 855644a09e0e0..edaa0b241c7a0 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -275,6 +275,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), Collections.singletonList("test"), + "test", new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE) ) ); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java index aeba15563b991..4119ef2f34de8 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java @@ -213,6 +213,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 6890fbc21975a..cc1a5af0d6760 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -1094,6 +1094,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of(), + "test", ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS ) ); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java index 98aa0d460752c..67f31f5769ef0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java @@ -1153,6 +1153,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); @@ -1192,6 +1193,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); @@ -1231,6 +1233,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); @@ -1299,6 +1302,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), List.of("product"), + "product", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ) ); diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 6a1b5bfb97685..eb28cef3812af 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -282,6 +282,7 @@ exports org.elasticsearch.indices.recovery; exports org.elasticsearch.indices.recovery.plan; exports org.elasticsearch.indices.store; + exports org.elasticsearch.indices.system; exports org.elasticsearch.inference; exports org.elasticsearch.ingest; exports org.elasticsearch.internal diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 18895be0d1b2e..62417cbdd5863 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -382,6 +382,7 @@ private static void addBackingIndex( mapperSupplier, false, failureStore, + dataStream.isSystem(), nodeSettings ); } catch (IOException e) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java index 897dc1188ebd0..607ab79ffe59f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java @@ -156,7 +156,7 @@ static ClusterState migrateToDataStream( ProjectMetadata.Builder mb = ProjectMetadata.builder(project); for (Index index : alias.getIndices()) { IndexMetadata im = project.index(index); - prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, Settings.EMPTY); + prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, false, Settings.EMPTY); } ClusterState updatedState = ClusterState.builder(projectState.cluster()).putProjectMetadata(mb).build(); @@ -212,6 +212,8 @@ static void validateRequest(ProjectMetadata project, MigrateToDataStreamClusterS * exception should be thrown in that case instead * @param failureStore true if the index is being migrated into the data stream's failure store, false if it * is being migrated into the data stream's backing indices + * @param makeSystem true if the index is being migrated into the system data stream, false if it + * is being migrated into non-system data stream * @param nodeSettings The settings for the current node */ static void prepareBackingIndex( @@ -221,6 +223,7 @@ static void prepareBackingIndex( Function mapperSupplier, boolean removeAlias, boolean failureStore, + boolean makeSystem, Settings nodeSettings ) throws IOException { MappingMetadata mm = im.mapping(); @@ -251,6 +254,7 @@ static void prepareBackingIndex( imb.mappingVersion(im.getMappingVersion() + 1) .mappingsUpdatedVersion(IndexVersion.current()) .putMapping(new MappingMetadata(mapper)); + imb.system(makeSystem); b.put(imb); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index 086126408fb84..d58d392b77649 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -241,6 +241,10 @@ private List updateIndices(ClusterState currentState, List final List updatedMetadata = new ArrayList<>(); for (Index index : indices) { IndexMetadata indexMetadata = metadata.indexMetadata(index); + // this might happen because update is async and the index might have been deleted between task creation and execution + if (indexMetadata == null) { + continue; + } final boolean shouldBeSystem = shouldBeSystem(indexMetadata); IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem); if (updatedIndexMetadata != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java index 1ce1522acb90c..1366d13f71daa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java @@ -78,7 +78,6 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.joining; -import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX; @@ -1152,7 +1151,8 @@ public List getImpacts() { /** * Returns the diagnosis for unassigned primary and replica shards. - * @param verbose true if the diagnosis should be generated, false if they should be omitted. + * + * @param verbose true if the diagnosis should be generated, false if they should be omitted. * @param maxAffectedResourcesCount the max number of affected resources to be returned as part of the diagnosis * @return The diagnoses list the indicator identified. Alternatively, an empty list if none were found or verbose is false. */ @@ -1243,23 +1243,6 @@ static List getRestoreFromSnapshotAffectedResources( } } - Map> featureToDsBackingIndices = getSystemDsBackingIndicesForProjects( - systemIndices, - affectedProjects, - metadata - ); - - // the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from - // the list of affected indices (the feature state will cover the restore of these indices too) - for (Map.Entry> featureToBackingIndices : featureToDsBackingIndices.entrySet()) { - for (ProjectIndexName featureIndex : featureToBackingIndices.getValue()) { - if (restoreFromSnapshotIndices.contains(featureIndex)) { - affectedFeatureStates.add(featureToBackingIndices.getKey()); - affectedIndices.remove(featureIndex); - } - } - } - if (affectedIndices.isEmpty() == false) { affectedResources.add( new Diagnosis.Resource( @@ -1281,7 +1264,7 @@ static List getRestoreFromSnapshotAffectedResources( } /** - * Retrieve the system indices for the projects and group them by Feature + * Retrieve the system indices and indices backing system data streams for the projects and group them by Feature */ private static Map> getSystemIndicesForProjects( SystemIndices systemIndices, @@ -1293,7 +1276,7 @@ private static Map> getSystemIndicesForProjects( .collect( Collectors.toMap( SystemIndices.Feature::getName, - feature -> feature.getIndexDescriptors() + feature -> feature.getSystemResourceDescriptors() .stream() .flatMap( descriptor -> projects.stream() @@ -1307,34 +1290,6 @@ private static Map> getSystemIndicesForProjects( ) ); } - - /** - * Retrieve the backing indices for system data stream for the projects and group them by Feature - */ - private static Map> getSystemDsBackingIndicesForProjects( - SystemIndices systemIndices, - Set projects, - Metadata metadata - ) { - return systemIndices.getFeatures() - .stream() - .collect( - toMap( - SystemIndices.Feature::getName, - feature -> feature.getDataStreamDescriptors() - .stream() - .flatMap( - descriptor -> projects.stream() - .flatMap( - projectId -> descriptor.getBackingIndexNames(metadata.getProject(projectId)) - .stream() - .map(index -> new ProjectIndexName(projectId, index)) - ) - ) - .collect(Collectors.toSet()) - ) - ); - } } public static class SearchableSnapshotsState { diff --git a/server/src/main/java/org/elasticsearch/indices/AssociatedIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/AssociatedIndexDescriptor.java index 725949120512e..8ef22abd2689b 100644 --- a/server/src/main/java/org/elasticsearch/indices/AssociatedIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/AssociatedIndexDescriptor.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.automaton.CharacterRunAutomaton; import org.apache.lucene.util.automaton.RegExp; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.indices.system.IndexPatternMatcher; import java.util.List; import java.util.Objects; diff --git a/server/src/main/java/org/elasticsearch/indices/IndexPatternMatcher.java b/server/src/main/java/org/elasticsearch/indices/IndexMatcher.java similarity index 71% rename from server/src/main/java/org/elasticsearch/indices/IndexPatternMatcher.java rename to server/src/main/java/org/elasticsearch/indices/IndexMatcher.java index 519804b9d2666..567cb47c9454f 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndexPatternMatcher.java +++ b/server/src/main/java/org/elasticsearch/indices/IndexMatcher.java @@ -15,23 +15,15 @@ import java.util.List; /** - * An IndexPatternMatcher holds an index pattern in a string and, given a - * {@link Metadata} object, can return a list of index names matching that pattern. + * An IndexMatcher given a {@link Metadata} object, can return a list of index names matching that pattern. */ -public interface IndexPatternMatcher { - /** - * @return A pattern, either with a wildcard or simple regex, describing indices that are - * related to a system feature. Such indices may be system indices or associated - * indices. - */ - String getIndexPattern(); - +public interface IndexMatcher { /** * Retrieves a list of all indices which match this descriptor's pattern. Implementations * may include other special information when matching indices, such as aliases. - * + *

* This cannot be done via {@link org.elasticsearch.cluster.metadata.IndexNameExpressionResolver} because that class can only handle - * simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax, + * simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax. * * @param project The current metadata to get the list of matching indices from * @return A list of index names that match this descriptor diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java index 2bbdaaaae7946..b361cf652feb4 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.system.SystemResourceDescriptor; import java.util.Collections; import java.util.List; @@ -45,7 +46,7 @@ *

The descriptor also provides names for the thread pools that Elasticsearch should use to read, search, or modify the descriptor’s * indices. */ -public class SystemDataStreamDescriptor { +public class SystemDataStreamDescriptor implements SystemResourceDescriptor { private final String dataStreamName; private final String description; @@ -53,6 +54,7 @@ public class SystemDataStreamDescriptor { private final ComposableIndexTemplate composableIndexTemplate; private final Map componentTemplates; private final List allowedElasticProductOrigins; + private final String origin; private final ExecutorNames executorNames; /** @@ -66,6 +68,7 @@ public class SystemDataStreamDescriptor { * {@link ComposableIndexTemplate} * @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the * type is {@link Type#EXTERNAL}. Must not be {@code null} + * @param origin specifies the origin to use when creating or updating the data stream * @param executorNames thread pools that should be used for operations on the system data stream */ public SystemDataStreamDescriptor( @@ -75,6 +78,7 @@ public SystemDataStreamDescriptor( ComposableIndexTemplate composableIndexTemplate, Map componentTemplates, List allowedElasticProductOrigins, + String origin, ExecutorNames executorNames ) { this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified"); @@ -96,6 +100,7 @@ public SystemDataStreamDescriptor( throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination"); } this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS; + this.origin = origin; } public String getDataStreamName() { @@ -125,6 +130,11 @@ public List getBackingIndexNames(ProjectMetadata projectMetadata) { return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(Index::getName).toList(); } + @Override + public List getMatchingIndices(ProjectMetadata metadata) { + return getBackingIndexNames(metadata); + } + public String getDescription() { return description; } @@ -133,6 +143,17 @@ public ComposableIndexTemplate getComposableIndexTemplate() { return composableIndexTemplate; } + @Override + public String getOrigin() { + return origin; + } + + @Override + public boolean isAutomaticallyManaged() { + return true; + } + + @Override public boolean isExternal() { return type == Type.EXTERNAL; } @@ -142,9 +163,10 @@ public String getBackingIndexPattern() { } private static String backingIndexPatternForDataStream(String dataStream) { - return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*"; + return ".(migrated-){0,}[fd]s-" + dataStream + "-*"; } + @Override public List getAllowedElasticProductOrigins() { return allowedElasticProductOrigins; } @@ -157,6 +179,7 @@ public Map getComponentTemplates() { * Get the names of the thread pools that should be used for operations on this data stream. * @return Names for get, search, and write executors. */ + @Override public ExecutorNames getThreadPoolNames() { return this.executorNames; } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index 2b555553b03fc..f5b4ad159eaa2 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -25,6 +25,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.indices.system.IndexPatternMatcher; +import org.elasticsearch.indices.system.SystemResourceDescriptor; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -103,7 +105,7 @@ * A system index that is fully internal to Elasticsearch will not allow any product origins; such an index is fully "locked down," * and in general can only be changed by restoring feature states from snapshots. */ -public class SystemIndexDescriptor implements IndexPatternMatcher, Comparable { +public class SystemIndexDescriptor implements IndexPatternMatcher, SystemResourceDescriptor, Comparable { public static final Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(); @@ -297,7 +299,7 @@ protected SystemIndexDescriptor( } Objects.requireNonNull(allowedElasticProductOrigins, "allowedProductOrigins must not be null"); - if (type.isInternal() && allowedElasticProductOrigins.isEmpty() == false) { + if (type.isExternal() == false && allowedElasticProductOrigins.isEmpty() == false) { throw new IllegalArgumentException("Allowed origins are not valid for internal system indices"); } else if (type.isExternal() && allowedElasticProductOrigins.isEmpty()) { throw new IllegalArgumentException("External system indices without allowed products is not a valid combination"); @@ -442,9 +444,7 @@ public List getMatchingIndices(ProjectMetadata project) { return project.indices().keySet().stream().filter(this::matchesIndexPattern).toList(); } - /** - * @return A short description of the purpose of this system index. - */ + @Override public String getDescription() { return description; } @@ -473,16 +473,12 @@ public int getIndexFormat() { return this.indexFormat; } + @Override public boolean isAutomaticallyManaged() { return type.isManaged(); } - /** - * Get an origin string suitable for use in an {@link org.elasticsearch.client.internal.OriginSettingClient}. See - * {@link Builder#setOrigin(String)} for more information. - * - * @return an origin string to use for sub-requests - */ + @Override public String getOrigin() { // TODO[wrb]: most unmanaged system indices do not set origins; could we assert on that here? return this.origin; @@ -493,20 +489,12 @@ public boolean hasDynamicMappings() { return this.hasDynamicMappings; } + @Override public boolean isExternal() { return type.isExternal(); } - public boolean isInternal() { - return type.isInternal(); - } - - /** - * Requests from these products, if made with the proper security credentials, are allowed non-deprecated access to this descriptor's - * indices. (Product names may be specified in requests with the - * {@link org.elasticsearch.tasks.Task#X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER}). - * @return A list of product names. - */ + @Override public List getAllowedElasticProductOrigins() { return allowedElasticProductOrigins; } @@ -574,6 +562,7 @@ public SystemIndexDescriptor getDescriptorCompatibleWith(MappingsVersion version /** * @return The names of thread pools that should be used for operations on this system index. */ + @Override public ExecutorNames getThreadPoolNames() { return this.executorNames; } @@ -626,10 +615,6 @@ public boolean isExternal() { public boolean isManaged() { return managed; } - - public boolean isInternal() { - return external == false; - } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 2a49afdbe39ad..6860728e6eaf0 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -40,6 +41,9 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.indices.system.IndexPatternMatcher; +import org.elasticsearch.indices.system.SystemResourceDescriptor; +import org.elasticsearch.node.Node; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.snapshots.SnapshotsService; @@ -73,7 +77,7 @@ * from the user index space for a few reasons. In some cases, the indices contain information that should be hidden from users. But, * more generally, we want to protect these indices and data streams from being inadvertently modified or deleted. * - *

The system resources are grouped by feature, using the {@link SystemIndices.Feature} class. Most features will be loaded from + *

The system resources are grouped by feature, using the {@link Feature} class. Most features will be loaded from * instances of {@link SystemIndexPlugin}; any other features will be described in this class. Features may be retrieved by name or * iterated over (see {@link #getFeature(String)} and {@link #getFeatures()}). Each Feature provides collections of * {@link SystemIndexDescriptor}s or {@link SystemDataStreamDescriptor}s. These descriptors define their resources by means of patterns. @@ -84,7 +88,7 @@ *

For more information about the expected behavior of system indices, see {@link SystemIndexDescriptor}. For more information about * the expected behavior of system data streams, see {@link SystemDataStreamDescriptor}. * - *

The SystemIndices object is constructed during {@link org.elasticsearch.node.Node} startup, and is not modified after construction. + *

The SystemIndices object is constructed during {@link Node} startup, and is not modified after construction. * In other words, the set of system resources will be consistent over the lifetime of a node. * *

System resources will specify thread pools for reads, writes, and searches. This can ensure that system-critical operations, such @@ -234,7 +238,7 @@ private static void checkForDuplicateAliases(Collection d final List duplicateAliases = aliasCounts.entrySet() .stream() .filter(entry -> entry.getValue() > 1) - .map(Map.Entry::getKey) + .map(Entry::getKey) .sorted() .toList(); @@ -320,7 +324,7 @@ public boolean isSystemDataStream(String name) { /** * Determines whether the provided name matches that of an index that backs a system data stream. Backing indices * for system data streams are marked as "system" in their metadata (see {@link - * org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService}) and receive the same protections as the + * SystemIndexMetadataUpgradeService}) and receive the same protections as the * system data stream. */ public boolean isSystemIndexBackingDataStream(String name) { @@ -712,7 +716,7 @@ private static Map buildFeatureMap(List features) { return Map.copyOf(map); } - Collection getSystemIndexDescriptors() { + public Collection getSystemIndexDescriptors() { return this.featureDescriptors.values().stream().flatMap(f -> f.getIndexDescriptors().stream()).toList(); } @@ -882,6 +886,14 @@ public Collection getDataStreamDescriptors() { return dataStreamDescriptors; } + /** + * Returns descriptors of all system resources - indices and data streams. + * Doesn't include associated indices {@link AssociatedIndexDescriptor}. + */ + public Collection getSystemResourceDescriptors() { + return Stream.concat(indexDescriptors.stream(), dataStreamDescriptors.stream()).toList(); + } + public Collection getAssociatedIndexDescriptors() { return associatedIndexDescriptors; } diff --git a/server/src/main/java/org/elasticsearch/indices/system/IndexPatternMatcher.java b/server/src/main/java/org/elasticsearch/indices/system/IndexPatternMatcher.java new file mode 100644 index 0000000000000..5526f437d80c8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/system/IndexPatternMatcher.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.system; + +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.indices.IndexMatcher; + +/** + * An IndexPatternMatcher holds an index pattern in a string and, given a + * {@link Metadata} object, can return a list of index names matching that pattern. + */ +public interface IndexPatternMatcher extends IndexMatcher { + /** + * @return A pattern, either with a wildcard or simple regex, describing indices that are + * related to a system feature. Such indices may be system indices or associated + * indices. + */ + String getIndexPattern(); + +} diff --git a/server/src/main/java/org/elasticsearch/indices/system/SystemResourceDescriptor.java b/server/src/main/java/org/elasticsearch/indices/system/SystemResourceDescriptor.java new file mode 100644 index 0000000000000..023b54dec989d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/system/SystemResourceDescriptor.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.system; + +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.IndexMatcher; +import org.elasticsearch.indices.SystemIndexDescriptor; + +import java.util.List; + +public interface SystemResourceDescriptor extends IndexMatcher { + /** + * @return A short description of the purpose of this system resource. + */ + String getDescription(); + + boolean isAutomaticallyManaged(); + + /** + * Get an origin string suitable for use in an {@link org.elasticsearch.client.internal.OriginSettingClient}. See + * {@link SystemIndexDescriptor.Builder#setOrigin(String)} for more information. + * + * @return an origin string to use for sub-requests + */ + String getOrigin(); + + boolean isExternal(); + + /** + * Requests from these products, if made with the proper security credentials, are allowed non-deprecated access to this descriptor's + * indices. (Product names may be specified in requests with the + * {@link org.elasticsearch.tasks.Task#X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER}). + * @return A list of product names. + */ + List getAllowedElasticProductOrigins(); + + /** + * @return The names of thread pools that should be used for operations on this system index. + */ + ExecutorNames getThreadPoolNames(); +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index b2d1dada3eaeb..3c48d4a4201e6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -653,6 +653,7 @@ private static SystemDataStreamDescriptor systemDataStreamDescriptor() { .build(), Map.of(), List.of("stack"), + "stack", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index 4f05ff8a70a59..a95863ceda0fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -97,6 +97,65 @@ public void testAddBackingIndex() { IndexMetadata zeroIndex = newProject.index(ds.getIndices().get(0)); assertThat(zeroIndex.getIndex(), equalTo(indexToAdd.getIndex())); assertThat(zeroIndex.getSettings().get("index.hidden"), equalTo("true")); + assertThat(zeroIndex.isSystem(), equalTo(false)); + assertThat(zeroIndex.getAliases().size(), equalTo(0)); + } + + public void testAddBackingIndexToSystemDataStream() { + final long epochMillis = System.currentTimeMillis(); + final int numBackingIndices = randomIntBetween(1, 4); + final String dataStreamName = randomAlphaOfLength(5); + IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices]; + ProjectMetadata.Builder mb = ProjectMetadata.builder(randomProjectIdOrDefault()); + for (int k = 0; k < numBackingIndices; k++) { + backingIndices[k] = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis)) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp")) + .system(true) + .build(); + mb.put(backingIndices[k], false); + } + + DataStream dataStream = DataStream.builder(dataStreamName, Arrays.stream(backingIndices).map(IndexMetadata::getIndex).toList()) + .setSystem(true) + .setHidden(true) + .build(); + mb.put(dataStream); + + final IndexMetadata indexToAdd = IndexMetadata.builder(randomAlphaOfLength(5)) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp")) + .system(false) + .build(); + mb.put(indexToAdd, false); + + ProjectMetadata projectMetadata = mb.build(); + ProjectMetadata newState = MetadataDataStreamsService.modifyDataStream( + projectMetadata, + List.of(DataStreamAction.addBackingIndex(dataStreamName, indexToAdd.getIndex().getName())), + this::getMapperService, + Settings.EMPTY + ); + + IndexAbstraction ds = newState.getIndicesLookup().get(dataStreamName); + assertThat(ds, notNullValue()); + assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); + assertThat(ds.getIndices().size(), equalTo(numBackingIndices + 1)); + List backingIndexNames = ds.getIndices().stream().filter(x -> x.getName().startsWith(".ds-")).map(Index::getName).toList(); + assertThat( + backingIndexNames, + containsInAnyOrder( + Arrays.stream(backingIndices).map(IndexMetadata::getIndex).map(Index::getName).toList().toArray(Strings.EMPTY_ARRAY) + ) + ); + IndexMetadata zeroIndex = newState.index(ds.getIndices().get(0)); + assertThat(zeroIndex.getIndex(), equalTo(indexToAdd.getIndex())); + assertThat(zeroIndex.getSettings().get("index.hidden"), equalTo("true")); + assertThat(zeroIndex.isSystem(), equalTo(true)); assertThat(zeroIndex.getAliases().size(), equalTo(0)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java index c896350ce3c07..117b2e05ff3fc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java @@ -450,6 +450,7 @@ public void testSettingsVersion() throws IOException { mapperSupplier, removeAlias, failureStore, + false, nodeSettings ); ProjectMetadata metadata = metadataBuilder.build(); @@ -476,6 +477,7 @@ public void testSettingsVersion() throws IOException { mapperSupplier, removeAlias, failureStore, + false, nodeSettings ); ProjectMetadata metadata = metadataBuilder.build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java index 856a8b4af956f..d5c01e9885cd5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeServiceTests.java @@ -71,6 +71,7 @@ public class SystemIndexMetadataUpgradeServiceTests extends ESTestCase { ComposableIndexTemplate.builder().build(), Collections.emptyMap(), Collections.singletonList("FAKE_ORIGIN"), + "FAKE_ORIGIN", ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java index 5045cc6f498de..c3de0c954c49a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceTests.java @@ -2031,6 +2031,7 @@ private SystemIndices getSystemIndices( .build(), Map.of(), List.of("test"), + "test", new ExecutorNames( ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, diff --git a/server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java b/server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java index dacf3c28fce02..ff02a07b878b1 100644 --- a/server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java +++ b/server/src/test/java/org/elasticsearch/indices/ExecutorSelectorTests.java @@ -84,6 +84,7 @@ public void testDefaultSystemDataStreamThreadPools() { .build(), Map.of(), Collections.singletonList("test"), + "test", null ) ) @@ -116,6 +117,7 @@ public void testCustomSystemDataStreamThreadPools() { .build(), Map.of(), Collections.singletonList("test"), + "test", new ExecutorNames( ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java index dfcb1c0f1db9b..d8cee65d37b41 100644 --- a/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.indices; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; @@ -290,4 +292,40 @@ public void testMappingsVersions() { assertThat(mappingsVersions.get(".managed-primary").version(), equalTo(3)); assertThat(mappingsVersions.keySet(), not(contains("unmanaged"))); } + + public void testSystemDataStreamPattern() { + String dataStreamName = ".my-data-stream"; + SystemDataStreamDescriptor dataStreamDescriptor = new SystemDataStreamDescriptor( + dataStreamName, + "", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder().build(), + Map.of(), + Collections.singletonList("origin"), + "origin", + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ); + + final SystemIndices systemIndices = new SystemIndices( + List.of( + new SystemIndices.Feature("test", "test feature", Collections.emptyList(), Collections.singletonList(dataStreamDescriptor)) + ) + ); + assertThat( + systemIndices.isSystemIndexBackingDataStream(DataStream.BACKING_INDEX_PREFIX + dataStreamName + "-2025.03.07-000001"), + equalTo(true) + ); + assertThat( + systemIndices.isSystemIndexBackingDataStream(DataStream.FAILURE_STORE_PREFIX + dataStreamName + "-2025.03.07-000001"), + equalTo(true) + ); + assertThat(systemIndices.isSystemIndexBackingDataStream(".migrated-ds-" + dataStreamName + "-2025.03.07-000001"), equalTo(true)); + assertThat( + systemIndices.isSystemIndexBackingDataStream(".migrated-migrated-ds-" + dataStreamName + "-2025.03.07-000001"), + equalTo(true) + ); + assertThat(systemIndices.isSystemIndexBackingDataStream(".migrated-" + dataStreamName + "-2025.03.07-000001"), equalTo(false)); + assertThat(systemIndices.isSystemIndexBackingDataStream(dataStreamName), equalTo(false)); + assertThat(systemIndices.isSystemIndexBackingDataStream(dataStreamName + "-2025.03.07-000001"), equalTo(false)); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java index b834e86955f3d..78a9029165817 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java @@ -29,12 +29,18 @@ public class DeprecatedIndexPredicate { * @param metadata the cluster metadata * @param filterToBlockedStatus if true, only indices that are write blocked will be returned, * if false, only those without a block are returned + * @param includeSystem if true, all indices including system will be returned, + * if false, only non-system indices are returned * @return a predicate that returns true for indices that need to be reindexed */ - public static Predicate getReindexRequiredPredicate(ProjectMetadata metadata, boolean filterToBlockedStatus) { + public static Predicate getReindexRequiredPredicate( + ProjectMetadata metadata, + boolean filterToBlockedStatus, + boolean includeSystem + ) { return index -> { IndexMetadata indexMetadata = metadata.index(index); - return reindexRequired(indexMetadata, filterToBlockedStatus); + return reindexRequired(indexMetadata, filterToBlockedStatus, includeSystem); }; } @@ -45,11 +51,13 @@ public static Predicate getReindexRequiredPredicate(ProjectMetadata metad * @param indexMetadata the index metadata * @param filterToBlockedStatus if true, only indices that are write blocked will be returned, * if false, only those without a block are returned + * @param includeSystem if true, all indices including system will be returned, + * if false, only non-system indices are returned * @return a predicate that returns true for indices that need to be reindexed */ - public static boolean reindexRequired(IndexMetadata indexMetadata, boolean filterToBlockedStatus) { + public static boolean reindexRequired(IndexMetadata indexMetadata, boolean filterToBlockedStatus, boolean includeSystem) { return creationVersionBeforeMinimumWritableVersion(indexMetadata) - && isNotSystem(indexMetadata) + && (includeSystem || isNotSystem(indexMetadata)) && isNotSearchableSnapshot(indexMetadata) && matchBlockedStatus(indexMetadata, filterToBlockedStatus); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 961f363be7958..924659a2e101e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -237,7 +237,7 @@ public class InternalUsers { ModifyDataStreamsAction.NAME, ILMActions.RETRY.name() ) - .allowRestrictedIndices(false) + .allowRestrictedIndices(true) .build() }, null, null, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicateTests.java new file mode 100644 index 0000000000000..73344b826e87e --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicateTests.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.deprecation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + +import static org.elasticsearch.cluster.metadata.MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING; + +public class DeprecatedIndexPredicateTests extends ESTestCase { + public void testReindexIsNotRequiredOnNewIndex() { + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(IndexVersion.current()); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false); + assertFalse(reindexRequired); + } + + public void testReindexIsRequiredOnOldIndex() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(false); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false); + assertTrue(reindexRequired); + } + + public void testReindexIsNotRequiredOnSystemIndex() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(true); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false); + assertFalse(reindexRequired); + } + + public void testReindexIsRequiredOnSystemIndexWhenExplicitlyIncluded() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(true); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, true); + assertTrue(reindexRequired); + } + + public void testReindexIsNotRequiredOnOldSearchableSnapshot() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(false); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(true); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false); + assertFalse(reindexRequired); + } + + public void testReindexIsNotRequiredOnBlockedIndex() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(false); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().put(VERIFIED_READ_ONLY_SETTING.getKey(), true).build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false); + assertFalse(reindexRequired); + } + + public void testReindexIsRequiredOnBlockedIndexWhenExplicitlyIncluded() { + IndexVersion previousVersion = IndexVersion.getMinimumCompatibleIndexVersion(IndexVersion.current().id()); + + IndexMetadata indexMetadata = Mockito.mock(IndexMetadata.class); + Mockito.when(indexMetadata.getCreationVersion()).thenReturn(previousVersion); + Mockito.when(indexMetadata.isSystem()).thenReturn(false); + Mockito.when(indexMetadata.isSearchableSnapshot()).thenReturn(false); + Mockito.when(indexMetadata.getSettings()).thenReturn(Settings.builder().put(VERIFIED_READ_ONLY_SETTING.getKey(), true).build()); + + boolean reindexRequired = DeprecatedIndexPredicate.reindexRequired(indexMetadata, true, false); + assertTrue(reindexRequired); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/test/TestRestrictedIndices.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/test/TestRestrictedIndices.java index 3848d785275d4..28fea1a7099ed 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/test/TestRestrictedIndices.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/test/TestRestrictedIndices.java @@ -116,6 +116,7 @@ public class TestRestrictedIndices { .build(), Map.of(), List.of("fleet", "kibana"), + "fleet", null ) ) diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecker.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecker.java index 3aa587d3cc6ee..7567655755d00 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecker.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecker.java @@ -137,7 +137,9 @@ private static Set getReindexRequiredIndices( boolean filterToBlockedStatus ) { return backingIndices.stream() - .filter(DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterState.metadata().getProject(), filterToBlockedStatus)) + .filter( + DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterState.metadata().getProject(), filterToBlockedStatus, false) + ) .map(Index::getName) .collect(Collectors.toUnmodifiableSet()); } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecker.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecker.java index 50600be380402..e8d3d6492ae3b 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecker.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecker.java @@ -95,7 +95,7 @@ private DeprecationIssue oldIndicesCheck( // TODO: this check needs to be revised. It's trivially true right now. IndexVersion currentCompatibilityVersion = indexMetadata.getCompatibilityVersion(); // We intentionally exclude indices that are in data streams because they will be picked up by DataStreamDeprecationChecks - if (DeprecatedIndexPredicate.reindexRequired(indexMetadata, false) && isNotDataStreamIndex(indexMetadata, clusterState)) { + if (DeprecatedIndexPredicate.reindexRequired(indexMetadata, false, false) && isNotDataStreamIndex(indexMetadata, clusterState)) { var transforms = transformIdsForIndex(indexMetadata, indexToTransformIds); if (transforms.isEmpty() == false) { return new DeprecationIssue( @@ -138,7 +138,7 @@ private DeprecationIssue ignoredOldIndicesCheck( ) { IndexVersion currentCompatibilityVersion = indexMetadata.getCompatibilityVersion(); // We intentionally exclude indices that are in data streams because they will be picked up by DataStreamDeprecationChecks - if (DeprecatedIndexPredicate.reindexRequired(indexMetadata, true) && isNotDataStreamIndex(indexMetadata, clusterState)) { + if (DeprecatedIndexPredicate.reindexRequired(indexMetadata, true, false) && isNotDataStreamIndex(indexMetadata, clusterState)) { var transforms = transformIdsForIndex(indexMetadata, indexToTransformIds); if (transforms.isEmpty() == false) { return new DeprecationIssue( diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java index 8d3a64a751505..27e406545a7dd 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java @@ -278,6 +278,7 @@ private static SystemDataStreamDescriptor fleetActionsResultsDescriptor() { composableIndexTemplate, Map.of(), ALLOWED_PRODUCTS, + FLEET_ORIGIN, ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS ); } catch (IOException e) { diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java index efe186953466c..8ef0027256082 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/AbstractFeatureMigrationIntegTest.java @@ -17,11 +17,12 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -35,7 +36,9 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.system_indices.task.FeatureMigrationResults; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; @@ -51,18 +54,27 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableSet; import static org.elasticsearch.common.util.set.Sets.newHashSet; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, autoManageMasterNodes = false) public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase { @@ -81,7 +93,6 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase protected static final IndexVersion NEEDS_UPGRADE_INDEX_VERSION = IndexVersionUtils.getPreviousMajorVersion( SystemIndices.NO_UPGRADE_REQUIRED_INDEX_VERSION ); - protected static final int UPGRADED_TO_VERSION = SystemIndices.NO_UPGRADE_REQUIRED_VERSION.major + 1; static final SystemIndexDescriptor EXTERNAL_UNMANAGED = SystemIndexDescriptor.builder() .setIndexPattern(".ext-unman-*") @@ -139,6 +150,18 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase protected String masterAndDataNode; protected String masterName; + protected static ProjectMetadata assertMetadataAfterMigration(String featureName) { + ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject(); + // Check that the results metadata is what we expect. + FeatureMigrationResults currentResults = finalMetadata.custom(FeatureMigrationResults.TYPE); + assertThat(currentResults, notNullValue()); + assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(featureName))); + assertThat(currentResults.getFeatureStatuses().get(featureName).succeeded(), is(true)); + assertThat(currentResults.getFeatureStatuses().get(featureName).getFailedResourceName(), nullValue()); + assertThat(currentResults.getFeatureStatuses().get(featureName).getException(), nullValue()); + return finalMetadata; + } + @Before public void setup() { internalCluster().setBootstrapMasterNodeIndex(0); @@ -150,7 +173,7 @@ public void setup() { testPlugin.postMigrationHook.set((state, metadata) -> {}); } - public T getPlugin(Class type) { + protected T getPlugin(Class type) { final PluginsService pluginsService = internalCluster().getCurrentMasterNodeInstance(PluginsService.class); return pluginsService.filterPlugins(type).findFirst().get(); } @@ -181,7 +204,7 @@ protected void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) createRequest.setSettings( createSettings( NEEDS_UPGRADE_INDEX_VERSION, - descriptor.isInternal() ? INTERNAL_UNMANAGED_FLAG_VALUE : EXTERNAL_UNMANAGED_FLAG_VALUE + descriptor.isExternal() ? EXTERNAL_UNMANAGED_FLAG_VALUE : INTERNAL_UNMANAGED_FLAG_VALUE ) ); } else { @@ -194,7 +217,7 @@ protected void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) ); } if (descriptor.isAutomaticallyManaged() == false) { - createRequest.setMapping(createMapping(false, descriptor.isInternal())); + createRequest.setMapping(createMapping(false, descriptor.isExternal() == false)); } CreateIndexResponse response = createRequest.get(); Assert.assertTrue(response.isShardsAcknowledged()); @@ -245,14 +268,14 @@ static String createMapping(boolean descriptorManaged, boolean descriptorInterna } protected void assertIndexHasCorrectProperties( - Metadata metadata, + ProjectMetadata metadata, String indexName, int settingsFlagValue, boolean isManaged, boolean isInternal, Collection aliasNames ) { - IndexMetadata imd = metadata.getProject().index(indexName); + IndexMetadata imd = metadata.index(indexName); assertThat(imd.getSettings().get(FlAG_SETTING_KEY), equalTo(Integer.toString(settingsFlagValue))); final Map mapping = imd.mapping().getSourceAsMap(); @SuppressWarnings("unchecked") @@ -274,6 +297,48 @@ protected void assertIndexHasCorrectProperties( assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT)); } + protected void executeMigration(String featureName) throws Exception { + startMigration(featureName); + + GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT); + // The feature upgrade may take longer than ten seconds when tests are running + // in parallel, so we give assertBusy a thirty-second timeout. + assertBusy(() -> { + GetFeatureUpgradeStatusResponse statusResponse = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest) + .get(); + logger.info(Strings.toString(statusResponse)); + assertThat(statusResponse.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED)); + }, 30, TimeUnit.SECONDS); + } + + protected static void startMigration(String featureName) throws InterruptedException, ExecutionException { + PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT); + PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get(); + assertThat(migrationResponse.getReason(), nullValue()); + assertThat(migrationResponse.getElasticsearchException(), nullValue()); + final Set migratingFeatures = migrationResponse.getFeatures() + .stream() + .map(PostFeatureUpgradeResponse.Feature::getFeatureName) + .collect(Collectors.toSet()); + assertThat(migratingFeatures, hasItem(featureName)); + } + + protected static TestPlugin.BlockingActionFilter blockAction(String actionTypeName) { + // Block the alias request to simulate a failure + InternalTestCluster internalTestCluster = internalCluster(); + ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName()); + TestPlugin.BlockingActionFilter blockingActionFilter = null; + for (ActionFilter filter : actionFilters.filters()) { + if (filter instanceof TestPlugin.BlockingActionFilter) { + blockingActionFilter = (TestPlugin.BlockingActionFilter) filter; + break; + } + } + assertNotNull("BlockingActionFilter should exist", blockingActionFilter); + blockingActionFilter.blockActions(actionTypeName); + return blockingActionFilter; + } + public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin { public final AtomicReference>> preMigrationHook = new AtomicReference<>(); public final AtomicReference>> postMigrationHook = new AtomicReference<>(); @@ -341,6 +406,10 @@ public int order() { return 0; } + public void unblockAllActions() { + blockedActions = emptySet(); + } + public void blockActions(String... actions) { blockedActions = unmodifiableSet(newHashSet(actions)); } diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java index 81bdb9e1c6e72..cb42300bb31ff 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/FeatureMigrationIT.java @@ -15,8 +15,6 @@ import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.ActionFilter; -import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -24,6 +22,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -37,7 +36,6 @@ import org.elasticsearch.system_indices.action.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter; import org.elasticsearch.system_indices.task.FeatureMigrationResults; import org.elasticsearch.system_indices.task.SingleFeatureMigrationResult; -import org.elasticsearch.test.InternalTestCluster; import java.util.ArrayList; import java.util.Arrays; @@ -51,15 +49,13 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.elasticsearch.indices.SystemIndices.UPGRADED_INDEX_SUFFIX; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCountAndNoFailures; -import static org.hamcrest.Matchers.aMapWithSize; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class FeatureMigrationIT extends AbstractFeatureMigrationIntegTest { @@ -199,11 +195,11 @@ public void testMigrateSystemIndex() throws Exception { assertTrue("the pre-migration hook wasn't actually called", preUpgradeHookCalled.get()); assertTrue("the post-migration hook wasn't actually called", postUpgradeHookCalled.get()); - Metadata finalMetadata = assertMetadataAfterMigration(FEATURE_NAME); + ProjectMetadata finalMetadata = assertMetadataAfterMigration(FEATURE_NAME); assertIndexHasCorrectProperties( finalMetadata, - ".int-man-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".int-man-old" + UPGRADED_INDEX_SUFFIX, INTERNAL_MANAGED_FLAG_VALUE, true, true, @@ -211,7 +207,7 @@ public void testMigrateSystemIndex() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".int-unman-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".int-unman-old" + UPGRADED_INDEX_SUFFIX, INTERNAL_UNMANAGED_FLAG_VALUE, false, true, @@ -219,7 +215,7 @@ public void testMigrateSystemIndex() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".ext-man-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".ext-man-old" + UPGRADED_INDEX_SUFFIX, EXTERNAL_MANAGED_FLAG_VALUE, true, false, @@ -227,7 +223,7 @@ public void testMigrateSystemIndex() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".ext-unman-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".ext-unman-old" + UPGRADED_INDEX_SUFFIX, EXTERNAL_UNMANAGED_FLAG_VALUE, false, false, @@ -235,18 +231,6 @@ public void testMigrateSystemIndex() throws Exception { ); } - private static Metadata assertMetadataAfterMigration(String featureName) { - Metadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata(); - // Check that the results metadata is what we expect. - FeatureMigrationResults currentResults = finalMetadata.getProject().custom(FeatureMigrationResults.TYPE); - assertThat(currentResults, notNullValue()); - assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(featureName))); - assertThat(currentResults.getFeatureStatuses().get(featureName).succeeded(), is(true)); - assertThat(currentResults.getFeatureStatuses().get(featureName).getFailedIndexName(), nullValue()); - assertThat(currentResults.getFeatureStatuses().get(featureName).getException(), nullValue()); - return finalMetadata; - } - public void testMigrateIndexWithWriteBlock() throws Exception { createSystemIndexForDescriptor(INTERNAL_UNMANAGED); @@ -270,18 +254,7 @@ public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception { createSystemIndexForDescriptor(INTERNAL_UNMANAGED); ensureGreen(); - // Block the alias request to simulate a failure - InternalTestCluster internalTestCluster = internalCluster(); - ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName()); - BlockingActionFilter blockingActionFilter = null; - for (ActionFilter filter : actionFilters.filters()) { - if (filter instanceof BlockingActionFilter) { - blockingActionFilter = (BlockingActionFilter) filter; - break; - } - } - assertNotNull("BlockingActionFilter should exist", blockingActionFilter); - blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME); + BlockingActionFilter blockingActionFilter = blockAction(TransportIndicesAliasesAction.NAME); // Start the migration client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); @@ -307,7 +280,7 @@ public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception { assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false")); // Unblock the alias request - blockingActionFilter.blockActions(); + blockingActionFilter.unblockAllActions(); // Retry the migration client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); @@ -381,36 +354,14 @@ public void onFailure(Exception e) { }); } - private void executeMigration(String featureName) throws Exception { - PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT); - PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get(); - assertThat(migrationResponse.getReason(), nullValue()); - assertThat(migrationResponse.getElasticsearchException(), nullValue()); - final Set migratingFeatures = migrationResponse.getFeatures() - .stream() - .map(PostFeatureUpgradeResponse.Feature::getFeatureName) - .collect(Collectors.toSet()); - assertThat(migratingFeatures, hasItem(featureName)); - - GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT); - // The feature upgrade may take longer than ten seconds when tests are running - // in parallel, so we give assertBusy a sixty-second timeout. - assertBusy(() -> { - GetFeatureUpgradeStatusResponse statusResponse = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest) - .get(); - logger.info(Strings.toString(statusResponse)); - assertThat(statusResponse.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED)); - }, 60, TimeUnit.SECONDS); - } - public void testMigrateUsingScript() throws Exception { createSystemIndexForDescriptor(INTERNAL_MANAGED_WITH_SCRIPT); executeMigration(SCRIPTED_INDEX_FEATURE_NAME); ensureGreen(); - Metadata metadata = assertMetadataAfterMigration(SCRIPTED_INDEX_FEATURE_NAME); - String newIndexName = ".int-mans-old-reindexed-for-" + UPGRADED_TO_VERSION; + ProjectMetadata metadata = assertMetadataAfterMigration(SCRIPTED_INDEX_FEATURE_NAME); + String newIndexName = ".int-mans-old" + UPGRADED_INDEX_SUFFIX; assertIndexHasCorrectProperties( metadata, newIndexName, diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java index cbbc2c23871b2..11aa44fe32337 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/MultiFeatureMigrationIT.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -36,6 +36,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.indices.SystemIndices.UPGRADED_INDEX_SUFFIX; import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; @@ -137,7 +138,7 @@ public void testMultipleFeatureMigration() throws Exception { assertThat(currentResults, notNullValue()); assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME))); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true)); - assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue()); + assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedResourceName(), nullValue()); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue()); secondPluginPreMigrationHookCalled.set(true); @@ -158,7 +159,7 @@ public void testMultipleFeatureMigration() throws Exception { assertThat(currentResults, notNullValue()); assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME))); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true)); - assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue()); + assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedResourceName(), nullValue()); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue()); secondPluginPostMigrationHookCalled.set(true); @@ -192,22 +193,22 @@ public void testMultipleFeatureMigration() throws Exception { assertTrue("the second plugin's pre-migration hook wasn't actually called", secondPluginPreMigrationHookCalled.get()); assertTrue("the second plugin's post-migration hook wasn't actually called", secondPluginPostMigrationHookCalled.get()); - Metadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata(); + ProjectMetadata finalMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject(); // Check that the results metadata is what we expect - FeatureMigrationResults currentResults = finalMetadata.getProject().custom(FeatureMigrationResults.TYPE); + FeatureMigrationResults currentResults = finalMetadata.custom(FeatureMigrationResults.TYPE); assertThat(currentResults, notNullValue()); assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(2), hasKey(FEATURE_NAME), hasKey(SECOND_FEATURE_NAME))); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true)); - assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue()); + assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedResourceName(), nullValue()); assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue()); assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).succeeded(), is(true)); - assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).getFailedIndexName(), nullValue()); + assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).getFailedResourceName(), nullValue()); assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).getException(), nullValue()); // Finally, verify that all the indices exist and have the properties we expect. assertIndexHasCorrectProperties( finalMetadata, - ".int-man-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".int-man-old" + UPGRADED_INDEX_SUFFIX, INTERNAL_MANAGED_FLAG_VALUE, true, true, @@ -215,7 +216,7 @@ public void testMultipleFeatureMigration() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".int-unman-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".int-unman-old" + UPGRADED_INDEX_SUFFIX, INTERNAL_UNMANAGED_FLAG_VALUE, false, true, @@ -223,7 +224,7 @@ public void testMultipleFeatureMigration() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".ext-man-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".ext-man-old" + UPGRADED_INDEX_SUFFIX, EXTERNAL_MANAGED_FLAG_VALUE, true, false, @@ -231,7 +232,7 @@ public void testMultipleFeatureMigration() throws Exception { ); assertIndexHasCorrectProperties( finalMetadata, - ".ext-unman-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".ext-unman-old" + UPGRADED_INDEX_SUFFIX, EXTERNAL_UNMANAGED_FLAG_VALUE, false, false, @@ -240,7 +241,7 @@ public void testMultipleFeatureMigration() throws Exception { assertIndexHasCorrectProperties( finalMetadata, - ".second-int-man-old-reindexed-for-" + UPGRADED_TO_VERSION, + ".second-int-man-old" + UPGRADED_INDEX_SUFFIX, SECOND_FEATURE_IDX_FLAG_VALUE, true, true, diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/SystemDataStreamMigrationIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/SystemDataStreamMigrationIT.java new file mode 100644 index 0000000000000..734cf7cdbb7a9 --- /dev/null +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/SystemDataStreamMigrationIT.java @@ -0,0 +1,212 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.system_indices.action; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.After; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; + +public class SystemDataStreamMigrationIT extends AbstractFeatureMigrationIntegTest { + private static final String TEST_DATA_STREAM_NAME = ".test-data-stream"; + private static final String DATA_STREAM_FEATURE = "ds-feature"; + private static volatile SystemDataStreamDescriptor systemDataStreamDescriptor = createSystemDataStreamDescriptor( + NEEDS_UPGRADE_INDEX_VERSION + ); + + private static SystemDataStreamDescriptor createSystemDataStreamDescriptor(IndexVersion indexVersion) { + return new SystemDataStreamDescriptor( + TEST_DATA_STREAM_NAME, + "system data stream test", + SystemDataStreamDescriptor.Type.EXTERNAL, + ComposableIndexTemplate.builder() + .template( + Template.builder() + .dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)) + .settings(indexSettings(indexVersion, 1, 0)) + ) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(), + Map.of(), + List.of("product"), + ORIGIN, + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).build(); + } + + @Override + protected boolean forbidPrivateIndexSettings() { + // We need to be able to set the index creation version manually. + return false; + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(DataStreamsPlugin.class); + plugins.add(DataStreamTestPlugin.class); + return plugins; + } + + @After + public void restoreDescriptor() { + // we need to do it in after, because we need to have systemDataStreamDescriptor in a correct state + // before next super.setup() is called + systemDataStreamDescriptor = createSystemDataStreamDescriptor(NEEDS_UPGRADE_INDEX_VERSION); + } + + private static void indexDocsToDataStream(String dataStreamName) { + BulkRequestBuilder bulkBuilder = client().prepareBulk(); + for (int i = 0; i < INDEX_DOC_COUNT; i++) { + IndexRequestBuilder requestBuilder = ESIntegTestCase.prepareIndex(dataStreamName) + .setId(Integer.toString(i)) + .setRequireDataStream(true) + .setOpType(DocWriteRequest.OpType.CREATE) + .setSource(DataStream.TIMESTAMP_FIELD_NAME, 1741271969000L, FIELD_NAME, "words words"); + bulkBuilder.add(requestBuilder); + } + + BulkResponse actionGet = bulkBuilder.get(); + assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false)); + } + + public void testMigrateSystemDataStream() throws Exception { + createDataStream(); + + indexDocsToDataStream(TEST_DATA_STREAM_NAME); + + simulateClusterUpgrade(); + + executeMigration(DATA_STREAM_FEATURE); + + // Waiting for shards to stabilize if indices were moved around + ensureGreen(); + + ProjectMetadata finalMetadata = assertMetadataAfterMigration(DATA_STREAM_FEATURE); + + DataStream dataStream = finalMetadata.dataStreams().get(TEST_DATA_STREAM_NAME); + assertNotNull(dataStream); + assertThat(dataStream.isSystem(), is(true)); + List backingIndices = dataStream.getIndices(); + assertThat(backingIndices, hasSize(2)); + for (Index backingIndex : backingIndices) { + IndexMetadata indexMetadata = finalMetadata.index(backingIndex); + assertThat(indexMetadata.isSystem(), is(true)); + assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current())); + } + } + + public void testMigrationRestartAfterFailure() throws Exception { + createDataStream(); + + indexDocsToDataStream(TEST_DATA_STREAM_NAME); + + simulateClusterUpgrade(); + + TestPlugin.BlockingActionFilter blockingActionFilter = blockAction(TransportCreateIndexAction.TYPE.name()); + + startMigration(DATA_STREAM_FEATURE); + + GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT); + assertBusy(() -> { + GetFeatureUpgradeStatusResponse statusResponse = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest) + .get(); + logger.info(Strings.toString(statusResponse)); + assertThat(statusResponse.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR)); + }, 30, TimeUnit.SECONDS); + + blockingActionFilter.unblockAllActions(); + ensureGreen(); + + executeMigration(DATA_STREAM_FEATURE); + ensureGreen(); + + assertMetadataAfterMigration(DATA_STREAM_FEATURE); + } + + private void simulateClusterUpgrade() throws Exception { + String indexVersionCreated = systemDataStreamDescriptor.getComposableIndexTemplate() + .template() + .settings() + .get(IndexMetadata.SETTING_VERSION_CREATED); + assertThat(indexVersionCreated, is(NEEDS_UPGRADE_INDEX_VERSION.toString())); + // we can't have NEEDS_UPGRADE_VERSION in settings anymore, + // because those settings will be used in index rollover during data stream migration + // instead we update settings here, kinda simulating upgrade to a new version and restart the cluster + systemDataStreamDescriptor = createSystemDataStreamDescriptor(IndexVersion.current()); + + internalCluster().fullRestart(); + ensureGreen(); + } + + private void createDataStream() throws InterruptedException, ExecutionException { + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + TEST_DATA_STREAM_NAME + ); + AcknowledgedResponse createDSResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + assertTrue(createDSResponse.isAcknowledged()); + + ensureGreen(); + } + + public static class DataStreamTestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin { + @Override + public String getFeatureName() { + return DATA_STREAM_FEATURE; + } + + @Override + public String getFeatureDescription() { + return "Feature to test system data streams migration"; + } + + @Override + public Collection getSystemDataStreamDescriptors() { + return List.of(systemDataStreamDescriptor); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java index 26e23a152709a..ff26eb000e405 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusAction.java @@ -139,11 +139,11 @@ static List getIndexInfos(ClusterStat (FeatureMigrationResults) state.metadata().getProject().custom(FeatureMigrationResults.TYPE) ).map(FeatureMigrationResults::getFeatureStatuses).map(results -> results.get(feature.getName())).orElse(null); - final String failedFeatureName = featureStatus == null ? null : featureStatus.getFailedIndexName(); - final String failedFeatureUpgradedName = failedFeatureName == null ? null : failedFeatureName + UPGRADED_INDEX_SUFFIX; + final String failedResourceName = featureStatus == null ? null : featureStatus.getFailedResourceName(); + final String failedFeatureUpgradedName = failedResourceName == null ? null : failedResourceName + UPGRADED_INDEX_SUFFIX; final Exception exception = featureStatus == null ? null : featureStatus.getException(); - return feature.getIndexDescriptors() + Stream indexInfoStream = feature.getIndexDescriptors() .stream() .flatMap(descriptor -> descriptor.getMatchingIndices(state.metadata().getProject()).stream()) .sorted(String::compareTo) @@ -152,11 +152,32 @@ static List getIndexInfos(ClusterStat indexMetadata -> new GetFeatureUpgradeStatusResponse.IndexInfo( indexMetadata.getIndex().getName(), indexMetadata.getCreationVersion(), - (indexMetadata.getIndex().getName().equals(failedFeatureName) + (indexMetadata.getIndex().getName().equals(failedResourceName) || indexMetadata.getIndex().getName().equals(failedFeatureUpgradedName)) ? exception : null ) - ) - .toList(); + ); + + Stream dataStreamsIndexInfoStream = feature.getDataStreamDescriptors() + .stream() + .flatMap(descriptor -> { + Exception dsException = (descriptor.getDataStreamName().equals(failedResourceName)) ? exception : null; + + // we don't know migration of which backing index has failed, + // so, unfortunately, have to report exception for all indices for now + return descriptor.getMatchingIndices(state.metadata().getProject()) + .stream() + .sorted(String::compareTo) + .map(index -> state.metadata().getProject().index(index)) + .map( + indexMetadata -> new GetFeatureUpgradeStatusResponse.IndexInfo( + indexMetadata.getIndex().getName(), + indexMetadata.getCreationVersion(), + dsException + ) + ); + }); + + return Stream.concat(indexInfoStream, dataStreamsIndexInfoStream).toList(); } @Override diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java index 5158ec1cd9fb7..43bbe2a5aa74b 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/MigrationResultsUpdateTask.java @@ -97,7 +97,7 @@ public void onFailure(Exception clusterStateUpdateException) { () -> format( "failed to update cluster state after failed migration of feature [%s] on index [%s]", featureName, - status.getFailedIndexName() + status.getFailedResourceName() ), clusterStateUpdateException ); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SingleFeatureMigrationResult.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SingleFeatureMigrationResult.java index 266ebff4239fc..0975b5eea297d 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SingleFeatureMigrationResult.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SingleFeatureMigrationResult.java @@ -82,7 +82,7 @@ public boolean succeeded() { * Gets the name of the specific index where the migration failure occurred, if the migration failed. */ @Nullable - public String getFailedIndexName() { + public String getFailedResourceName() { return failedIndexName; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemDataStreamMigrationInfo.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemDataStreamMigrationInfo.java new file mode 100644 index 0000000000000..ff589581fe8f8 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemDataStreamMigrationInfo.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.system_indices.task; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemIndices; + +import java.util.stream.Stream; + +final class SystemDataStreamMigrationInfo extends SystemResourceMigrationInfo { + private final DataStream dataStream; + private final String dataStreamName; + + private SystemDataStreamMigrationInfo( + DataStream dataStream, + String dataStreamName, + String featureName, + String origin, + SystemIndices.Feature owningFeature + ) { + super(featureName, origin, owningFeature); + this.dataStreamName = dataStreamName; + this.dataStream = dataStream; + } + + public static SystemDataStreamMigrationInfo build( + DataStream dataStream, + SystemDataStreamDescriptor dataStreamDescriptor, + SystemIndices.Feature feature + ) { + return new SystemDataStreamMigrationInfo( + dataStream, + dataStreamDescriptor.getDataStreamName(), + feature.getName(), + dataStreamDescriptor.getOrigin(), + feature + ); + } + + public String getDataStreamName() { + return dataStreamName; + } + + @Override + protected String getCurrentResourceName() { + return getDataStreamName(); + } + + @Override + Stream getIndices(ProjectMetadata metadata) { + return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(metadata::getIndexSafe); + } + + @Override + boolean isCurrentIndexClosed() { + // data stream can't be closed + return false; + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java index b37a7a15a8310..e15a1d36bdb9f 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java @@ -20,6 +20,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -37,18 +38,21 @@ public class SystemIndexMigrationExecutor extends PersistentTasksExecutor taskInProgress, Map headers ) { - return new SystemIndexMigrator(client, id, type, action, parentTaskId, headers, clusterService, systemIndices, indexScopedSettings); + return new SystemIndexMigrator( + client, + id, + type, + action, + parentTaskId, + headers, + clusterService, + systemIndices, + indexScopedSettings, + threadPool + ); } @Override diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationInfo.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationInfo.java index 579e13cd827d3..0e6083397e041 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationInfo.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationInfo.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -22,7 +23,6 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.plugins.SystemIndexPlugin; -import java.util.Comparator; import java.util.Map; import java.util.Objects; import java.util.stream.Stream; @@ -34,7 +34,7 @@ * Holds the data required to migrate a single system index, including metadata from the current index. If necessary, computes the settings * and mappings for the "next" index based off of the current one. */ -class SystemIndexMigrationInfo implements Comparable { +final class SystemIndexMigrationInfo extends SystemResourceMigrationInfo { private static final Logger logger = LogManager.getLogger(SystemIndexMigrationInfo.class); private final IndexMetadata currentIndex; @@ -46,10 +46,6 @@ class SystemIndexMigrationInfo implements Comparable { private final SystemIndices.Feature owningFeature; private final boolean allowsTemplates; - private static final Comparator SAME_CLASS_COMPARATOR = Comparator.comparing( - SystemIndexMigrationInfo::getFeatureName - ).thenComparing(SystemIndexMigrationInfo::getCurrentIndexName); - private SystemIndexMigrationInfo( IndexMetadata currentIndex, String featureName, @@ -60,6 +56,7 @@ private SystemIndexMigrationInfo( SystemIndices.Feature owningFeature, boolean allowsTemplates ) { + super(featureName, origin, owningFeature); this.currentIndex = currentIndex; this.featureName = featureName; this.settings = settings; @@ -77,9 +74,20 @@ String getCurrentIndexName() { return currentIndex.getIndex().getName(); } + @Override + protected String getCurrentResourceName() { + return getCurrentIndexName(); + } + + @Override + Stream getIndices(ProjectMetadata metadata) { + return Stream.of(currentIndex); + } + /** * Indicates if the index to be migrated is closed. */ + @Override boolean isCurrentIndexClosed() { return CLOSE.equals(currentIndex.getState()); } @@ -170,11 +178,6 @@ Client createClient(Client baseClient) { return new OriginSettingClient(baseClient, this.getOrigin()); } - @Override - public int compareTo(SystemIndexMigrationInfo o) { - return SAME_CLASS_COMPARATOR.compare(this, o); - } - @Override public String toString() { return "IndexUpgradeInfo[" diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java index 9969da914c2eb..57e7af8411944 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrator.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; @@ -38,6 +39,7 @@ import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -47,16 +49,25 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.script.Script; import org.elasticsearch.tasks.TaskId; - -import java.util.LinkedList; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamEnrichedStatus; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE; @@ -77,12 +88,13 @@ public class SystemIndexMigrator extends AllocatedPersistentTask { private final ClusterService clusterService; private final SystemIndices systemIndices; private final IndexScopedSettings indexScopedSettings; + private final ThreadPool threadPool; // In-memory state // NOTE: This queue is not a thread-safe class. Use `synchronized (migrationQueue)` whenever you access this. I chose this rather than // a synchronized/concurrent collection or an AtomicReference because we often need to do compound operations, which are much simpler // with `synchronized` blocks than when only the collection accesses are protected. - private final Queue migrationQueue = new LinkedList<>(); + private final Queue migrationQueue = new ArrayDeque<>(); private final AtomicReference> currentFeatureCallbackMetadata = new AtomicReference<>(); public SystemIndexMigrator( @@ -94,17 +106,20 @@ public SystemIndexMigrator( Map headers, ClusterService clusterService, SystemIndices systemIndices, - IndexScopedSettings indexScopedSettings + IndexScopedSettings indexScopedSettings, + ThreadPool threadPool ) { super(id, type, action, "system-index-migrator", parentTask, headers); this.baseClient = new ParentTaskAssigningClient(client, parentTask); this.clusterService = clusterService; this.systemIndices = systemIndices; this.indexScopedSettings = indexScopedSettings; + this.threadPool = threadPool; } public void run(SystemIndexMigrationTaskState taskState) { ClusterState clusterState = clusterService.state(); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(); final String stateIndexName; final String stateFeatureName; @@ -124,7 +139,7 @@ public void run(SystemIndexMigrationTaskState taskState) { return; } - if (stateIndexName != null && clusterState.metadata().getProject().hasIndex(stateIndexName) == false) { + if (stateIndexName != null && projectMetadata.hasIndexAbstraction(stateIndexName) == false) { markAsFailed(new IndexNotFoundException(stateIndexName, "cannot migrate because that index does not exist")); return; } @@ -142,14 +157,14 @@ public void run(SystemIndexMigrationTaskState taskState) { systemIndices.getFeatures() .stream() - .flatMap(feature -> SystemIndexMigrationInfo.fromFeature(feature, clusterState.metadata(), indexScopedSettings)) - .filter(migrationInfo -> needsToBeMigrated(clusterState.metadata().getProject().index(migrationInfo.getCurrentIndexName()))) + .flatMap(feature -> SystemResourceMigrationFactory.fromFeature(feature, projectMetadata, indexScopedSettings)) + .filter(migrationInfo -> needToBeMigrated(migrationInfo.getIndices(projectMetadata))) .sorted() // Stable order between nodes .collect(Collectors.toCollection(() -> migrationQueue)); List closedIndices = migrationQueue.stream() - .filter(SystemIndexMigrationInfo::isCurrentIndexClosed) - .map(SystemIndexMigrationInfo::getCurrentIndexName) + .filter(SystemResourceMigrationInfo::isCurrentIndexClosed) + .map(SystemResourceMigrationInfo::getCurrentResourceName) .toList(); if (closedIndices.isEmpty() == false) { markAsFailed( @@ -161,27 +176,27 @@ public void run(SystemIndexMigrationTaskState taskState) { // The queue we just generated *should* be the same one as was generated on the last node, so the first entry in the queue // should be the same as is in the task state if (stateIndexName != null && stateFeatureName != null && migrationQueue.isEmpty() == false) { - SystemIndexMigrationInfo nextMigrationInfo = migrationQueue.peek(); + SystemResourceMigrationInfo nextMigrationInfo = migrationQueue.peek(); // This should never, ever happen in testing mode, but could conceivably happen if there are different sets of plugins // installed on the previous node vs. this one. assert nextMigrationInfo.getFeatureName().equals(stateFeatureName) - && nextMigrationInfo.getCurrentIndexName().equals(stateIndexName) - : "index name [" + && nextMigrationInfo.getCurrentResourceName().equals(stateIndexName) + : "system index/data stream name [" + stateIndexName + "] or feature name [" + stateFeatureName - + "] from task state did not match first index [" - + nextMigrationInfo.getCurrentIndexName() + + "] from task state did not match first index/data stream [" + + nextMigrationInfo.getCurrentResourceName() + "] and feature [" + nextMigrationInfo.getFeatureName() + "] of locally computed queue, see logs"; - if (nextMigrationInfo.getCurrentIndexName().equals(stateIndexName) == false) { - if (clusterState.metadata().getProject().hasIndex(stateIndexName) == false) { + if (nextMigrationInfo.getCurrentResourceName().equals(stateIndexName) == false) { + if (projectMetadata.hasIndexAbstraction(stateIndexName) == false) { // If we don't have that index at all, and also don't have the next one markAsFailed( new IllegalStateException( format( - "failed to resume system index migration from index [%s], that index is not present in the cluster", + "failed to resume system resource migration from resource [%s], that is not present in the cluster", stateIndexName ) ) @@ -189,8 +204,9 @@ public void run(SystemIndexMigrationTaskState taskState) { } logger.warn( () -> format( - "resuming system index migration with index [%s], which does not match index given in last task state [%s]", - nextMigrationInfo.getCurrentIndexName(), + "resuming system resource migration with resource [%s]," + + " which does not match resource given in last task state [%s]", + nextMigrationInfo.getCurrentResourceName(), stateIndexName ) ); @@ -198,35 +214,43 @@ public void run(SystemIndexMigrationTaskState taskState) { } } - // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex + // Kick off our callback "loop" - finishIndexAndLoop calls back into startFeatureMigration logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState)); - clearResults( - clusterService, - ActionListener.wrap( - state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName), - this::markAsFailed - ) - ); + clearResults(clusterService, ActionListener.wrap(state -> startFeatureMigration(stateFeatureName), this::markAsFailed)); } - private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { + private void finishIndexAndLoop(SystemIndexMigrationInfo migrationInfo, BulkByScrollResponse bulkResponse) { // The BulkByScroll response is validated in #migrateSingleIndex, it's just here to satisfy the ActionListener type assert bulkResponse.isTimedOut() == false && (bulkResponse.getBulkFailures() == null || bulkResponse.getBulkFailures().isEmpty()) && (bulkResponse.getSearchFailures() == null || bulkResponse.getSearchFailures().isEmpty()) : "If this assertion gets triggered it means the validation in migrateSingleIndex isn't working right"; - SystemIndexMigrationInfo lastMigrationInfo = currentMigrationInfo(); logger.info( "finished migrating old index [{}] from feature [{}] to new index [{}]", - lastMigrationInfo.getCurrentIndexName(), - lastMigrationInfo.getFeatureName(), - lastMigrationInfo.getNextIndexName() + migrationInfo.getCurrentIndexName(), + migrationInfo.getFeatureName(), + migrationInfo.getNextIndexName() ); + + finishResourceAndLoop(migrationInfo); + } + + private void finishDataStreamAndLoop(SystemDataStreamMigrationInfo migrationInfo) { + logger.info( + "finished migrating old indices from data stream [{}] from feature [{}] to new indices", + migrationInfo.getCurrentResourceName(), + migrationInfo.getFeatureName() + ); + + finishResourceAndLoop(migrationInfo); + } + + private void finishResourceAndLoop(SystemResourceMigrationInfo lastMigrationInfo) { assert migrationQueue != null && migrationQueue.isEmpty() == false; synchronized (migrationQueue) { migrationQueue.remove(); } - SystemIndexMigrationInfo nextMigrationInfo = currentMigrationInfo(); + SystemResourceMigrationInfo nextMigrationInfo = currentMigrationInfo(); if (nextMigrationInfo == null || nextMigrationInfo.getFeatureName().equals(lastMigrationInfo.getFeatureName()) == false) { // The next feature name is different than the last one, so we just finished a feature - time to invoke its post-migration hook lastMigrationInfo.indicesMigrationComplete( @@ -237,7 +261,8 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { if (successful == false) { // GWB> Should we actually fail in this case instead of plugging along? logger.warn( - "post-migration hook for feature [{}] indicated failure; feature migration metadata prior to failure was [{}]", + "post-migration hook for feature [{}] indicated failure;" + + " feature migration metadata prior to failure was [{}]", lastMigrationInfo.getFeatureName(), currentFeatureCallbackMetadata.get() ); @@ -246,25 +271,43 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { }, this::markAsFailed) ); } else { - prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName()); + startFeatureMigration(lastMigrationInfo.getFeatureName()); + } + } + + private void migrateResource(SystemResourceMigrationInfo migrationInfo, ClusterState clusterState) { + if (migrationInfo instanceof SystemIndexMigrationInfo systemIndexMigrationInfo) { + logger.info( + "preparing to migrate old index [{}] from feature [{}] to new index [{}]", + systemIndexMigrationInfo.getCurrentIndexName(), + migrationInfo.getFeatureName(), + systemIndexMigrationInfo.getNextIndexName() + ); + migrateSingleIndex(systemIndexMigrationInfo, clusterState, this::finishIndexAndLoop); + } else if (migrationInfo instanceof SystemDataStreamMigrationInfo systemDataStreamMigrationInfo) { + logger.info( + "preparing to migrate old indices from data stream [{}] from feature [{}] to new indices", + systemDataStreamMigrationInfo.getCurrentResourceName(), + migrationInfo.getFeatureName() + ); + migrateDataStream(systemDataStreamMigrationInfo, this::finishDataStreamAndLoop); + } else { + throw new IllegalStateException("Unknown type of migration: " + migrationInfo.getClass()); } } - private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationInfo) { + private void recordIndexMigrationSuccess(SystemResourceMigrationInfo lastMigrationInfo) { MigrationResultsUpdateTask updateTask = MigrationResultsUpdateTask.upsert( lastMigrationInfo.getFeatureName(), SingleFeatureMigrationResult.success(), ActionListener.wrap(state -> { - prepareNextIndex( - clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop), - lastMigrationInfo.getFeatureName() - ); + startFeatureMigration(lastMigrationInfo.getFeatureName()); }, this::markAsFailed) ); updateTask.submit(clusterService); } - private void prepareNextIndex(Consumer listener, String lastFeatureName) { + private void startFeatureMigration(String lastFeatureName) { synchronized (migrationQueue) { assert migrationQueue != null; if (migrationQueue.isEmpty()) { @@ -274,29 +317,23 @@ private void prepareNextIndex(Consumer listener, String lastFeatur } } - final SystemIndexMigrationInfo migrationInfo = currentMigrationInfo(); + final SystemResourceMigrationInfo migrationInfo = currentMigrationInfo(); assert migrationInfo != null : "the queue of indices to migrate should have been checked for emptiness before calling this method"; - logger.info( - "preparing to migrate old index [{}] from feature [{}] to new index [{}]", - migrationInfo.getCurrentIndexName(), - migrationInfo.getFeatureName(), - migrationInfo.getNextIndexName() - ); if (migrationInfo.getFeatureName().equals(lastFeatureName) == false) { // And then invoke the pre-migration hook for the next one. migrationInfo.prepareForIndicesMigration(clusterService, baseClient, ActionListener.wrap(newMetadata -> { currentFeatureCallbackMetadata.set(newMetadata); - updateTaskState(migrationInfo, listener, newMetadata); + updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), newMetadata); }, this::markAsFailed)); } else { // Otherwise, just re-use what we already have. - updateTaskState(migrationInfo, listener, currentFeatureCallbackMetadata.get()); + updateTaskState(migrationInfo, state -> migrateResource(migrationInfo, state), currentFeatureCallbackMetadata.get()); } } - private void updateTaskState(SystemIndexMigrationInfo migrationInfo, Consumer listener, Map metadata) { + private void updateTaskState(SystemResourceMigrationInfo migrationInfo, Consumer listener, Map metadata) { final SystemIndexMigrationTaskState newTaskState = new SystemIndexMigrationTaskState( - migrationInfo.getCurrentIndexName(), + migrationInfo.getCurrentResourceName(), migrationInfo.getFeatureName(), metadata ); @@ -309,16 +346,21 @@ private void updateTaskState(SystemIndexMigrationInfo migrationInfo, Consumer indicesMetadata) { + return indicesMetadata.anyMatch(indexMetadata -> { + assert indexMetadata != null : "null IndexMetadata should be impossible, we're not consistently using the same cluster state"; + if (indexMetadata == null) { + return false; + } + return indexMetadata.isSystem() && indexMetadata.getCreationVersion().before(NO_UPGRADE_REQUIRED_INDEX_VERSION); + }); } - private void migrateSingleIndex(ClusterState clusterState, Consumer listener) { - final SystemIndexMigrationInfo migrationInfo = currentMigrationInfo(); + private void migrateSingleIndex( + SystemIndexMigrationInfo migrationInfo, + ClusterState clusterState, + BiConsumer listener + ) { String oldIndexName = migrationInfo.getCurrentIndexName(); final ProjectMetadata projectMetadata = clusterState.metadata().getProject(); final IndexMetadata imd = projectMetadata.index(oldIndexName); @@ -375,7 +417,10 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer innerListener = ActionListener.wrap(listener::accept, this::markAsFailed); + ActionListener innerListener = ActionListener.wrap( + response -> listener.accept(migrationInfo, response), + this::markAsFailed + ); try { createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> { logger.debug( @@ -479,7 +524,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener< .mapping(migrationInfo.getMappings()) .settings(Objects.requireNonNullElse(settingsBuilder.build(), Settings.EMPTY)); - baseClient.admin().indices().create(createIndexRequest, listener); + migrationInfo.createClient(baseClient).admin().indices().create(createIndexRequest, listener); } private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { @@ -507,7 +552,7 @@ private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, A private void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName()); String newIndexName = migrationInfo.getNextIndexName(); - baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> { + migrationInfo.createClient(baseClient).admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> { if (ackedResponse.isAcknowledged()) { logger.info("successfully removed index [{}]", newIndexName); listener.onResponse(ackedResponse); @@ -583,12 +628,191 @@ private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener completionListener + ) { + String dataStreamName = migrationInfo.getDataStreamName(); + logger.info("migrating data stream [{}] from feature [{}]", dataStreamName, migrationInfo.getFeatureName()); + + ReindexDataStreamAction.ReindexDataStreamRequest reindexRequest = new ReindexDataStreamAction.ReindexDataStreamRequest( + ReindexDataStreamAction.Mode.UPGRADE, + dataStreamName + ); + + try { + migrationInfo.createClient(baseClient) + .execute(ReindexDataStreamAction.INSTANCE, reindexRequest, ActionListener.wrap(startMigrationResponse -> { + if (startMigrationResponse.isAcknowledged() == false) { + logger.error("failed to migrate indices from data stream [{}]", dataStreamName); + throw new ElasticsearchException( + "reindex system data stream [" + + dataStreamName + + "] from feature [" + + migrationInfo.getFeatureName() + + "] response is not acknowledge" + ); + } + checkDataStreamMigrationStatus(migrationInfo, completionListener, false); + }, e -> { + if (e instanceof ResourceAlreadyExistsException) { + // This might happen if the task has been reassigned to another node, + // in this case we can just wait for the data stream migration task to finish. + // But, there is a possibility that previously started data stream migration task has failed, + // in this case we need to cancel it and restart migration of the data stream. + logger.debug("data stream [{}] migration is already in progress", dataStreamName); + checkDataStreamMigrationStatus(migrationInfo, completionListener, true); + } else { + markAsFailed(e); + } + })); + } catch (Exception ex) { + logger.error( + () -> format( + "error occurred while migrating data stream [%s] from feature [%s]", + dataStreamName, + migrationInfo.getFeatureName() + ), + ex + ); + markAsFailed(ex); + } + } + + private void checkDataStreamMigrationStatus( + SystemDataStreamMigrationInfo migrationInfo, + Consumer completionListener, + boolean restartMigrationOnError + ) { + String dataStreamName = migrationInfo.getDataStreamName(); + GetMigrationReindexStatusAction.Request getStatusRequest = new GetMigrationReindexStatusAction.Request(dataStreamName); + + migrationInfo.createClient(baseClient) + .execute(GetMigrationReindexStatusAction.INSTANCE, getStatusRequest, ActionListener.wrap(migrationStatusResponse -> { + ReindexDataStreamEnrichedStatus status = migrationStatusResponse.getEnrichedStatus(); + logger.debug( + "data stream [{}] reindexing status: pending {} out of {} indices", + dataStreamName, + status.pending(), + status.totalIndicesToBeUpgraded() + ); + + if (status.complete() == false) { + // data stream migration task is running, schedule another check without need to cancel-restart + threadPool.schedule( + () -> checkDataStreamMigrationStatus(migrationInfo, completionListener, false), + TimeValue.timeValueSeconds(1), + threadPool.generic() + ); + } else { + List> errors = status.errors(); + if (errors != null && errors.isEmpty() == false || status.exception() != null) { + + // data stream migration task existed before this task started it and is in failed state - cancel it and restart + if (restartMigrationOnError) { + cancelExistingDataStreamMigrationAndRetry(migrationInfo, completionListener); + } else { + List exceptions = (status.exception() != null) + ? Collections.singletonList(status.exception()) + : errors.stream().map(Tuple::v2).toList(); + dataStreamMigrationFailed(migrationInfo, exceptions); + } + } else { + logger.info( + "successfully migrated old indices from data stream [{}] from feature [{}] to new indices", + dataStreamName, + migrationInfo.getFeatureName() + ); + completionListener.accept(migrationInfo); + } + } + }, ex -> cancelExistingDataStreamMigrationAndMarkAsFailed(migrationInfo, ex))); + } + + private void dataStreamMigrationFailed(SystemDataStreamMigrationInfo migrationInfo, Collection exceptions) { + logger.error( + "error occurred while reindexing data stream [{}] from feature [{}], failures [{}]", + migrationInfo.getDataStreamName(), + migrationInfo.getFeatureName(), + exceptions + ); + + ElasticsearchException ex = new ElasticsearchException( + "error occurred while reindexing data stream [" + migrationInfo.getDataStreamName() + "]" + ); + for (Exception exception : exceptions) { + ex.addSuppressed(exception); + } + + throw ex; + } + // Failure handlers private void removeReadOnlyBlockOnReindexFailure(Index index, ActionListener listener, Exception ex) { logger.info("removing read only block on [{}] because reindex failed [{}]", index, ex); setWriteBlock(index, false, ActionListener.wrap(unsetReadOnlyResponse -> listener.onFailure(ex), e1 -> listener.onFailure(ex))); } + private void cancelExistingDataStreamMigrationAndRetry( + SystemDataStreamMigrationInfo migrationInfo, + Consumer completionListener + ) { + logger.debug( + "cancelling migration of data stream [{}] from feature [{}] for retry", + migrationInfo.getDataStreamName(), + migrationInfo.getFeatureName() + ); + + ActionListener listener = ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + migrateDataStream(migrationInfo, completionListener); + } else { + String dataStreamName = migrationInfo.getDataStreamName(); + logger.error( + "failed to cancel migration of data stream [{}] from feature [{}] during retry", + dataStreamName, + migrationInfo.getFeatureName() + ); + throw new ElasticsearchException( + "failed to cancel migration of data stream [" + + dataStreamName + + "] from feature [" + + migrationInfo.getFeatureName() + + "] response is not acknowledge" + ); + } + }, this::markAsFailed); + + cancelDataStreamMigration(migrationInfo, listener); + } + + private void cancelExistingDataStreamMigrationAndMarkAsFailed(SystemDataStreamMigrationInfo migrationInfo, Exception exception) { + logger.info( + "cancelling migration of data stream [{}] from feature [{}]", + migrationInfo.getDataStreamName(), + migrationInfo.getFeatureName() + ); + + // we don't really care here if the request wasn't acknowledged + ActionListener listener = ActionListener.wrap(response -> markAsFailed(exception), ex -> { + exception.addSuppressed(ex); + markAsFailed(exception); + }); + + cancelDataStreamMigration(migrationInfo, listener); + } + + private void cancelDataStreamMigration(SystemDataStreamMigrationInfo migrationInfo, ActionListener listener) { + String dataStreamName = migrationInfo.getDataStreamName(); + + CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(dataStreamName); + try { + migrationInfo.createClient(baseClient).execute(CancelReindexDataStreamAction.INSTANCE, cancelRequest, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + private static ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) { String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null) ? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures()) @@ -611,12 +835,16 @@ private static ElasticsearchException logAndThrowExceptionForFailures(BulkByScro */ @Override public void markAsFailed(Exception e) { - SystemIndexMigrationInfo migrationInfo = currentMigrationInfo(); + SystemResourceMigrationInfo migrationInfo = currentMigrationInfo(); synchronized (migrationQueue) { migrationQueue.clear(); } - String featureName = Optional.ofNullable(migrationInfo).map(SystemIndexMigrationInfo::getFeatureName).orElse(""); - String indexName = Optional.ofNullable(migrationInfo).map(SystemIndexMigrationInfo::getCurrentIndexName).orElse(""); + String featureName = Optional.ofNullable(migrationInfo) + .map(SystemResourceMigrationInfo::getFeatureName) + .orElse(""); + String indexName = Optional.ofNullable(migrationInfo) + .map(SystemResourceMigrationInfo::getCurrentResourceName) + .orElse(""); MigrationResultsUpdateTask.upsert( featureName, @@ -666,7 +894,7 @@ private static void submitUnbatchedTask( clusterService.submitUnbatchedStateUpdateTask(source, task); } - private SystemIndexMigrationInfo currentMigrationInfo() { + private SystemResourceMigrationInfo currentMigrationInfo() { synchronized (migrationQueue) { return migrationQueue.peek(); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationFactory.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationFactory.java new file mode 100644 index 0000000000000..f571098979d67 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.system_indices.task; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.indices.SystemIndices; + +import java.util.Objects; +import java.util.stream.Stream; + +class SystemResourceMigrationFactory { + /** + * Convenience factory method holding the logic for creating instances from a Feature object. + * @param feature The feature that is being migrated + * @param metadata The current metadata, as index migration depends on the current state of the cluster. + * @param indexScopedSettings This is necessary to make adjustments to the indices settings for unmanaged indices. + * @return A {@link Stream} of {@link SystemIndexMigrationInfo}s that represent all the indices the given feature currently owns. + */ + static Stream fromFeature( + SystemIndices.Feature feature, + ProjectMetadata metadata, + IndexScopedSettings indexScopedSettings + ) { + return Stream.concat( + getSystemIndicesMigrationInfos(feature, metadata, indexScopedSettings), + getSystemDataStreamsMigrationInfos(feature, metadata) + ); + } + + private static Stream getSystemIndicesMigrationInfos( + SystemIndices.Feature feature, + ProjectMetadata metadata, + IndexScopedSettings indexScopedSettings + ) { + return feature.getIndexDescriptors() + .stream() + .flatMap(descriptor -> descriptor.getMatchingIndices(metadata).stream().map(metadata::index).filter(imd -> { + assert imd != null + : "got null IndexMetadata for index in system descriptor [" + + descriptor.getIndexPattern() + + "] in feature [" + + feature.getName() + + "]"; + return Objects.nonNull(imd); + }).map(imd -> SystemIndexMigrationInfo.build(imd, descriptor, feature, indexScopedSettings))); + } + + private static Stream getSystemDataStreamsMigrationInfos( + SystemIndices.Feature feature, + ProjectMetadata metadata + ) { + return feature.getDataStreamDescriptors().stream().map(descriptor -> { + DataStream dataStream = metadata.dataStreams().get(descriptor.getDataStreamName()); + return dataStream != null ? SystemDataStreamMigrationInfo.build(dataStream, descriptor, feature) : null; + }).filter(Objects::nonNull); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java new file mode 100644 index 0000000000000..366a463b878ba --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemResourceMigrationInfo.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.system_indices.task; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.plugins.SystemIndexPlugin; + +import java.util.Comparator; +import java.util.Map; +import java.util.stream.Stream; + +abstract sealed class SystemResourceMigrationInfo implements Comparable permits SystemDataStreamMigrationInfo, + SystemIndexMigrationInfo { + private static final Comparator SAME_CLASS_COMPARATOR = Comparator.comparing( + SystemResourceMigrationInfo::getFeatureName + ).thenComparing(SystemResourceMigrationInfo::getCurrentResourceName); + + protected final String featureName; + protected final String origin; + protected final SystemIndices.Feature owningFeature; + + SystemResourceMigrationInfo(String featureName, String origin, SystemIndices.Feature owningFeature) { + this.featureName = featureName; + this.origin = origin; + this.owningFeature = owningFeature; + } + + protected abstract String getCurrentResourceName(); + + /** + * Gets the name of the feature which owns the index to be migrated. + */ + String getFeatureName() { + return featureName; + } + + /** + * Gets the origin that should be used when interacting with this index. + */ + String getOrigin() { + return origin; + } + + /** + * Creates a client that's been configured to be able to properly access the system index to be migrated. + * + * @param baseClient The base client to wrap. + * @return An {@link OriginSettingClient} which uses the origin provided by {@link SystemIndexMigrationInfo#getOrigin()}. + */ + Client createClient(Client baseClient) { + return new OriginSettingClient(baseClient, this.getOrigin()); + } + + abstract Stream getIndices(ProjectMetadata metadata); + + @Override + public int compareTo(SystemResourceMigrationInfo o) { + return SAME_CLASS_COMPARATOR.compare(this, o); + } + + abstract boolean isCurrentIndexClosed(); + + /** + * Invokes the pre-migration hook for the feature that owns this index. + * See {@link SystemIndexPlugin#prepareForIndicesMigration(ClusterService, Client, ActionListener)}. + * @param clusterService For retrieving the state. + * @param client For performing any update operations necessary to prepare for the upgrade. + * @param listener Call {@link ActionListener#onResponse(Object)} when preparation for migration is complete. + */ + void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener> listener) { + owningFeature.getPreMigrationFunction().prepareForIndicesMigration(clusterService, client, listener); + } + + /** + * Invokes the post-migration hooks for the feature that owns this index. + * See {@link SystemIndexPlugin#indicesMigrationComplete(Map, ClusterService, Client, ActionListener)}. + * @param metadata The metadata that was passed into the listener by the pre-migration hook. + * @param clusterService For retrieving the state. + * @param client For performing any update operations necessary to prepare for the upgrade. + * @param listener Call {@link ActionListener#onResponse(Object)} when the hook is finished. + */ + void indicesMigrationComplete( + Map metadata, + ClusterService clusterService, + Client client, + ActionListener listener + ) { + owningFeature.getPostMigrationFunction().indicesMigrationComplete(metadata, clusterService, client, listener); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index 23230b7dc9c1a..6d5172756f55b 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -183,7 +183,13 @@ public List> getPersistentTasksExecutor( IndexNameExpressionResolver expressionResolver ) { return List.of( - new SystemIndexMigrationExecutor(client, clusterService, systemIndices.get(), settingsModule.getIndexScopedSettings()), + new SystemIndexMigrationExecutor( + client, + clusterService, + systemIndices.get(), + settingsModule.getIndexScopedSettings(), + threadPool + ), new ReindexDataStreamPersistentTaskExecutor( new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN), clusterService, diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 189bdc1012790..ada10ad83fa63 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -154,7 +154,7 @@ protected void doExecute( Settings settingsBefore = sourceIndex.getSettings(); - var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(project.metadata(), false); + var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(project.metadata(), false, true); if (hasOldVersion.test(sourceIndex.getIndex()) == false) { logger.warn( "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]", diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index e9bc16af7c835..cbeb31a6b89e6 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -76,7 +76,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList int totalIndices = dataStream.getIndices().size(); int totalIndicesToBeUpgraded = (int) dataStream.getIndices() .stream() - .filter(getReindexRequiredPredicate(metadata.getProject(), false)) + .filter(getReindexRequiredPredicate(metadata.getProject(), false, dataStream.isSystem())) .count(); ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( sourceDataStreamName, diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 2fc9e2afbc02d..0949f1084355b 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -124,7 +124,10 @@ protected void nodeOperation( List dataStreamInfos = response.getDataStreams(); if (dataStreamInfos.size() == 1) { DataStream dataStream = dataStreamInfos.getFirst().getDataStream(); - if (getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false).test(dataStream.getWriteIndex())) { + boolean includeSystem = dataStream.isSystem(); + if (getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, includeSystem).test( + dataStream.getWriteIndex() + )) { RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null); rolloverRequest.setParentTask(taskId); client.execute( @@ -171,7 +174,7 @@ private void reindexIndices( ) { List indices = dataStream.getIndices(); List indicesToBeReindexed = indices.stream() - .filter(getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false)) + .filter(getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, dataStream.isSystem())) .toList(); final ReindexDataStreamPersistentTaskState updatedState; if (params.totalIndices() != totalIndicesInDataStream diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java index 28aa0bdfae46a..355ce90a14c80 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/action/TransportGetFeatureUpgradeStatusActionTests.java @@ -8,17 +8,24 @@ package org.elasticsearch.system_indices.action; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.indices.ExecutorNames; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.system_indices.task.FeatureMigrationResults; +import org.elasticsearch.system_indices.task.SingleFeatureMigrationResult; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -28,10 +35,15 @@ public class TransportGetFeatureUpgradeStatusActionTests extends ESTestCase { - public static String TEST_SYSTEM_INDEX_PATTERN = ".test*"; + private static final String DATA_STREAM_NAME = ".test-ds"; + private static final String BACKING_INDEX_NAME = DataStream.BACKING_INDEX_PREFIX + DATA_STREAM_NAME + "-1"; + private static final String FEATURE_NAME = "test-feature"; + private static String TEST_SYSTEM_INDEX_PATTERN = ".test*"; // Version just before MINIMUM_COMPATIBLE in order to check that UpgradeStatus.MIGRATION_NEEDED is set correctly private static final IndexVersion TEST_OLD_VERSION = IndexVersion.fromId(IndexVersions.MINIMUM_COMPATIBLE.id() - 1); private static final ClusterState CLUSTER_STATE = getClusterState(); + private static final String TEST_INDEX_1_NAME = ".test-index-1"; + private static final SystemIndices.Feature FEATURE = getFeature(); public void testGetFeatureStatus() { @@ -41,9 +53,9 @@ public void testGetFeatureStatus() { ); assertThat(status.getUpgradeStatus(), equalTo(MIGRATION_NEEDED)); - assertThat(status.getFeatureName(), equalTo("test-feature")); + assertThat(status.getFeatureName(), equalTo(FEATURE_NAME)); assertThat(status.getMinimumIndexVersion(), equalTo(TEST_OLD_VERSION)); - assertThat(status.getIndexVersions(), hasSize(2)); // additional testing below + assertThat(status.getIndexVersions(), hasSize(3)); // additional testing below } public void testGetIndexInfos() { @@ -52,47 +64,148 @@ public void testGetIndexInfos() { FEATURE ); - assertThat(versions, hasSize(2)); + assertThat(versions, hasSize(3)); + + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(0); + assertThat(version.getVersion(), equalTo(IndexVersion.current())); + assertThat(version.getIndexName(), equalTo(TEST_INDEX_1_NAME)); + } + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(1); + assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); + assertThat(version.getIndexName(), equalTo(".test-index-2")); + } + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(2); + assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); + assertThat(version.getIndexName(), equalTo(BACKING_INDEX_NAME)); + } + } + + public void testGetIndexInfosWithErrors() { + List versions = TransportGetFeatureUpgradeStatusAction.getIndexInfos( + getClusterStateWithFailedMigration(TEST_INDEX_1_NAME), + FEATURE + ); + + assertThat(versions, hasSize(3)); + + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(0); + assertThat(version.getVersion(), equalTo(IndexVersion.current())); + assertThat(version.getIndexName(), equalTo(TEST_INDEX_1_NAME)); + assertNotNull(version.getException()); + } + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(1); + assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); + assertThat(version.getIndexName(), equalTo(".test-index-2")); + assertNull(version.getException()); + } + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(2); + assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); + assertThat(version.getIndexName(), equalTo(BACKING_INDEX_NAME)); + assertNull(version.getException()); + } + } + + public void testGetIndexInfosWithDataStreamErrors() { + List versions = TransportGetFeatureUpgradeStatusAction.getIndexInfos( + getClusterStateWithFailedMigration(DATA_STREAM_NAME), + FEATURE + ); + + assertThat(versions, hasSize(3)); { GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(0); assertThat(version.getVersion(), equalTo(IndexVersion.current())); - assertThat(version.getIndexName(), equalTo(".test-index-1")); + assertThat(version.getIndexName(), equalTo(TEST_INDEX_1_NAME)); + assertNull(version.getException()); } { GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(1); assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); assertThat(version.getIndexName(), equalTo(".test-index-2")); + assertNull(version.getException()); + } + { + GetFeatureUpgradeStatusResponse.IndexInfo version = versions.get(2); + assertThat(version.getVersion(), equalTo(TEST_OLD_VERSION)); + assertThat(version.getIndexName(), equalTo(BACKING_INDEX_NAME)); + assertNotNull(version.getException()); } } private static SystemIndices.Feature getFeature() { SystemIndexDescriptor descriptor = SystemIndexDescriptorUtils.createUnmanaged(TEST_SYSTEM_INDEX_PATTERN, "descriptor for tests"); + SystemDataStreamDescriptor dataStreamDescriptor = new SystemDataStreamDescriptor( + DATA_STREAM_NAME, + "test data stream", + SystemDataStreamDescriptor.Type.INTERNAL, + ComposableIndexTemplate.builder().build(), + Map.of(), + Collections.singletonList("origin"), + "origin", + ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS + ); List descriptors = new ArrayList<>(); descriptors.add(descriptor); // system indices feature object - SystemIndices.Feature feature = new SystemIndices.Feature("test-feature", "feature for tests", descriptors); + SystemIndices.Feature feature = new SystemIndices.Feature( + FEATURE_NAME, + "feature for tests", + List.of(descriptor), + List.of(dataStreamDescriptor) + ); return feature; } private static ClusterState getClusterState() { - IndexMetadata indexMetadata1 = IndexMetadata.builder(".test-index-1") + IndexMetadata indexMetadata1 = IndexMetadata.builder(TEST_INDEX_1_NAME) .settings(Settings.builder().put("index.version.created", IndexVersion.current()).build()) .numberOfShards(1) .numberOfReplicas(0) + .system(true) .build(); IndexMetadata indexMetadata2 = IndexMetadata.builder(".test-index-2") .settings(Settings.builder().put("index.version.created", TEST_OLD_VERSION).build()) .numberOfShards(1) .numberOfReplicas(0) + .system(true) + .build(); + IndexMetadata dsIndexMetadata = IndexMetadata.builder(BACKING_INDEX_NAME) + .settings(Settings.builder().put("index.version.created", TEST_OLD_VERSION).build()) + .numberOfShards(1) + .numberOfReplicas(0) + .system(true) + .build(); + + DataStream dataStream = DataStream.builder(DATA_STREAM_NAME, List.of(dsIndexMetadata.getIndex())) + .setSystem(true) + .setHidden(true) .build(); ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE).metadata( - new Metadata.Builder().indices(Map.of(".test-index-1", indexMetadata1, ".test-index-2", indexMetadata2)).build() + new Metadata.Builder().dataStreams(Map.of(DATA_STREAM_NAME, dataStream), Collections.emptyMap()) + .indices(Map.of(TEST_INDEX_1_NAME, indexMetadata1, ".test-index-2", indexMetadata2, BACKING_INDEX_NAME, dsIndexMetadata)) + .build() ).build(); return clusterState; } + + private static ClusterState getClusterStateWithFailedMigration(String failedIndexName) { + SingleFeatureMigrationResult migrationResult = SingleFeatureMigrationResult.failure(failedIndexName, new Exception()); + FeatureMigrationResults featureMigrationResults = new FeatureMigrationResults(Map.of(FEATURE_NAME, migrationResult)); + + ClusterState initialState = getClusterState(); + return ClusterState.builder(initialState) + .metadata(Metadata.builder(initialState.metadata()).putCustom(FeatureMigrationResults.TYPE, featureMigrationResults).build()) + .build(); + } } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java index 362819ac11544..bea8ebf606a25 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/system_indices/task/SystemIndexMigrationMetadataTests.java @@ -124,7 +124,8 @@ private Metadata fromJsonXContentStringWithPersistentTasks(String json) throws I mock(Client.class), clusterService, mock(SystemIndices.class), - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + mock(ThreadPool.class) ); new PersistentTasksExecutorRegistry(List.of(healthNodeTaskExecutor, systemIndexMigrationExecutor)); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java index 449aa8a6dff75..2bc129cb75b5b 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java @@ -444,6 +444,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), Collections.singletonList("test"), + "test", new ExecutorNames( ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java index b736e5680c7e0..e0131bcfb089a 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java @@ -274,6 +274,7 @@ public Collection getSystemDataStreamDescriptors() { .build(), Map.of(), Collections.singletonList("test"), + "test", new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE) ) ); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SystemIndicesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SystemIndicesUpgradeIT.java new file mode 100644 index 0000000000000..23fbc41df47c4 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SystemIndicesUpgradeIT.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Node; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.test.SecuritySettingsSourceField; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class SystemIndicesUpgradeIT extends AbstractUpgradeTestCase { + private static final Logger log = LogManager.getLogger(SystemIndicesUpgradeIT.class); + private static final String BASIC_AUTH_VALUE = basicAuthHeaderValue( + "test_user", + new SecureString(SecuritySettingsSourceField.TEST_PASSWORD) + ); + + @Override + protected Settings restAdminSettings() { + // Note that we are both superuser here and provide a product origin + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE) + .put(ThreadContext.PREFIX + "." + Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, "fleet") + .build(); + } + + public void testUpgradeSystemIndexAndDataStream() throws Exception { + String dataStreamName = ".fleet-actions-results"; + String indexName = ".fleet-actions"; + if (CLUSTER_TYPE == AbstractUpgradeTestCase.ClusterType.OLD) { + addDataTo(dataStreamName); + addDataTo(indexName); + verifyDataStream(dataStreamName); + verifyAccessToIndex(dataStreamName); + verifyAccessToIndex(indexName); + } else if (CLUSTER_TYPE == AbstractUpgradeTestCase.ClusterType.UPGRADED) { + upgradeSystemIndices(); + verifyDataStream(dataStreamName); + verifyIndex(indexName); + verifyAccessToIndex(dataStreamName); + verifyAccessToIndex(indexName); + } + } + + private void verifyDataStream(String dataStreamName) throws IOException { + Map>> metadata = getMetadata(dataStreamName); + assertThat(getProperty(metadata, List.of("data_stream", "data_stream", dataStreamName, "system")), equalTo("true")); + + Map> upgradedIndicesMetadata = metadata.get("indices"); + for (Map.Entry> indexEntry : upgradedIndicesMetadata.entrySet()) { + Map indexProperties = indexEntry.getValue(); + verifySystemIndexProperties(indexProperties); + } + } + + private static void verifyAccessToIndex(String aliasOrDataStreamName) throws IOException { + Request fleetCountRequest = new Request("GET", aliasOrDataStreamName + "/_count"); + Response fleetCountResponse = adminClient().performRequest(fleetCountRequest); + assertOK(fleetCountResponse); + assertThat( + XContentHelper.convertToMap(JsonXContent.jsonXContent, fleetCountResponse.getEntity().getContent(), false).get("count"), + equalTo(1) + ); + } + + private void addDataTo(String indexName) throws IOException { + Request request = new Request("POST", indexName + "/_doc"); + request.addParameter("refresh", "true"); + request.setJsonEntity("{\"@timestamp\": 0}"); + assertOK(adminClient().performRequest(request)); + } + + private void verifyIndex(String indexName) throws IOException { + Map> indexMetadata = getIndexMetadata(indexName); + assertThat(indexMetadata, aMapWithSize(1)); + Map indexProperties = indexMetadata.values().iterator().next(); + verifySystemIndexProperties(indexProperties); + } + + private static void verifySystemIndexProperties(Map indexProperties) { + assertThat(getProperty(indexProperties, List.of("system")), equalTo("true")); + } + + @SuppressWarnings("unchecked") + private static String getProperty(Map properties, List propertyPath) { + for (int i = 0; i < propertyPath.size() - 1; i++) { + Object o = properties.get(propertyPath.get(i)); + assertThat(o, instanceOf(Map.class)); + properties = (Map) o; + } + return String.valueOf(properties.get(propertyPath.get(propertyPath.size() - 1))); + } + + private void upgradeSystemIndices() throws Exception { + String upgradeUser = "upgrade_user"; + String upgradeUserPassword = "x-pack-test-password"; + createRole("upgrade_role"); + createUser(upgradeUser, upgradeUserPassword, "upgrade_role"); + + try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) { + boolean upgradeRequired = Version.fromString(UPGRADE_FROM_VERSION).before(SystemIndices.NO_UPGRADE_REQUIRED_VERSION); + String expectedStatus = (upgradeRequired) ? "MIGRATION_NEEDED" : "NO_MIGRATION_NEEDED"; + + assertThat( + XContentHelper.convertToMap( + JsonXContent.jsonXContent, + upgradeUserClient.performRequest(new Request("GET", "/_migration/system_features")).getEntity().getContent(), + false + ).get("migration_status"), + equalTo(expectedStatus) + ); + + if (upgradeRequired) { + Request upgradeRequest = new Request("POST", "/_migration/system_features"); + Response upgradeResponse = upgradeUserClient.performRequest(upgradeRequest); + assertOK(upgradeResponse); + assertBusy(() -> { + Response featureResponse = upgradeUserClient.performRequest(new Request("GET", "/_migration/system_features")); + assertThat( + XContentHelper.convertToMap(JsonXContent.jsonXContent, featureResponse.getEntity().getContent(), false) + .get("migration_status"), + equalTo("NO_MIGRATION_NEEDED") + ); + }, 30, TimeUnit.SECONDS); + } + } + } + + private void createUser(String name, String password, String role) throws IOException { + Request request = new Request("PUT", "/_security/user/" + name); + request.setJsonEntity("{ \"password\": \"" + password + "\", \"roles\": [ \"" + role + "\"] }"); + assertOK(adminClient().performRequest(request)); + } + + private void createRole(String name) throws IOException { + Request request = new Request("PUT", "/_security/role/" + name); + request.setJsonEntity( + "{ \"cluster\": [\"cluster:admin/migration/post_system_feature\", \"cluster:admin/migration/get_system_feature\"] }" + ); + assertOK(adminClient().performRequest(request)); + } + + private RestClient getClient(String user, String passwd) throws IOException { + RestClientBuilder builder = RestClient.builder(adminClient().getNodes().toArray(new Node[0])); + String token = basicAuthHeaderValue(user, new SecureString(passwd.toCharArray())); + configureClient(builder, Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build()); + builder.setStrictDeprecationMode(true); + return builder.build(); + } + + private Map> getIndexMetadata(String aliasName) throws IOException { + Map>> metadata = getMetadata(aliasName); + return metadata.get("indices"); + } + + @SuppressWarnings("unchecked") + private static Map>> getMetadata(String dataStreamOrAlias) throws IOException { + Request getClusterStateRequest = new Request("GET", "/_cluster/state/metadata/" + dataStreamOrAlias); + Response clusterStateResponse = client().performRequest(getClusterStateRequest); + Map clusterState = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + clusterStateResponse.getEntity().getContent(), + false + ); + return (Map>>) clusterState.get("metadata"); + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml index a8501332b973a..3b6f7cc17b66a 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml @@ -7,6 +7,9 @@ setup: timeout: 70s --- "Get start, stop, upgrade and delete old cluster batch transform": + - skip: + awaits_fix: "https://github.com/elastic/ml-team/issues/1522" + reason: "Transform system index migration is broken" # Simple and complex OLD transforms - do: transform.get_transform: @@ -222,6 +225,9 @@ setup: transform_id: "old-simple-continuous-transform" --- "Test GET, mixed continuous transforms": + - skip: + awaits_fix: "https://github.com/elastic/ml-team/issues/1522" + reason: "Transform system index migration is broken" - do: transform.get_transform: transform_id: "mixed-simple-continuous-transform" @@ -286,8 +292,13 @@ setup: --- "Test index mappings for latest internal index and audit index": + - skip: + awaits_fix: "https://github.com/elastic/ml-team/issues/1522" + reason: "Transform system index migration is broken" - skip: features: warnings + - requires: + test_runner_features: warnings_regex - do: transform.put_transform: transform_id: "upgraded-simple-transform" @@ -304,8 +315,8 @@ setup: - match: { acknowledged: true } - do: - warnings: - - "this request accesses system indices: [.transform-internal-007], but in a future major version, direct access to system indices will be prevented by default" + warnings_regex: + - "this request accesses system indices: \\[\\.transform-internal-\\d{3}(?:-reindexed-for-\\d{1,2})?], but in a future major version, direct access to system indices will be prevented by default" indices.get_mapping: index: .transform-internal-007 - match: { \.transform-internal-007.mappings.dynamic: "false" }