-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Issue-105420: Fix bug causing incorrect error on force deleting already deleted model #107188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
413c789
1191993
9e57be6
076e2ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,6 +8,7 @@ | |||||
|
||||||
import org.apache.logging.log4j.LogManager; | ||||||
import org.apache.logging.log4j.Logger; | ||||||
import org.elasticsearch.ElasticsearchException; | ||||||
import org.elasticsearch.ElasticsearchStatusException; | ||||||
import org.elasticsearch.ResourceNotFoundException; | ||||||
import org.elasticsearch.action.ActionListener; | ||||||
|
@@ -37,9 +38,12 @@ | |||||
import org.elasticsearch.transport.TransportService; | ||||||
import org.elasticsearch.xpack.core.ClientHelper; | ||||||
import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAction; | ||||||
import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; | ||||||
import org.elasticsearch.xpack.core.ml.action.StopTrainedModelDeploymentAction; | ||||||
import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata; | ||||||
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; | ||||||
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignmentMetadata; | ||||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages; | ||||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; | ||||||
import org.elasticsearch.xpack.core.ml.utils.InferenceProcessorInfoExtractor; | ||||||
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; | ||||||
|
@@ -52,6 +56,9 @@ | |||||
import java.util.Objects; | ||||||
import java.util.Optional; | ||||||
import java.util.Set; | ||||||
import java.util.concurrent.CountDownLatch; | ||||||
import java.util.concurrent.TimeUnit; | ||||||
import java.util.concurrent.atomic.AtomicBoolean; | ||||||
|
||||||
import static org.elasticsearch.core.Strings.format; | ||||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; | ||||||
|
@@ -146,11 +153,20 @@ static List<String> getModelAliases(ClusterState clusterState, String modelId) { | |||||
return modelAliases; | ||||||
} | ||||||
|
||||||
private void deleteModel(DeleteTrainedModelAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) { | ||||||
protected void deleteModel( | ||||||
DeleteTrainedModelAction.Request request, | ||||||
ClusterState state, | ||||||
ActionListener<AcknowledgedResponse> listener | ||||||
) { | ||||||
String id = request.getId(); | ||||||
IngestMetadata currentIngestMetadata = state.metadata().getProject().custom(IngestMetadata.TYPE); | ||||||
Set<String> referencedModels = InferenceProcessorInfoExtractor.getModelIdsFromInferenceProcessors(currentIngestMetadata); | ||||||
|
||||||
if (modelExists(request.getId()) == false) { | ||||||
listener.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.INFERENCE_NOT_FOUND, request.getId()))); | ||||||
return; | ||||||
} | ||||||
|
||||||
if (request.isForce() == false && referencedModels.contains(id)) { | ||||||
listener.onFailure( | ||||||
new ElasticsearchStatusException( | ||||||
|
@@ -199,6 +215,39 @@ private void deleteModel(DeleteTrainedModelAction.Request request, ClusterState | |||||
} | ||||||
} | ||||||
|
||||||
protected boolean modelExists(String modelId) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You can avoid the countdown latch, and hence blocking the calling thread, by using a listener. You don't have to time out the call to `trainedModelProvider.getTrainedModel(...)`.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After you make the […] Here's an example of it being used: |
||||||
CountDownLatch latch = new CountDownLatch(1); | ||||||
final AtomicBoolean modelExists = new AtomicBoolean(false); | ||||||
|
||||||
Comment on lines
+218
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid using a latch and requiring a timeout, I think we could replace this function with an actionListener. What do you think? |
||||||
ActionListener<TrainedModelConfig> trainedModelListener = new ActionListener<>() { | ||||||
@Override | ||||||
public void onResponse(TrainedModelConfig config) { | ||||||
modelExists.set(true); | ||||||
latch.countDown(); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void onFailure(Exception e) { | ||||||
logger.error("Failed to retrieve model {}: {}", modelId, e.getMessage(), e); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't an error as we are checking the model's existence here. If the model doesn't exist then that is fine and it should be reported back to the caller rather than logged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
latch.countDown(); | ||||||
} | ||||||
}; | ||||||
|
||||||
trainedModelProvider.getTrainedModel(modelId, GetTrainedModelsAction.Includes.empty(), null, trainedModelListener); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we don't pass a parent task to this request, it won't be cancelable. I think it would be better to pass in the task from the masterOperation here. |
||||||
|
||||||
try { | ||||||
boolean latchReached = latch.await(5, TimeUnit.SECONDS); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we want a 5-second timeout here which will throw an exception. If this timeout occurs, I don't think it will be any clearer what happened for the end user. |
||||||
|
||||||
if (latchReached == false) { | ||||||
throw new ElasticsearchException("Timeout while waiting for trained model to be retrieved"); | ||||||
} | ||||||
} catch (InterruptedException e) { | ||||||
throw new ElasticsearchException("Unexpected exception", e); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code is not necessary if you take my suggestion, but in Java it's best practice to reset the interrupt flag with `Thread.currentThread().interrupt()` |
||||||
} | ||||||
|
||||||
return modelExists.get(); | ||||||
} | ||||||
|
||||||
private void forceStopDeployment(String modelId, ActionListener<StopTrainedModelDeploymentAction.Response> listener) { | ||||||
StopTrainedModelDeploymentAction.Request request = new StopTrainedModelDeploymentAction.Request(modelId); | ||||||
request.setForce(true); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! this looks very clean and readable.