Skip to content

Pass data stream names from GET /data_streams/ to the stats req when verbose: true #118011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/118011.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 118011
summary: "Pass data stream names from `GET /data_streams/` to the stats req when `verbose: true`"
area: Data streams
type: bug
issues:
- 117993
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Before;
import org.junit.ClassRule;

public class DataStreamWithSecurityIT extends ESRestTestCase {
Expand All @@ -40,6 +42,32 @@ public class DataStreamWithSecurityIT extends ESRestTestCase {
.rolesFile(Resource.fromClasspath("roles.yml"))
.build();

@Before
public void setUp() throws Exception {
super.setUp();
Request putLimitedUser = new Request("POST", "/_security/user/limited_user");
putLimitedUser.setJsonEntity(Strings.format("""
{
"password" : "%s",
"roles" : [ "only_get" ]
}
""", PASSWORD));
assertOK(adminClient().performRequest(putLimitedUser));

Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/my-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["my-ds*"],
"data_stream": {}
}
""");
assertOK(adminClient().performRequest(putComposableIndexTemplateRequest));
assertOK(adminClient().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
Request createDocRequest = new Request("POST", "/" + DATA_STREAM_NAME + "/_doc");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-01-01\", \"message\": \"foo\" }");
assertOK(adminClient().performRequest(createDocRequest));
}

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
Expand All @@ -64,27 +92,23 @@ private Settings simpleUserRestClientSettings() {
}

public void testGetDataStreamWithoutPermission() throws Exception {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/my-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["my-ds*"],
"data_stream": {}
}
""");
assertOK(adminClient().performRequest(putComposableIndexTemplateRequest));
assertOK(adminClient().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
Request createDocRequest = new Request("POST", "/" + DATA_STREAM_NAME + "/_doc");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2022-01-01\", \"message\": \"foo\" }");
assertOK(adminClient().performRequest(createDocRequest));

// Both the verbose and non-verbose versions should work with the "simple" user
try (var simpleUserClient = buildClient(simpleUserRestClientSettings(), getClusterHosts().toArray(new HttpHost[0]))) {
Request getDs = new Request("GET", "/_data_stream");
Request getDs = new Request("GET", "/_data_stream/*?expand_wildcards=all");
assertOK(simpleUserClient.performRequest(getDs));

Request getDsVerbose = new Request("GET", "/_data_stream?verbose=true");
Request getDsVerbose = new Request("GET", "/_data_stream/*?expand_wildcards=all&verbose=true");
assertOK(simpleUserClient.performRequest(getDsVerbose));
}
}

public void testGetDataStreamWithSuperuser() throws Exception {
// Both the verbose and non-verbose versions should work with the "superuser" user
Request getDs = new Request("GET", "/_data_stream/*?expand_wildcards=all");
assertOK(client().performRequest(getDs));

Request getDsVerbose = new Request("GET", "/_data_stream/*?expand_wildcards=all&verbose=true");
assertOK(client().performRequest(getDsVerbose));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,16 @@ protected void masterOperation(
ClusterState state,
ActionListener<GetDataStreamAction.Response> listener
) throws Exception {
List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(
indexNameExpressionResolver,
state,
request.getNames(),
request.indicesOptions()
);
if (request.verbose()) {
DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request();
req.indices(request.indices());
req.indices(dataStreamNames.toArray(new String[0]));
req.indicesOptions(request.indicesOptions());
client.execute(DataStreamsStatsAction.INSTANCE, req, new ActionListener<>() {
@Override
public void onResponse(DataStreamsStatsAction.Response response) {
Expand All @@ -118,7 +125,7 @@ public void onResponse(DataStreamsStatsAction.Response response) {
innerOperation(
state,
request,
indexNameExpressionResolver,
dataStreamNames,
systemIndices,
clusterSettings,
globalRetentionSettings,
Expand All @@ -134,21 +141,21 @@ public void onFailure(Exception e) {
});
} else {
listener.onResponse(
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null)
innerOperation(state, request, dataStreamNames, systemIndices, clusterSettings, globalRetentionSettings, null)
);
}
}

static GetDataStreamAction.Response innerOperation(
ClusterState state,
GetDataStreamAction.Request request,
IndexNameExpressionResolver indexNameExpressionResolver,
List<String> dataStreamNames,
SystemIndices systemIndices,
ClusterSettings clusterSettings,
DataStreamGlobalRetentionSettings globalRetentionSettings,
@Nullable Map<String, Long> maxTimestamps
) {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<DataStream> dataStreams = getDataStreams(state, dataStreamNames);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
final String indexTemplate;
Expand Down Expand Up @@ -293,15 +300,10 @@ private static void collectIndexSettingsValues(
}
}

static List<DataStream> getDataStreams(
ClusterState clusterState,
IndexNameExpressionResolver iner,
GetDataStreamAction.Request request
) {
List<String> results = DataStreamsActionUtil.getDataStreamNames(iner, clusterState, request.getNames(), request.indicesOptions());
static List<DataStream> getDataStreams(ClusterState clusterState, List<String> dataStreamNames) {
Map<String, DataStream> dataStreams = clusterState.metadata().dataStreams();

return results.stream().map(dataStreams::get).sorted(Comparator.comparing(DataStream::getName)).toList();
return dataStreamNames.stream().map(dataStreams::get).sorted(Comparator.comparing(DataStream::getName)).toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.elasticsearch.datastreams.action;

import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -22,7 +23,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -31,13 +31,9 @@
import java.time.temporal.ChronoUnit;
import java.util.List;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.getClusterStateWithDataStreams;
import static org.elasticsearch.test.LambdaMatchers.transformedItemsMatch;
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

Expand All @@ -49,82 +45,6 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
ClusterSettings.createBuiltInClusterSettings()
);

public void testGetDataStream() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved these tests to DataStreamsActionUtilTests since we are using that method now to get the names.

final String dataStreamName = "my-data-stream";
ClusterState cs = getClusterStateWithDataStreams(List.of(new Tuple<>(dataStreamName, 1)), List.of());
GetDataStreamAction.Request req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
List<DataStream> dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamName)));
}

public void testGetDataStreamsWithWildcards() {
final String[] dataStreamNames = { "my-data-stream", "another-data-stream" };
ClusterState cs = getClusterStateWithDataStreams(
List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)),
List.of()
);

GetDataStreamAction.Request req = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamNames[1].substring(0, 5) + "*" }
);
List<DataStream> dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[1])));

req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[1], dataStreamNames[0])));

req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, (String[]) null);
dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[1], dataStreamNames[0])));

req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "matches-none*" });
dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, empty());
}

public void testGetDataStreamsWithoutWildcards() {
final String[] dataStreamNames = { "my-data-stream", "another-data-stream" };
ClusterState cs = getClusterStateWithDataStreams(
List.of(new Tuple<>(dataStreamNames[0], 1), new Tuple<>(dataStreamNames[1], 1)),
List.of()
);

GetDataStreamAction.Request req = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamNames[0], dataStreamNames[1] }
);
List<DataStream> dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[1], dataStreamNames[0])));

req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamNames[1] });
dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[1])));

req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamNames[0] });
dataStreams = TransportGetDataStreamsAction.getDataStreams(cs, resolver, req);
assertThat(dataStreams, transformedItemsMatch(DataStream::getName, contains(dataStreamNames[0])));

GetDataStreamAction.Request req2 = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "foo" });
IndexNotFoundException e = expectThrows(
IndexNotFoundException.class,
() -> TransportGetDataStreamsAction.getDataStreams(cs, resolver, req2)
);
assertThat(e.getMessage(), containsString("no such index [foo]"));
}

public void testGetNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
GetDataStreamAction.Request req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
IndexNotFoundException e = expectThrows(
IndexNotFoundException.class,
() -> TransportGetDataStreamsAction.getDataStreams(cs, resolver, req)
);
assertThat(e.getMessage(), containsString("no such index [" + dataStreamName + "]"));
}

public void testGetTimeSeriesDataStream() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
String dataStream1 = "ds-1";
Expand Down Expand Up @@ -159,15 +79,7 @@ public void testGetTimeSeriesDataStream() {
}

var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
null
);
var response = getResponse(state, req);
assertThat(
response.getDataStreams(),
contains(
Expand All @@ -190,15 +102,7 @@ public void testGetTimeSeriesDataStream() {
mBuilder.remove(dataStream.getIndices().get(1).getName());
state = ClusterState.builder(state).metadata(mBuilder).build();
}
response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
null
);
response = getResponse(state, req);
assertThat(
response.getDataStreams(),
contains(
Expand Down Expand Up @@ -241,15 +145,7 @@ public void testGetTimeSeriesDataStreamWithOutOfOrderIndices() {
}

var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
null
);
var response = getResponse(state, req);
assertThat(
response.getDataStreams(),
contains(
Expand Down Expand Up @@ -285,15 +181,7 @@ public void testGetTimeSeriesMixedDataStream() {
}

var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
null
);
var response = getResponse(state, req);

var name1 = DataStream.getDefaultBackingIndexName("ds-1", 1, instant.toEpochMilli());
var name2 = DataStream.getDefaultBackingIndexName("ds-1", 2, instant.toEpochMilli());
Expand Down Expand Up @@ -331,15 +219,7 @@ public void testPassingGlobalRetention() {
}

var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
null
);
var response = getResponse(state, req);
assertThat(response.getGlobalRetention(), nullValue());
DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
TimeValue.timeValueDays(randomIntBetween(1, 5)),
Expand All @@ -356,15 +236,28 @@ public void testPassingGlobalRetention() {
.build()
)
);
response = TransportGetDataStreamsAction.innerOperation(
response = getResponse(state, req, withGlobalRetentionSettings);
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
}

private GetDataStreamAction.Response getResponse(ClusterState state, GetDataStreamAction.Request req) {
return getResponse(state, req, dataStreamGlobalRetentionSettings);
}

private GetDataStreamAction.Response getResponse(
ClusterState state,
GetDataStreamAction.Request req,
DataStreamGlobalRetentionSettings globalRetentionSettings
) {
List<String> dataStreamNames = DataStreamsActionUtil.getDataStreamNames(resolver, state, req.getNames(), req.indicesOptions());
return TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
dataStreamNames,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
withGlobalRetentionSettings,
globalRetentionSettings,
null
);
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
}
}
Loading