Skip to content

[8.x] Add node feature for failure store, refactor capability names (#126885) #127091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.features.FeatureSpecification;
Expand Down Expand Up @@ -39,12 +40,22 @@ public Map<NodeFeature, Version> getHistoricalFeatures() {

@Override
public Set<NodeFeature> getFeatures() {
    // NOTE(review): the scraped diff contained both the old unconditional return and the
    // new conditional one, leaving the conditional unreachable; only the merged (new)
    // behavior is kept here.
    if (DataStream.isFailureStoreFeatureFlagEnabled()) {
        // With the failure store feature flag enabled, additionally advertise the
        // failure store node feature so mixed-cluster checks can gate on it.
        return Set.of(
            DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
            DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14
            DataStream.DATA_STREAM_FAILURE_STORE_FEATURE // Added in 8.19
        );
    } else {
        return Set.of(
            DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
            DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
        );
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -643,7 +643,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
Expand Down Expand Up @@ -739,7 +739,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ TSDB failures go to failure store:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ setup:
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

---
teardown:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]
- method: POST
path: /{index}/_rollover
capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ]
capabilities: [ 'index_expression_selectors' ]

- do:
allowed_warnings:
Expand Down Expand Up @@ -313,7 +313,7 @@ teardown:
capabilities:
- method: POST
path: /{index}/_rollover
capabilities: [lazy-rollover-failure-store]
capabilities: [index_expression_selectors]

# Initialize failure store
- do:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
setup:
- requires:
reason: "Data stream options was added in 8.18+"
cluster_features: [ "data_stream.failure_store" ]
reason: "Failure store GA in 8.19+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- method: PUT
path: /_cluster/settings
capabilities: [ 'data_stream_failure_store_cluster_setting' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
cluster.put_settings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
private final boolean clusterHasFailureStoreFeature;

BulkOperation(
Task task,
Expand All @@ -113,7 +114,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
long startTimeNanos,
ActionListener<BulkResponse> listener,
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
boolean clusterHasFailureStoreFeature
) {
this(
task,
Expand All @@ -130,7 +132,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
new FailureStoreDocumentConverter(),
failureStoreMetrics,
dataStreamFailureStoreSettings
dataStreamFailureStoreSettings,
clusterHasFailureStoreFeature
);
}

Expand All @@ -149,7 +152,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
ClusterStateObserver observer,
FailureStoreDocumentConverter failureStoreDocumentConverter,
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
boolean clusterHasFailureStoreFeature
) {
super(listener);
this.task = task;
Expand All @@ -169,6 +173,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
this.failureStoreMetrics = failureStoreMetrics;
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
}

@Override
Expand Down Expand Up @@ -543,7 +548,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, clusterState.metadata());
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
// it has the failure store enabled.
if (failureStoreCandidate != null) {
if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
// Do not redirect documents to a failure store that were already headed to one.
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
if (isFailureStoreRequest == false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ void executeBulk(
Executor executor,
AtomicArray<BulkItemResponse> responses
) {
// Determine if we have the feature enabled once for entire bulk operation
final boolean clusterSupportsFailureStore = featureService.clusterHasFeature(
clusterService.state(),
DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
);
new BulkOperation(
task,
threadPool,
Expand All @@ -591,7 +596,8 @@ void executeBulk(
startTimeNanos,
listener,
failureStoreMetrics,
dataStreamFailureStoreSettings
dataStreamFailureStoreSettings,
clusterSupportsFailureStore
).run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -72,13 +73,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO

private static final Logger LOGGER = LogManager.getLogger(DataStream.class);

public static final FeatureFlag FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store");
public static final boolean FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store").isEnabled();
public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store");
public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;
public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0;

/**
 * Returns whether the failure store feature flag is enabled for this build.
 * The flag value is captured once at class initialization (see
 * {@code FAILURE_STORE_FEATURE_FLAG}), so this is a constant for the JVM's lifetime.
 */
public static boolean isFailureStoreFeatureFlagEnabled() {
    // NOTE(review): the scraped diff retained the old `FAILURE_STORE_FEATURE_FLAG.isEnabled()`
    // return above this one, which is unreachable dead code; only the merged (new) return —
    // matching the new `boolean` field declaration — is kept.
    return FAILURE_STORE_FEATURE_FLAG;
}

public static final String BACKING_INDEX_PREFIX = ".ds-";
Expand Down
34 changes: 32 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -125,6 +126,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private final FailureStoreMetrics failureStoreMetrics;
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;
private final FeatureService featureService;

private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
Expand Down Expand Up @@ -197,7 +199,8 @@ public IngestService(
Client client,
MatcherWatchdog matcherWatchdog,
DocumentParsingProvider documentParsingProvider,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
FeatureService featureService
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -220,6 +223,7 @@ public IngestService(
this.threadPool = threadPool;
this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
this.failureStoreMetrics = failureStoreMetrics;
this.featureService = featureService;
}

/**
Expand All @@ -237,6 +241,7 @@ public IngestService(
this.pipelines = ingestService.pipelines;
this.state = ingestService.state;
this.failureStoreMetrics = ingestService.failureStoreMetrics;
this.featureService = ingestService.featureService;
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
Expand Down Expand Up @@ -785,6 +790,9 @@ public void executeBulkRequest(
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";

// Adapt handler to ensure node features during ingest logic
final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);

executor.execute(new AbstractRunnable() {

@Override
Expand Down Expand Up @@ -870,7 +878,7 @@ public void onFailure(Exception e) {
}
);

executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
assert actionRequest.index() != null;

i++;
Expand All @@ -880,6 +888,28 @@ public void onFailure(Exception e) {
});
}

/**
 * Wraps the failure store resolver so that it only takes effect once the failure store node
 * feature is present on every node in the cluster. The feature check is performed once, up
 * front, for the whole bulk request rather than per document.
 * @param resolveFailureStore Function that surfaces whether failures for an index should be
 *                            redirected to the failure store.
 * @return The original resolver when the whole cluster has the node feature, or a resolver
 *         that always yields {@code null} so the old ingest behavior applies until all nodes
 *         are updated.
 */
private Function<String, Boolean> wrapResolverWithFeatureCheck(Function<String, Boolean> resolveFailureStore) {
    final boolean allNodesSupportFailureStore = featureService.clusterHasFeature(
        clusterService.state(),
        DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
    );
    if (allNodesSupportFailureStore) {
        return resolveFailureStore;
    }
    // Not every node has the feature yet: mute any non-null result to preserve the
    // pre-failure-store logic until the cluster is fully upgraded.
    return indexName -> null;
}

/**
* Returns the pipelines of the request, and updates the request so that it no longer references
* any pipelines (both the default and final pipeline are set to the noop pipeline).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,8 @@ private void construct(

modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);

FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));

FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
final IngestService ingestService = new IngestService(
clusterService,
Expand All @@ -707,7 +709,8 @@ private void construct(
client,
IngestService.createGrokThreadWatchdog(environment, threadPool),
documentParsingProvider,
failureStoreMetrics
failureStoreMetrics,
featureService
);

SystemIndices systemIndices = createSystemIndices(settings);
Expand Down Expand Up @@ -787,8 +790,6 @@ private void construct(

final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);

FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));

if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
clusterService.addListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -34,9 +33,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
private static final String PERSISTENT = "persistent";
private static final String TRANSIENT = "transient";

// TODO: Remove this and use a single cluster feature / capability for the whole failure store feature when the feature flag is removed
private static final String DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY = "data_stream_failure_store_cluster_setting";

@Override
public List<Route> routes() {
return List.of(new Route(PUT, "/_cluster/settings"));
Expand Down Expand Up @@ -78,8 +74,4 @@ public boolean canTripCircuitBreaker() {
return false;
}

@Override
public Set<String> supportedCapabilities() {
    // Advertise the cluster-setting capability only while the failure store
    // feature flag is enabled for this build; otherwise expose no extra capabilities.
    if (DataStream.isFailureStoreFeatureFlagEnabled()) {
        return Set.of(DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY);
    }
    return Set.of();
}
}
Loading