Skip to content

[9.0] Improve resiliency of UpdateTimeSeriesRangeService #126678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126637.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126637
summary: Improve resiliency of `UpdateTimeSeriesRangeService`
area: TSDB
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {

// getWriteIndex() selects the latest added index:
Index head = dataStream.getWriteIndex();
IndexMetadata im = current.metadata().getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
try {
try {
IndexMetadata im = current.metadata().getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
Settings settings = Settings.builder()
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
.build();
Expand All @@ -131,17 +131,17 @@ ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {
mBuilder.updateSettings(settings, head.getName());
// Verify that all temporal ranges of each backing index is still valid:
dataStream.validate(mBuilder::get);
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@
*/
package org.elasticsearch.datastreams;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -22,15 +29,22 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
Expand All @@ -41,6 +55,22 @@

public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {

static MockAppender appender;
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);

@BeforeClass
public static void classInit() throws IllegalAccessException {
appender = new MockAppender("mock_appender");
appender.start();
Loggers.addAppender(testLogger1, appender);
}

@AfterClass
public static void classCleanup() {
Loggers.removeAppender(testLogger1, appender);
appender.stop();
}

private ThreadPool threadPool;
private UpdateTimeSeriesRangeService instance;

Expand Down Expand Up @@ -191,6 +221,68 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
assertThat(getEndTime(result, dataStreamName3, 0), equalTo(start));
}

public void testUpdateTimeSeriesTemporalOneBadDataStream() {
String dataStreamName1 = "logs-app1";
String dataStreamName2 = "logs-app2-broken";
String dataStreamName3 = "logs-app3";
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);

Instant start = now.minus(90, ChronoUnit.MINUTES);
Instant end = start.plus(30, ChronoUnit.MINUTES);
Metadata.Builder mbBuilder = new Metadata.Builder();
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
}

Settings settings = Settings.builder().put("index.mode", "logsdb").build();
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
mbBuilder.put(im, true);
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
var ds2Indices = new ArrayList<>(ds2.getIndices());
ds2Indices.add(im.getIndex());
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
copy.put(
dataStreamName2,
new DataStream(
ds2.getName(),
ds2Indices,
2,
ds2.getMetadata(),
ds2.isHidden(),
ds2.isReplicated(),
ds2.isSystem(),
ds2.isAllowCustomRouting(),
ds2.getIndexMode(),
ds2.getLifecycle(),
ds2.getDataStreamOptions(),
ds2.getFailureIndices(),
ds2.rolloverOnWrite(),
ds2.getAutoShardingEvent()
)
);
mbBuilder.dataStreams(copy, Map.of());

now = now.minus(45, ChronoUnit.MINUTES);
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(mbBuilder).build();
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
assertThat(result, not(sameInstance(before)));
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
assertThat(getEndTime(result, dataStreamName1, 0), equalTo(expectedEndTime));
assertThat(getEndTime(result, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
assertThat(getEndTime(result, dataStreamName3, 0), equalTo(expectedEndTime));

String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
assertThat(
message,
equalTo(
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
+ "backing index ["
+ im.getIndex().getName()
+ "]"
)
);
}

public void testUpdatePollInterval() {
instance.scheduleTask();
assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(5)));
Expand Down Expand Up @@ -220,4 +312,27 @@ static Instant getStartTime(ClusterState state, String dataStreamName, int index
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
}

static class MockAppender extends AbstractAppender {
public LogEvent lastEvent;

MockAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
}

@Override
public void append(LogEvent event) {
lastEvent = event.toImmutable();
}

Message lastMessage() {
return lastEvent.getMessage();
}

public LogEvent getLastEventAndReset() {
LogEvent toReturn = lastEvent;
lastEvent = null;
return toReturn;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public static void getClusterStateWithDataStream(
builder.put(dataStreamBuilder.build());
}

private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
Settings.Builder b = Settings.builder()
.put(settings)
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
Expand Down