diff --git a/muted-tests.yml b/muted-tests.yml index 4662cb40985fe..f0a65d074d0a7 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -309,9 +309,6 @@ tests: - class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests method: testRecreateTemplateWhenDeleted issue: https://github.com/elastic/elasticsearch/issues/123232 -- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT - method: testDataStreamLifecycleDownsampleRollingRestart - issue: https://github.com/elastic/elasticsearch/issues/123769 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=ml/start_data_frame_analytics/Test start given dest index is not empty} issue: https://github.com/elastic/elasticsearch/issues/125909 diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 52296f1d896cc..becb6aaa7de19 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -273,6 +273,23 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta * completed exceptionally on the scheduler thread that belongs to {@code clusterService}. */ public static SubscribableListener addTemporaryStateListener(ClusterService clusterService, Predicate predicate) { + return addTemporaryStateListener(clusterService, predicate, ESTestCase.SAFE_AWAIT_TIMEOUT); + } + + /** + * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state + * that satisfies {@code predicate}, at which point it unsubscribes itself. + * + * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the + * given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is + * already complete. If no matching cluster state is seen within the provided {@code timeout} then the listener is + * completed exceptionally on the scheduler thread that belongs to {@code clusterService}. + */ + public static SubscribableListener addTemporaryStateListener( + ClusterService clusterService, + Predicate predicate, + TimeValue timeout + ) { final var listener = new SubscribableListener(); final ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override @@ -296,7 +313,7 @@ public String toString() { if (predicate.test(clusterService.state())) { listener.onResponse(null); } else { - listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); + listener.addTimeout(timeout, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); } return listener; } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index e8fb278bb570a..470f13a2d8c2e 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -114,7 +114,7 @@ private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.Dow return true; } return false; - }); + }, timeout); safeAwait(listener, timeout); } }