diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java index 0b120eb8e39a5..29379ea5ed95b 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java @@ -55,10 +55,10 @@ import java.util.concurrent.ExecutionException; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -95,31 +95,35 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception { String dataStreamName = randomDataStreamName(); - // empty lifecycle contains the default rollover - prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.Template.DATA_DEFAULT); + // with failure store and empty lifecycle contains the default rollover + prepareDataStreamAndIndex(dataStreamName, null); - assertBusy(() -> { - assertNoAuthzErrors(); - List backingIndices = getDataStreamBackingIndices(dataStreamName); - assertThat(backingIndices.size(), equalTo(2)); - String backingIndex = backingIndices.get(0).getName(); - 
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1)); - String writeIndex = backingIndices.get(1).getName(); - assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); - }); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String backingIndex = backingIndices.get(0); + assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1)); + String writeIndex = backingIndices.get(1); + assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); + + // initialise the failure store + indexFailedDoc(dataStreamName); + List failureIndices = waitForDataStreamIndices(dataStreamName, 2, true); + String firstFailureIndex = failureIndices.get(0); + assertThat(firstFailureIndex, dataStreamIndexEqualTo(dataStreamName, 3, true)); + String secondFailureIndexGen = failureIndices.get(1); + assertThat(secondFailureIndexGen, dataStreamIndexEqualTo(dataStreamName, 4, true)); + + assertNoAuthzErrors(); // Index another doc to force another rollover and trigger an attempted force-merge. 
The force-merge may be a noop under // the hood but for authz purposes this doesn't matter, it only matters that the force-merge API was called indexDoc(dataStreamName); - assertBusy(() -> { - assertNoAuthzErrors(); - List backingIndices = getDataStreamBackingIndices(dataStreamName); - assertThat(backingIndices.size(), equalTo(3)); - }); + + waitForDataStreamBackingIndices(dataStreamName, 3); + assertNoAuthzErrors(); } public void testRolloverAndRetentionAuthorized() throws Exception { String dataStreamName = randomDataStreamName(); - prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).buildTemplate()); + prepareDataStreamAndIndex(dataStreamName, TimeValue.ZERO); assertBusy(() -> { assertNoAuthzErrors(); @@ -130,16 +134,33 @@ public void testRolloverAndRetentionAuthorized() throws Exception { String writeIndex = backingIndices.get(0).getName(); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); }); + + // test failure store too, we index the failure later to have predictable generation suffixes + indexFailedDoc(dataStreamName); + assertBusy(() -> { + assertNoAuthzErrors(); + List failureIndices = getDataStreamBackingIndexNames(dataStreamName, true); + assertThat(failureIndices.size(), equalTo(1)); + // we expect the data stream to have only one failure index, with generation 4 + // as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration + String writeIndex = failureIndices.get(0); + assertThat(writeIndex, dataStreamIndexEqualTo(dataStreamName, 4, true)); + }); } public void testUnauthorized() throws Exception { // this is an example index pattern for a system index that the data stream lifecycle does not have access for. 
Data stream // lifecycle will therefore fail at runtime with an authz exception - prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, DataStreamLifecycle.Template.DATA_DEFAULT); + prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, null); + indexFailedDoc(SECURITY_MAIN_ALIAS); assertBusy(() -> { Map indicesAndErrors = collectErrorsFromStoreAsMap(); - assertThat(indicesAndErrors, is(not(anEmptyMap()))); + // Both the backing and failures indices should have errors + assertThat(indicesAndErrors.size(), is(2)); + for (String index : indicesAndErrors.keySet()) { + assertThat(index, anyOf(containsString(DataStream.BACKING_INDEX_PREFIX), containsString(DataStream.FAILURE_STORE_PREFIX))); + } assertThat( indicesAndErrors.values(), hasItem(allOf(containsString("security_exception"), containsString("unauthorized for user [_data_stream_lifecycle]"))) @@ -160,6 +181,18 @@ public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exce String writeIndex = backingIndices.get(0).getName(); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); }); + + // test failure store too, we index the failure later to have predictable generation suffixes + indexFailedDoc(dataStreamName); + assertBusy(() -> { + assertNoAuthzErrors(); + List failureIndices = getDataStreamBackingIndexNames(dataStreamName, true); + assertThat(failureIndices.size(), equalTo(1)); + // we expect the data stream to have only one failure index, the write one, with generation 4 + // as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration + String writeIndex = failureIndices.get(0); + assertThat(writeIndex, dataStreamIndexEqualTo(dataStreamName, 4, true)); + }); } private static String randomDataStreamName() { @@ -183,9 +216,22 @@ private Map collectErrorsFromStoreAsMap() { return indicesAndErrors; } - private void prepareDataStreamAndIndex(String dataStreamName, DataStreamLifecycle.Template lifecycle) throws IOException, - InterruptedException, 
ExecutionException { - putComposableIndexTemplate("id1", null, List.of(dataStreamName + "*"), null, null, lifecycle); + private void prepareDataStreamAndIndex(String dataStreamName, TimeValue retention) throws IOException, InterruptedException, + ExecutionException { + var dataLifecycle = retention == null + ? DataStreamLifecycle.Template.DATA_DEFAULT + : new DataStreamLifecycle.Template(true, retention, null); + putComposableIndexTemplate("id1", """ + { + "properties": { + "@timestamp" : { + "type": "date" + }, + "count": { + "type": "long" + } + } + }""", List.of(dataStreamName + "*"), null, null, dataLifecycle); CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, @@ -224,7 +270,7 @@ private static void putComposableIndexTemplate( List patterns, @Nullable Settings settings, @Nullable Map metadata, - @Nullable DataStreamLifecycle.Template lifecycle + @Nullable DataStreamLifecycle.Template dataLifecycle ) throws IOException { TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); request.indexTemplate( @@ -234,7 +280,8 @@ private static void putComposableIndexTemplate( Template.builder() .settings(settings) .mappings(mappings == null ? 
null : CompressedXContent.fromJSON(mappings)) - .lifecycle(lifecycle) + .lifecycle(dataLifecycle) + .dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true))) ) .metadata(metadata) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) @@ -261,6 +308,27 @@ private static void indexDoc(String dataStream) { indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); } + private static void indexFailedDoc(String dataStream) { + BulkRequest bulkRequest = new BulkRequest(); + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source( + String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value), + XContentType.JSON + ) + ); + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(1)); + String backingIndexPrefix = DataStream.FAILURE_STORE_PREFIX + dataStream; + for (BulkItemResponse itemResponse : bulkResponse) { + assertThat(itemResponse.getFailureMessage(), nullValue()); + assertThat(itemResponse.status(), equalTo(RestStatus.CREATED)); + assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix)); + } + indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); + } + public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";