Skip to content

[8.x] [Failure store] Support failure store for system data streams (#126585) #126639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
Expand All @@ -46,12 +51,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

public class SystemDataStreamIT extends ESIntegTestCase {

Expand All @@ -60,6 +68,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(DataStreamsPlugin.class);
plugins.add(TestSystemDataStreamPlugin.class);
plugins.add(MapperExtrasPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -167,6 +176,63 @@ public void testDataStreamStats() throws Exception {
}
}

public void testSystemDataStreamWithFailureStore() throws Exception {
String dataStreamName = ".test-failure-store";
RequestOptions productHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build();
try (RestClient restClient = createRestClient()) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.setOptions(productHeader);
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
indexRequest.setJsonEntity(
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value)
);

Response indexResponse = restClient.performRequest(indexRequest);
assertThat(indexResponse.getStatusLine().getStatusCode(), is(201));
Map<String, Object> responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(indexResponse.getEntity()),
false
);
assertThat(responseMap.get("result"), equalTo("created"));
assertThat((String) responseMap.get("_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
assertThat(responseMap.get("failure_store"), equalTo("used"));

// Rollover
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "::failures/_rollover");
rolloverRequest.setOptions(productHeader);
Response rolloverResponse = restClient.performRequest(rolloverRequest);
assertThat(rolloverResponse.getStatusLine().getStatusCode(), is(200));
responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(rolloverResponse.getEntity()),
false
);
assertThat(responseMap.get("acknowledged"), equalTo(true));
assertThat(responseMap.get("rolled_over"), equalTo(true));
assertThat((String) responseMap.get("new_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));

// Edit data stream options
Request editOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
editOptionsRequest.setJsonEntity("{\"failure_store\":{\"enabled\":\"false\"}}");
editOptionsRequest.setOptions(productHeader);
Response editOptionsResponse = restClient.performRequest(editOptionsRequest);
assertThat(editOptionsResponse.getStatusLine().getStatusCode(), is(200));
responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(editOptionsResponse.getEntity()),
false
);
assertThat(responseMap.get("acknowledged"), equalTo(true));

// delete
Request deleteRequest = new Request("DELETE", "/_data_stream/" + dataStreamName);
deleteRequest.setOptions(productHeader);
Response deleteResponse = restClient.performRequest(deleteRequest);
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
}
}

@SuppressWarnings("unchecked")
public void testSystemDataStreamReadWrite() throws Exception {
try (RestClient restClient = createRestClient()) {
Expand Down Expand Up @@ -325,6 +391,30 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
),
new SystemDataStreamDescriptor(
".test-failure-store",
"system data stream test with failure store",
Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(".test-failure-store"))
.template(Template.builder().mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}""")).dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true))))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ private RolloverResult rolloverDataStream(
now.toEpochMilli(),
dataStreamName,
templateV2,
systemDataStreamDescriptor,
newWriteIndexName,
(builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,6 @@ static ClusterState createDataStream(
// responsibility to check that before setting.
IndexMetadata failureStoreIndex = null;
if (initializeFailureStore) {
if (isSystem) {
throw new IllegalArgumentException("Failure stores are not supported on system data streams");
}
String failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, initialGeneration, request.startTime());
currentState = createFailureStoreIndex(
metadataCreateIndexService,
Expand All @@ -282,6 +279,7 @@ static ClusterState createDataStream(
request.startTime(),
dataStreamName,
template,
systemDataStreamDescriptor,
failureStoreIndexName,
null
);
Expand Down Expand Up @@ -409,6 +407,7 @@ public static ClusterState createFailureStoreIndex(
long nameResolvedInstant,
String dataStreamName,
ComposableIndexTemplate template,
SystemDataStreamDescriptor systemDataStreamDescriptor,
String failureStoreIndexName,
@Nullable BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer
) throws Exception {
Expand All @@ -427,7 +426,8 @@ public static ClusterState createFailureStoreIndex(
.performReroute(false)
.setMatchingTemplate(template)
.settings(indexSettings)
.isFailureIndex(true);
.isFailureIndex(true)
.systemDataStreamDescriptor(systemDataStreamDescriptor);

try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/migrate/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {

internalClusterTestImplementation project(path: ':modules:lang-painless')
internalClusterTestImplementation project(path: ':modules:lang-painless:spi')
internalClusterTestImplementation project(path: ':modules:mapper-extras')
}

addQaCheckDependencies(project)
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.plugins.ActionPlugin;
Expand All @@ -33,6 +35,8 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -52,23 +56,38 @@ public class SystemDataStreamMigrationIT extends AbstractFeatureMigrationIntegTe
);

private static SystemDataStreamDescriptor createSystemDataStreamDescriptor(IndexVersion indexVersion) {
return new SystemDataStreamDescriptor(
TEST_DATA_STREAM_NAME,
"system data stream test",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.template(
Template.builder()
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
.settings(indexSettings(indexVersion, 1, 0))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
ORIGIN,
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
);
try {
return new SystemDataStreamDescriptor(
TEST_DATA_STREAM_NAME,
"system data stream test",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.template(
Template.builder()
.mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}"""))
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
.settings(indexSettings(indexVersion, 1, 0))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
ORIGIN,
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
Expand All @@ -87,6 +106,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(DataStreamsPlugin.class);
plugins.add(DataStreamTestPlugin.class);
plugins.add(MapperExtrasPlugin.class);
return plugins;
}

Expand All @@ -110,6 +130,20 @@ private static void indexDocsToDataStream(String dataStreamName) {

BulkResponse actionGet = bulkBuilder.get();
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));

// Index docs to failure store too
bulkBuilder = client().prepareBulk();
for (int i = 0; i < INDEX_DOC_COUNT; i++) {
IndexRequestBuilder requestBuilder = ESIntegTestCase.prepareIndex(dataStreamName)
.setId(Integer.toString(i))
.setRequireDataStream(true)
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(DataStream.TIMESTAMP_FIELD_NAME, 1741271969000L, "count", "not-a-number");
bulkBuilder.add(requestBuilder);
}

actionGet = bulkBuilder.get();
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
}

public void testMigrateSystemDataStream() throws Exception {
Expand All @@ -136,6 +170,16 @@ public void testMigrateSystemDataStream() throws Exception {
assertThat(indexMetadata.isSystem(), is(true));
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
}

// Migrate action does not migrate the failure store indices
// here we check that they are preserved.
List<Index> failureIndices = dataStream.getFailureIndices();
assertThat(failureIndices, hasSize(1));
for (Index failureIndex : failureIndices) {
IndexMetadata indexMetadata = finalMetadata.index(failureIndex);
assertThat(indexMetadata.isSystem(), is(true));
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
}
}

public void testMigrationRestartAfterFailure() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -265,21 +267,47 @@ public static class SystemDataStreamTestPlugin extends Plugin implements SystemI

@Override
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
.template(Template.builder().lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
)
try {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
.template(
Template.builder()
.mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}"""))
.lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO))
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(
ThreadPool.Names.SYSTEM_CRITICAL_READ,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SYSTEM_WRITE
)
)
);
} catch (IOException e) {
fail(e.getMessage());
}
throw new IllegalStateException(
"Something went wrong, it should have either returned the descriptor or it should have thrown an assertion error"
);
}

Expand Down