[ML] Allow stopping DF analytics whose config is missing #56360

Merged
[ML] Allow stopping DF analytics whose config is missing
It is possible that the config document for a data frame
analytics job is deleted from the config index. If that is
the case, the user cannot stop a running job because we
attempt to retrieve the config and that throws an error.

This commit changes that. When the request is forced, we
expand the requested ids from the list of running tasks
instead of from the existing configs (a usage sketch follows
below).
dimitris-athanasiou committed May 7, 2020
commit edcb4f37313c3b19b8088f15e7975e955f9e5b66
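
For context on the user-facing behaviour, here is a minimal sketch of issuing a forced stop. It assumes the 7.x Java high-level REST client (StopDataFrameAnalyticsRequest, setForce, isStopped) and an already-built RestHighLevelClient named client; the job id is invented:

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;

// Force-stop the job; with this change the call succeeds even if the job's
// config document has been deleted from the config index.
StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest("my-analytics-job");
stopRequest.setForce(true);
StopDataFrameAnalyticsResponse stopResponse =
    client.machineLearning().stopDataFrameAnalytics(stopRequest, RequestOptions.DEFAULT);
assert stopResponse.isStopped();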
@@ -6,7 +6,14 @@
package org.elasticsearch.xpack.core.ml.utils;

import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/**
@@ -94,4 +101,33 @@ public static String getParentField(String fieldPath) {
}
return fieldPath.substring(0, lastIndexOfDot);
}

/**
* Given a collection of strings and some patterns, finds the strings that match at least one pattern.
* @param patterns the patterns; may contain wildcards
* @param items the collection of strings
* @return the strings from {@code items} that match at least one pattern
*/
public static Collection<String> findMatching(String[] patterns, Set<String> items) {
if (items.isEmpty()) {
return Collections.emptyList();
}
if (Strings.isAllOrWildcard(patterns)) {
return items;
}

List<String> matchingItems = new ArrayList<>();
for (String pattern : patterns) {
if (items.contains(pattern)) {
matchingItems.add(pattern);
} else if (Regex.isSimpleMatchPattern(pattern)) {
for (String item : items) {
if (Regex.simpleMatch(pattern, item)) {
matchingItems.add(item);
}
}
}
}
return matchingItems;
}
}
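
A quick illustration of how the new helper behaves; the item set and patterns below are invented for the example:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.xpack.core.ml.utils.MlStrings;

Set<String> startedIds = new HashSet<>(Arrays.asList("churn-model", "sales-1", "sales-2"));

// Concrete ids and wildcard patterns can be mixed; every item matching at least one pattern is returned.
Collection<String> matched = MlStrings.findMatching(new String[] {"churn-model", "sales-*"}, startedIds);
// matched contains "churn-model", "sales-1" and "sales-2"

// "_all" (or a lone "*") short-circuits and returns the whole set unfiltered.
Collection<String> everything = MlStrings.findMatching(new String[] {"_all"}, startedIds);

Since matches are collected per pattern, an item that matches more than one pattern can appear more than once in the returned list; callers that need distinct ids, like the stop action below, wrap the result in a set.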
@@ -29,19 +29,22 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -92,11 +95,11 @@ protected void doExecute(Task task, StopDataFrameAnalyticsAction.Request request
logger.debug("Received request to stop data frame analytics [{}]", request.getId());

ActionListener<Set<String>> expandedIdsListener = ActionListener.wrap(
expandedIds -> {
logger.debug("Resolved data frame analytics to stop: {}", expandedIds);
idsToStop -> {
logger.debug("Resolved data frame analytics to stop: {}", idsToStop);

PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks);
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(idsToStop, tasks);

if (analyticsByTaskState.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
@@ -112,26 +115,51 @@ protected void doExecute(Task task, StopDataFrameAnalyticsAction.Request request
listener::onFailure
);

expandIds(state, request, expandedIdsListener);
findIdsToStop(state, request, expandedIdsListener);
}

private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
ActionListener<Set<String>> expandedIdsListener) {
ActionListener<List<DataFrameAnalyticsConfig>> configsListener = ActionListener.wrap(
configs -> {
Set<String> matchingIds = configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet());
PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Set<String> startedIds = tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream()
.filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX))
.map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, ""))
.collect(Collectors.toSet());
private void findIdsToStop(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
ActionListener<Set<String>> expandedIdsListener) {
Set<String> startedIds = getAllStartedIds(clusterState);

ActionListener<Set<String>> matchingIdsListener = ActionListener.wrap(
matchingIds -> {
startedIds.retainAll(matchingIds);
expandedIdsListener.onResponse(startedIds);
},
expandedIdsListener::onFailure
);

configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener);
if (request.isForce()) {
matchAllStartedIds(request, startedIds, matchingIdsListener);
} else {
configProvider.getMultiple(request.getId(), request.allowNoMatch(), ActionListener.wrap(
configs -> matchingIdsListener.onResponse(
configs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toSet())),
matchingIdsListener::onFailure
));
}
}

private Set<String> getAllStartedIds(ClusterState clusterState) {
PersistentTasksCustomMetadata tasksMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
return tasksMetadata == null ? Collections.emptySet() : tasksMetadata.tasks().stream()
.filter(t -> t.getId().startsWith(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX))
.map(t -> t.getId().replaceFirst(MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, ""))
.collect(Collectors.toSet());
}

private void matchAllStartedIds(StopDataFrameAnalyticsAction.Request request, Set<String> startedIds,
ActionListener<Set<String>> matchingIdsListener) {
String[] tokens = ExpandedIdsMatcher.tokenizeExpression(request.getId());
ExpandedIdsMatcher expandedIdsMatcher = new ExpandedIdsMatcher(tokens, request.allowNoMatch());
expandedIdsMatcher.filterMatchedIds(startedIds);
if (expandedIdsMatcher.hasUnmatchedIds()) {
matchingIdsListener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(expandedIdsMatcher.unmatchedIdsString()));
return;
}
Collection<String> matchingStartedIds = MlStrings.findMatching(tokens, startedIds);
matchingIdsListener.onResponse(new HashSet<>(matchingStartedIds));
}

private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request,
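
To make the forced path concrete, here is a small walk-through using only names that appear in the change; the ids and the expression are invented, and the exact persistent task id prefix is an assumption:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;

// Two analytics tasks are running. Their persistent task ids carry a prefix
// (assumed "data_frame_analytics-"), which getAllStartedIds() strips off,
// leaving the plain analytics ids:
Set<String> startedIds = new HashSet<>(Arrays.asList("sales-1", "sales-2"));

// A forced stop of "sales-*" is resolved purely against those ids; the config index is never read.
String[] tokens = ExpandedIdsMatcher.tokenizeExpression("sales-*");
ExpandedIdsMatcher matcher = new ExpandedIdsMatcher(tokens, /* allowNoMatch */ true);
matcher.filterMatchedIds(startedIds);
// hasUnmatchedIds() is true only when a required expression matched no running task,
// in which case the action fails with a "missing data frame analytics" error.
assert matcher.hasUnmatchedIds() == false;

Collection<String> idsToStop = MlStrings.findMatching(tokens, startedIds); // "sales-1", "sales-2"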
@@ -53,13 +53,13 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -492,27 +492,7 @@ private QueryBuilder buildDatafeedIdQuery(String [] tokens) {
}

static Collection<String> matchingDatafeedIdsWithTasks(String[] datafeedIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(tasksMetadata);
if (startedDatafeedIds.isEmpty()) {
return Collections.emptyList() ;
}
if (Strings.isAllOrWildcard(datafeedIdPatterns)) {
return startedDatafeedIds;
}

List<String> matchingDatafeedIds = new ArrayList<>();
for (String datafeedIdPattern : datafeedIdPatterns) {
if (startedDatafeedIds.contains(datafeedIdPattern)) {
matchingDatafeedIds.add(datafeedIdPattern);
} else if (Regex.isSimpleMatchPattern(datafeedIdPattern)) {
for (String startedDatafeedId : startedDatafeedIds) {
if (Regex.simpleMatch(datafeedIdPattern, startedDatafeedId)) {
matchingDatafeedIds.add(startedDatafeedId);
}
}
}
}
return matchingDatafeedIds;
return MlStrings.findMatching(datafeedIdPatterns, MlTasks.startedDatafeedIds(tasksMetadata));
}

private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {
@@ -66,12 +66,14 @@
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -523,7 +525,7 @@ public void expandJobsIds(String expression,
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
Set<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata);
Collection<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
@@ -742,28 +744,8 @@ public void validateDatafeedJob(DatafeedConfig config, ActionListener<Boolean> l
));
}

static Set<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
Set<String> openjobs = MlTasks.openJobIds(tasksMetadata);
if (openjobs.isEmpty()) {
return Collections.emptySet();
}
if (Strings.isAllOrWildcard(jobIdPatterns)) {
return openjobs;
}

Set<String> matchingJobIds = new HashSet<>();
for (String jobIdPattern : jobIdPatterns) {
if (openjobs.contains(jobIdPattern)) {
matchingJobIds.add(jobIdPattern);
} else if (Regex.isSimpleMatchPattern(jobIdPattern)) {
for (String openJobId : openjobs) {
if (Regex.simpleMatch(jobIdPattern, openJobId)) {
matchingJobIds.add(openJobId);
}
}
}
}
return matchingJobIds;
static Collection<String> matchingJobIdsWithTasks(String[] jobIdPatterns, PersistentTasksCustomMetadata tasksMetadata) {
return MlStrings.findMatching(jobIdPatterns, MlTasks.openJobIds(tasksMetadata));
}


@@ -9,11 +9,18 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class MlStringsTests extends ESTestCase {

public void testDoubleQuoteIfNotAlphaNumeric() {
assertEquals("foo2", MlStrings.doubleQuoteIfNotAlphaNumeric("foo2"));
assertEquals("\"fo o\"", MlStrings.doubleQuoteIfNotAlphaNumeric("fo o"));
@@ -46,4 +53,21 @@ public void testHasValidLengthForId() {
assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(64)), is(true));
assertThat(MlStrings.hasValidLengthForId(randomAlphaOfLength(65)), is(false));
}

public void testFindMatching_GivenEmptyItems() {
assertThat(MlStrings.findMatching(new String[0], Collections.emptySet()), is(empty()));
}

public void testFindMatching_GivenAllPattern() {
assertThat(MlStrings.findMatching(new String[] {"_all"}, new HashSet<>(Arrays.asList("a", "b"))), hasItems("a", "b"));
}

public void testFindMatching_GivenWildcardPattern() {
assertThat(MlStrings.findMatching(new String[] {"*"}, new HashSet<>(Arrays.asList("a", "b"))), hasItems("a", "b"));
}

public void testFindMatching_GivenMixedPatterns() {
assertThat(MlStrings.findMatching(new String[] {"concrete", "wild-*"}, new HashSet<>(
Arrays.asList("a", "concrete", "con*", "wild-1", "wild-2"))), hasItems("concrete", "wild-1", "wild-2"));
}
}