[8.x] [Failure store] Extend DLM security test to cover failure store (#126543) #126659

Merged
1 commit, merged on Apr 11, 2025
@@ -55,10 +55,10 @@
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -95,31 +95,35 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {

public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
// empty lifecycle contains the default rollover
prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.Template.DATA_DEFAULT);
// failure store enabled; the empty lifecycle contains the default rollover
prepareDataStreamAndIndex(dataStreamName, null);

assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String backingIndex = backingIndices.get(0);
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));

// initialise the failure store
indexFailedDoc(dataStreamName);
List<String> failureIndices = waitForDataStreamIndices(dataStreamName, 2, true);
String firstFailureIndex = failureIndices.get(0);
assertThat(firstFailureIndex, dataStreamIndexEqualTo(dataStreamName, 3, true));
String secondFailureIndexGen = failureIndices.get(1);
assertThat(secondFailureIndexGen, dataStreamIndexEqualTo(dataStreamName, 4, true));

assertNoAuthzErrors();
// Index another doc to force another rollover and trigger an attempted force-merge. The force-merge may be a noop under
// the hood but for authz purposes this doesn't matter, it only matters that the force-merge API was called
indexDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(3));
});

waitForDataStreamBackingIndices(dataStreamName, 3);
assertNoAuthzErrors();
}
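
The assertions above lean on the waitForDataStreamBackingIndices / waitForDataStreamIndices helpers, whose implementation is not part of this diff. As a rough sketch only, assuming they are built on the same assertBusy and getDataStreamBackingIndices primitives already used in this test class (the real helpers live in the shared test infrastructure and may differ), such a wait could look like:

// Sketch only, not the actual shared-infrastructure helper; assumes java.util.ArrayList is imported.
private List<String> awaitBackingIndexCount(String dataStreamName, int expectedCount) throws Exception {
    List<String> names = new ArrayList<>();
    assertBusy(() -> {
        List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
        // keep polling until the data stream has rolled over to the expected number of generations
        assertThat(backingIndices.size(), equalTo(expectedCount));
        names.clear();
        for (Index index : backingIndices) {
            names.add(index.getName());
        }
    });
    return names;
}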

public void testRolloverAndRetentionAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).buildTemplate());
prepareDataStreamAndIndex(dataStreamName, TimeValue.ZERO);

assertBusy(() -> {
assertNoAuthzErrors();
@@ -130,16 +134,33 @@ public void testRolloverAndRetentionAuthorized() throws Exception {
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

// test the failure store too; we index the failed doc later so that the generation suffixes are predictable
indexFailedDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<String> failureIndices = getDataStreamBackingIndexNames(dataStreamName, true);
assertThat(failureIndices.size(), equalTo(1));
// we expect the data stream to have only one failure index, with generation 4
// as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration
String writeIndex = failureIndices.get(0);
assertThat(writeIndex, dataStreamIndexEqualTo(dataStreamName, 4, true));
});
}
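
For reference, the generation assertions in this test (and the force-merge test above) rely on backing and failure indices sharing one generation counter per data stream; roughly, assuming the standard .ds-/.fs- naming scheme with the date portion elided:

// gen 1: .ds-<data-stream>-<date>-000001  initial backing index (removed here by the zero retention)
// gen 2: .ds-<data-stream>-<date>-000002  backing write index after rollover
// gen 3: .fs-<data-stream>-<date>-000003  initial failure index (also removed by the zero retention)
// gen 4: .fs-<data-stream>-<date>-000004  failure-store write index after its rollover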

public void testUnauthorized() throws Exception {
// this is an example index pattern for a system index that the data stream lifecycle does not have access to. The data
// stream lifecycle will therefore fail at runtime with an authz exception
prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, DataStreamLifecycle.Template.DATA_DEFAULT);
prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, null);
indexFailedDoc(SECURITY_MAIN_ALIAS);

assertBusy(() -> {
Map<String, String> indicesAndErrors = collectErrorsFromStoreAsMap();
assertThat(indicesAndErrors, is(not(anEmptyMap())));
// Both the backing and failure indices should have errors
assertThat(indicesAndErrors.size(), is(2));
for (String index : indicesAndErrors.keySet()) {
assertThat(index, anyOf(containsString(DataStream.BACKING_INDEX_PREFIX), containsString(DataStream.FAILURE_STORE_PREFIX)));
}
assertThat(
indicesAndErrors.values(),
hasItem(allOf(containsString("security_exception"), containsString("unauthorized for user [_data_stream_lifecycle]")))
@@ -160,6 +181,18 @@ public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exception {
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

// test the failure store too; we index the failed doc later so that the generation suffixes are predictable
indexFailedDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<String> failureIndices = getDataStreamBackingIndexNames(dataStreamName, true);
assertThat(failureIndices.size(), equalTo(1));
// we expect the data stream to have only one failure index, the write one, with generation 4
// as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration
String writeIndex = failureIndices.get(0);
assertThat(writeIndex, dataStreamIndexEqualTo(dataStreamName, 4, true));
});
}

private static String randomDataStreamName() {
@@ -183,9 +216,22 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
return indicesAndErrors;
}

private void prepareDataStreamAndIndex(String dataStreamName, DataStreamLifecycle.Template lifecycle) throws IOException,
InterruptedException, ExecutionException {
putComposableIndexTemplate("id1", null, List.of(dataStreamName + "*"), null, null, lifecycle);
private void prepareDataStreamAndIndex(String dataStreamName, TimeValue retention) throws IOException, InterruptedException,
ExecutionException {
var dataLifecycle = retention == null
? DataStreamLifecycle.Template.DATA_DEFAULT
: new DataStreamLifecycle.Template(true, retention, null);
putComposableIndexTemplate("id1", """
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}""", List.of(dataStreamName + "*"), null, null, dataLifecycle);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
@@ -224,7 +270,7 @@ private static void putComposableIndexTemplate(
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataStreamLifecycle.Template lifecycle
@Nullable DataStreamLifecycle.Template dataLifecycle
) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
Expand All @@ -234,7 +280,8 @@ private static void putComposableIndexTemplate(
Template.builder()
.settings(settings)
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.lifecycle(lifecycle)
.lifecycle(dataLifecycle)
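// enable the failure store on data streams created from this template, so documents that fail
// indexing are captured in failure indices that the lifecycle then has to manage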
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
@@ -261,6 +308,27 @@ private static void indexDoc(String dataStream) {
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

private static void indexFailedDoc(String dataStream) {
BulkRequest bulkRequest = new BulkRequest();
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value),
XContentType.JSON
)
);
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(1));
String failureIndexPrefix = DataStream.FAILURE_STORE_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(failureIndexPrefix));
}
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}
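
Not part of this change, but a natural follow-up check would be that the redirected document is actually searchable in the failure index reported by the bulk response; a minimal sketch, assuming the usual client(), indicesAdmin() and ElasticsearchAssertions.assertHitCount helpers:

// Hypothetical helper, not in this PR: verify the failed document landed in the reported failure index.
private static void assertFailedDocIndexedInFailureStore(BulkResponse bulkResponse) {
    String failureIndex = bulkResponse.getItems()[0].getIndex();
    // a refresh of the failure index may be needed before the document is visible to search
    indicesAdmin().refresh(new RefreshRequest(failureIndex)).actionGet();
    assertHitCount(client().prepareSearch(failureIndex), 1);
}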

public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {

static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";