Skip to content

Commit 21bb836

Browse files
authored
Adding actions to get and update data stream mappings (#130042)
1 parent 2e8eb84 commit 21bb836

File tree

16 files changed

+1404
-5
lines changed

16 files changed

+1404
-5
lines changed

modules/data-streams/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ tasks.withType(StandaloneRestIntegTestTask).configureEach {
3030
usesDefaultDistribution("to be triaged")
3131
}
3232

33+
tasks.named("internalClusterTest").configure {
34+
systemProperty 'es.logs_stream_feature_flag_enabled', 'true'
35+
}
36+
3337
if (buildParams.inFipsJvm){
3438
// These fail in CI but only when run as part of checkPart2 and not individually.
3539
// Tracked in :
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
14+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
15+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
16+
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
17+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
18+
import org.elasticsearch.action.bulk.BulkRequest;
19+
import org.elasticsearch.action.bulk.BulkResponse;
20+
import org.elasticsearch.action.bulk.TransportBulkAction;
21+
import org.elasticsearch.action.datastreams.GetDataStreamAction;
22+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
23+
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
24+
import org.elasticsearch.action.index.IndexRequest;
25+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
26+
import org.elasticsearch.cluster.metadata.DataStream;
27+
import org.elasticsearch.cluster.metadata.MappingMetadata;
28+
import org.elasticsearch.cluster.metadata.Template;
29+
import org.elasticsearch.common.compress.CompressedXContent;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.xcontent.XContentHelper;
32+
import org.elasticsearch.core.TimeValue;
33+
import org.elasticsearch.plugins.Plugin;
34+
import org.elasticsearch.test.ESIntegTestCase;
35+
import org.elasticsearch.xcontent.XContentType;
36+
37+
import java.io.IOException;
38+
import java.util.Collection;
39+
import java.util.List;
40+
import java.util.Locale;
41+
import java.util.Map;
42+
43+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
44+
import static org.hamcrest.Matchers.equalTo;
45+
46+
public class TransportUpdateDataStreamMappingsActionIT extends ESIntegTestCase {
47+
48+
@Override
49+
protected Collection<Class<? extends Plugin>> nodePlugins() {
50+
return List.of(DataStreamsPlugin.class);
51+
}
52+
53+
public void testGetAndUpdateMappings() throws IOException {
54+
String dataStreamName = "my-data-stream-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
55+
createDataStream(dataStreamName);
56+
57+
Map<String, Object> originalMappings = Map.of(
58+
"dynamic",
59+
"strict",
60+
"properties",
61+
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
62+
);
63+
Map<String, Object> mappingOverrides = Map.of(
64+
"properties",
65+
Map.of("foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text"))
66+
);
67+
Map<String, Object> expectedEffectiveMappings = Map.of(
68+
"dynamic",
69+
"strict",
70+
"properties",
71+
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text"))
72+
);
73+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
74+
updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, true);
75+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
76+
updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, false);
77+
assertExpectedMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings);
78+
79+
// Now make sure that the backing index still has the original mappings:
80+
Map<String, Object> originalIndexMappings = Map.of(
81+
"dynamic",
82+
"strict",
83+
"_data_stream_timestamp",
84+
Map.of("enabled", true),
85+
"properties",
86+
Map.of("@timestamp", Map.of("type", "date"), "foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
87+
);
88+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().getFirst().getName(), originalIndexMappings);
89+
90+
// Do a rollover, and then make sure that the updated mappnigs are on the new write index:
91+
assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
92+
Map<String, Object> updatedIndexMappings = Map.of(
93+
"dynamic",
94+
"strict",
95+
"_data_stream_timestamp",
96+
Map.of("enabled", true),
97+
"properties",
98+
Map.of(
99+
"@timestamp",
100+
Map.of("type", "date"),
101+
"foo1",
102+
Map.of("type", "text"),
103+
"foo2",
104+
Map.of("type", "keyword"),
105+
"foo3",
106+
Map.of("type", "text")
107+
)
108+
);
109+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(1).getName(), updatedIndexMappings);
110+
111+
// Now undo the mapping overrides, and expect the original mapping to be in effect:
112+
updateMappings(dataStreamName, Map.of(), originalMappings, false);
113+
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
114+
assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
115+
assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(2).getName(), originalIndexMappings);
116+
}
117+
118+
private void createDataStream(String dataStreamName) throws IOException {
119+
String mappingString = """
120+
{
121+
"_doc":{
122+
"dynamic":"strict",
123+
"properties":{
124+
"foo1":{
125+
"type":"text"
126+
},
127+
"foo2":{
128+
"type":"text"
129+
}
130+
}
131+
}
132+
}
133+
""";
134+
CompressedXContent mapping = CompressedXContent.fromJSON(mappingString);
135+
Template template = new Template(Settings.EMPTY, mapping, null);
136+
ComposableIndexTemplate.DataStreamTemplate dataStreamTemplate = new ComposableIndexTemplate.DataStreamTemplate();
137+
ComposableIndexTemplate composableIndexTemplate = ComposableIndexTemplate.builder()
138+
.indexPatterns(List.of("my-data-stream-*"))
139+
.dataStreamTemplate(dataStreamTemplate)
140+
.template(template)
141+
.build();
142+
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request("test");
143+
request.indexTemplate(composableIndexTemplate);
144+
145+
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
146+
147+
BulkRequest bulkRequest = new BulkRequest();
148+
bulkRequest.add(new IndexRequest(dataStreamName).source("""
149+
{
150+
"@timestamp": "2024-08-27",
151+
"foo1": "baz"
152+
}
153+
""", XContentType.JSON).id(randomUUID()));
154+
bulkRequest.add(new IndexRequest(dataStreamName).source("""
155+
{
156+
"@timestamp": "2024-08-27",
157+
"foo3": "baz"
158+
}
159+
""", XContentType.JSON).id(randomUUID()));
160+
BulkResponse response = client().execute(new ActionType<BulkResponse>(TransportBulkAction.NAME), bulkRequest).actionGet();
161+
assertThat(response.getItems().length, equalTo(2));
162+
}
163+
164+
private void assertExpectedMappings(
165+
String dataStreamName,
166+
Map<String, Object> expectedMappingOverrides,
167+
Map<String, Object> expectedEffectiveMappings
168+
) {
169+
GetDataStreamMappingsAction.Response getMappingsResponse = client().execute(
170+
new ActionType<GetDataStreamMappingsAction.Response>(GetDataStreamMappingsAction.NAME),
171+
new GetDataStreamMappingsAction.Request(TimeValue.THIRTY_SECONDS).indices(dataStreamName)
172+
).actionGet();
173+
List<GetDataStreamMappingsAction.DataStreamMappingsResponse> responses = getMappingsResponse.getDataStreamMappingsResponses();
174+
assertThat(responses.size(), equalTo(1));
175+
GetDataStreamMappingsAction.DataStreamMappingsResponse mappingsResponse = responses.getFirst();
176+
assertThat(mappingsResponse.dataStreamName(), equalTo(dataStreamName));
177+
assertThat(
178+
XContentHelper.convertToMap(mappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(),
179+
equalTo(expectedMappingOverrides)
180+
);
181+
assertThat(
182+
XContentHelper.convertToMap(mappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(),
183+
equalTo(expectedEffectiveMappings)
184+
);
185+
186+
DataStream dataStream = getDataStream(dataStreamName);
187+
assertThat(
188+
XContentHelper.convertToMap(dataStream.getMappings().uncompressed(), true, XContentType.JSON).v2(),
189+
equalTo(expectedMappingOverrides)
190+
);
191+
}
192+
193+
private void assertExpectedMappingsOnIndex(String indexName, Map<String, Object> expectedMappings) {
194+
GetMappingsResponse mappingsResponse = client().execute(
195+
new ActionType<GetMappingsResponse>(GetMappingsAction.NAME),
196+
new GetMappingsRequest(TimeValue.THIRTY_SECONDS).indices(indexName)
197+
).actionGet();
198+
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
199+
assertThat(mappings.size(), equalTo(1));
200+
assertThat(mappings.values().iterator().next().sourceAsMap(), equalTo(expectedMappings));
201+
}
202+
203+
private void updateMappings(
204+
String dataStreamName,
205+
Map<String, Object> mappingOverrides,
206+
Map<String, Object> expectedEffectiveMappings,
207+
boolean dryRun
208+
) throws IOException {
209+
CompressedXContent mappingOverride = new CompressedXContent(mappingOverrides);
210+
UpdateDataStreamMappingsAction.Response putMappingsResponse = client().execute(
211+
new ActionType<UpdateDataStreamMappingsAction.Response>(UpdateDataStreamMappingsAction.NAME),
212+
new UpdateDataStreamMappingsAction.Request(mappingOverride, dryRun, TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).indices(
213+
dataStreamName
214+
)
215+
).actionGet();
216+
assertThat(putMappingsResponse.getDataStreamMappingsResponses().size(), equalTo(1));
217+
UpdateDataStreamMappingsAction.DataStreamMappingsResponse firstPutMappingsResponse = putMappingsResponse
218+
.getDataStreamMappingsResponses()
219+
.getFirst();
220+
assertThat(firstPutMappingsResponse.dataStreamName(), equalTo(dataStreamName));
221+
assertThat(
222+
XContentHelper.convertToMap(firstPutMappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(),
223+
equalTo(mappingOverrides)
224+
);
225+
assertThat(
226+
XContentHelper.convertToMap(firstPutMappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(),
227+
equalTo(expectedEffectiveMappings)
228+
);
229+
}
230+
231+
private DataStream getDataStream(String dataStreamName) {
232+
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
233+
TEST_REQUEST_TIMEOUT,
234+
new String[] { dataStreamName }
235+
);
236+
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
237+
.actionGet();
238+
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
239+
return getDataStreamResponse.getDataStreams().get(0).getDataStream();
240+
}
241+
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
1515
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
1616
import org.elasticsearch.action.datastreams.GetDataStreamAction;
17+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
1718
import org.elasticsearch.action.datastreams.GetDataStreamSettingsAction;
1819
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
1920
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2021
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
2122
import org.elasticsearch.action.datastreams.PutDataStreamOptionsAction;
23+
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
2224
import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction;
2325
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
2426
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2527
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
2628
import org.elasticsearch.client.internal.OriginSettingClient;
29+
import org.elasticsearch.cluster.metadata.DataStream;
2730
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2831
import org.elasticsearch.cluster.node.DiscoveryNodes;
2932
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -37,11 +40,13 @@
3740
import org.elasticsearch.datastreams.action.TransportCreateDataStreamAction;
3841
import org.elasticsearch.datastreams.action.TransportDataStreamsStatsAction;
3942
import org.elasticsearch.datastreams.action.TransportDeleteDataStreamAction;
43+
import org.elasticsearch.datastreams.action.TransportGetDataStreamMappingsAction;
4044
import org.elasticsearch.datastreams.action.TransportGetDataStreamSettingsAction;
4145
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
4246
import org.elasticsearch.datastreams.action.TransportMigrateToDataStreamAction;
4347
import org.elasticsearch.datastreams.action.TransportModifyDataStreamsAction;
4448
import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction;
49+
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction;
4550
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction;
4651
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
4752
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
@@ -246,6 +251,10 @@ public List<ActionHandler> getActions() {
246251
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
247252
actions.add(new ActionHandler(GetDataStreamSettingsAction.INSTANCE, TransportGetDataStreamSettingsAction.class));
248253
actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class));
254+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
255+
actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class));
256+
actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class));
257+
}
249258
return actions;
250259
}
251260

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.action;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
14+
import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction;
15+
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
18+
import org.elasticsearch.cluster.ProjectState;
19+
import org.elasticsearch.cluster.block.ClusterBlockException;
20+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
21+
import org.elasticsearch.cluster.metadata.DataStream;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.project.ProjectResolver;
24+
import org.elasticsearch.cluster.service.ClusterService;
25+
import org.elasticsearch.injection.guice.Inject;
26+
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.transport.TransportService;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
public class TransportGetDataStreamMappingsAction extends TransportLocalProjectMetadataAction<
35+
GetDataStreamMappingsAction.Request,
36+
GetDataStreamMappingsAction.Response> {
37+
private final IndexNameExpressionResolver indexNameExpressionResolver;
38+
39+
@Inject
40+
public TransportGetDataStreamMappingsAction(
41+
TransportService transportService,
42+
ClusterService clusterService,
43+
ThreadPool threadPool,
44+
ActionFilters actionFilters,
45+
ProjectResolver projectResolver,
46+
IndexNameExpressionResolver indexNameExpressionResolver
47+
) {
48+
super(
49+
GetSettingsAction.NAME,
50+
actionFilters,
51+
transportService.getTaskManager(),
52+
clusterService,
53+
threadPool.executor(ThreadPool.Names.MANAGEMENT),
54+
projectResolver
55+
);
56+
this.indexNameExpressionResolver = indexNameExpressionResolver;
57+
}
58+
59+
@Override
60+
protected ClusterBlockException checkBlock(GetDataStreamMappingsAction.Request request, ProjectState state) {
61+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
62+
}
63+
64+
@Override
65+
protected void localClusterStateOperation(
66+
Task task,
67+
GetDataStreamMappingsAction.Request request,
68+
ProjectState project,
69+
ActionListener<GetDataStreamMappingsAction.Response> listener
70+
) throws Exception {
71+
List<String> dataStreamNames = indexNameExpressionResolver.dataStreamNames(
72+
project.metadata(),
73+
IndicesOptions.DEFAULT,
74+
request.indices()
75+
);
76+
Map<String, DataStream> dataStreamMap = project.metadata().dataStreams();
77+
List<GetDataStreamMappingsAction.DataStreamMappingsResponse> responseList = new ArrayList<>(dataStreamNames.size());
78+
for (String dataStreamName : dataStreamNames) {
79+
DataStream dataStream = dataStreamMap.get(dataStreamName);
80+
responseList.add(
81+
new GetDataStreamMappingsAction.DataStreamMappingsResponse(
82+
dataStreamName,
83+
dataStream.getMappings(),
84+
dataStream.getEffectiveMappings(project.metadata())
85+
)
86+
);
87+
}
88+
listener.onResponse(new GetDataStreamMappingsAction.Response(responseList));
89+
}
90+
}

0 commit comments

Comments
 (0)