Add ability to redirect ingestion failures on data streams to a failure store #126973

Merged · 18 commits · Apr 18, 2025
4 changes: 0 additions & 4 deletions docs/build.gradle
@@ -85,9 +85,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
setting 'xpack.license.self_generated.type', 'trial'
setting 'indices.lifecycle.history_index_enabled', 'false'
keystorePassword 'keystore-password'
if (buildParams.snapshotBuild == false) {
requiresFeature 'es.failure_store_feature_flag_enabled', new Version(8, 12, 0)
}
}

// debug ccr test failures:
@@ -126,7 +123,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {


requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.12.0")

// TODO Rene: clean up this kind of cross project file references
extraConfigFile 'op-jwks.json', project(':x-pack:test:idp-fixture').file("src/main/resources/oidc/op-jwks.json")
89 changes: 89 additions & 0 deletions docs/changelog/126973.yaml
@@ -0,0 +1,89 @@
pr: 126973
summary: Add ability to redirect ingestion failures on data streams to a failure store
area: Data streams
type: feature
issues: []
highlight:
title: Add ability to redirect ingestion failures on data streams to a failure store
body: |-
Documents that encounter ingest pipeline failures or mapping conflicts
were previously returned to the client as errors in the bulk and index
operations. Many client applications are not equipped to respond to
these failures, so failed documents were often dropped by clients that
cannot hold broken documents indefinitely. In many end-user workloads,
these failed documents represent events that could be critical signals
for observability or security use cases.

To help mitigate this problem, data streams can now maintain a "failure
store" which is used to accept and hold documents that fail to be
ingested due to preventable configuration errors. The data stream's
failure store operates like a separate set of backing indices with their
own mappings and access patterns that allow Elasticsearch to accept
documents that would otherwise be rejected due to unhandled ingest
pipeline exceptions or mapping conflicts.

Users can enable redirection of ingest failures to the failure store on
new data streams by specifying it in the new `data_stream_options` field
inside a component or index template:

[source,yaml]
----
PUT _index_template/my-template
{
"index_patterns": ["logs-test-*"],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
}
}
----

Existing data streams can be configured with the new data stream
`_options` endpoint:

[source,yaml]
----
PUT _data_stream/logs-test-apache/_options
{
"failure_store": {
"enabled": true
}
}
----
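For completeness, the current failure store configuration of an existing
data stream can be read back with the matching GET variant of the same
endpoint (shown here against the same example data stream; the response
body is omitted):

[source,yaml]
----
GET _data_stream/logs-test-apache/_options
----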

When redirection is enabled, any ingestion-related failure is captured
in the failure store when the cluster is able to do so, along with the
timestamp at which the failure occurred, details about the error
encountered, and the document that could not be ingested. Since failure
store indices are a kind of Elasticsearch index, the failures that a
data stream has collected can be searched. They are not returned by
default because they are stored in separate indices from the normal data
stream data. To retrieve the failures, use the `_search` API along with
a new piece of index pattern syntax: the `::` selector.

[source,yaml]
----
POST logs-test-apache::failures/_search
----
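As an illustrative end-to-end sketch (the data stream name and field
values here are hypothetical), a document whose `@timestamp` cannot be
parsed would normally be rejected with a mapping conflict; with the
failure store enabled it is accepted into the failure store and can then
be found through the failures selector:

[source,yaml]
----
POST logs-test-apache/_doc
{
  "@timestamp": "not-a-timestamp",
  "message": "this document would otherwise fail with a mapping conflict"
}

POST logs-test-apache::failures/_search
----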

This selector tells the search operation to target the indices in the
data stream's failure store instead of its backing indices. It can be
combined with other index patterns to include their failure store
indices in the search operation:

[source,yaml]
----
POST logs-*::failures/_search
POST logs-*,logs-*::failures/_search
POST *::failures/_search
POST _query
{
"query": "FROM my_data_stream*::failures"
}
----
notable: true
6 changes: 0 additions & 6 deletions modules/data-streams/build.gradle
@@ -36,12 +36,6 @@ if (buildParams.inFipsJvm){
tasks.named("yamlRestTest").configure{enabled = false }
}

if (buildParams.snapshotBuild == false) {
tasks.withType(Test).configureEach {
systemProperty 'es.failure_store_feature_flag_enabled', 'true'
}
}

tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility")

@@ -16,7 +16,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
@@ -38,7 +37,6 @@ public abstract class AbstractDataStreamIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
// Disable apm-data so the index templates it installs do not impact
@@ -15,7 +15,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -29,7 +28,6 @@ public class DataStreamWithSecurityIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.watcher.enabled", "false")
.setting("xpack.ml.enabled", "false")
.setting("xpack.security.enabled", "true")
@@ -13,7 +13,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;
@@ -27,7 +26,6 @@ public abstract class DisabledSecurityDataStreamTestCase extends ESRestTestCase
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
.build();
@@ -17,7 +17,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -41,7 +40,6 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.watcher.enabled", "false")
.setting("xpack.ml.enabled", "false")
.setting("xpack.security.enabled", "true")
@@ -19,7 +19,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -46,7 +45,6 @@ public abstract class DataStreamLifecyclePermissionsTestC
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.watcher.enabled", "false")
.setting("xpack.ml.enabled", "false")
.setting("xpack.security.enabled", "true")
@@ -28,7 +28,7 @@ public class DataStreamFeatures implements FeatureSpecification {

@Override
public Set<NodeFeature> getFeatures() {
return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE) : Set.of();
return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
}

@Override
@@ -21,7 +21,6 @@
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -237,11 +236,9 @@ public List<ActionHandler> getActions() {
actions.add(new ActionHandler(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
actions.add(new ActionHandler(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
actions.add(new ActionHandler(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
actions.add(new ActionHandler(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
actions.add(new ActionHandler(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
}
actions.add(new ActionHandler(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
actions.add(new ActionHandler(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
return actions;
}

@@ -274,11 +271,9 @@ public List<RestHandler> getRestHandlers(
handlers.add(new RestDeleteDataStreamLifecycleAction());
handlers.add(new RestExplainDataStreamLifecycleAction());
handlers.add(new RestDataStreamLifecycleStatsAction());
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
handlers.add(new RestGetDataStreamOptionsAction());
handlers.add(new RestPutDataStreamOptionsAction());
handlers.add(new RestDeleteDataStreamOptionsAction());
}
handlers.add(new RestGetDataStreamOptionsAction());
handlers.add(new RestPutDataStreamOptionsAction());
handlers.add(new RestDeleteDataStreamOptionsAction());
return handlers;
}

@@ -230,8 +230,7 @@ static GetDataStreamAction.Response innerOperation(
for (DataStream dataStream : dataStreams) {
// For this action, we are returning whether the failure store is effectively enabled, either in metadata or by cluster setting.
// Users can use the get data stream options API to find out whether it is explicitly enabled in metadata.
boolean failureStoreEffectivelyEnabled = DataStream.isFailureStoreFeatureFlagEnabled()
&& dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
boolean failureStoreEffectivelyEnabled = dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
final String indexTemplate;
boolean indexTemplatePreferIlmValue = true;
String ilmPolicyName = null;
@@ -289,7 +288,7 @@ static GetDataStreamAction.Response innerOperation(
Map<Index, IndexProperties> backingIndicesSettingsValues = new HashMap<>();
ProjectMetadata metadata = state.metadata();
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getIndices());
if (DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
if (dataStream.getFailureIndices().isEmpty() == false) {
collectIndexSettingsValues(dataStream, backingIndicesSettingsValues, metadata, dataStream.getFailureIndices());
}

@@ -375,11 +375,9 @@ private void run(ProjectState projectState) {
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude them for the remaining run.
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, false));
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
if (failureStoreWriteIndex != null) {
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
}
Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
if (failureStoreWriteIndex != null) {
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
}

// tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be
@@ -802,7 +800,7 @@ static List<Index> getTargetIndices(
targetIndices.add(index);
}
}
if (withFailureStore && DataStream.isFailureStoreFeatureFlagEnabled() && dataStream.getFailureIndices().isEmpty() == false) {
if (withFailureStore && dataStream.getFailureIndices().isEmpty() == false) {
for (Index index : dataStream.getFailureIndices()) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
&& indicesToExcludeForRemainingRun.contains(index) == false) {
@@ -27,7 +27,6 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assume;

import java.util.Collections;
import java.util.List;
@@ -69,8 +68,6 @@ public void testDeleteDataStream() {
}

public void testDeleteDataStreamWithFailureStore() {
Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled());

final String dataStreamName = "my-data-stream";
final List<String> otherIndices = randomSubsetOf(List.of("foo", "bar", "baz"));
